Skip to main content

objectstore_client/
get.rs

1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12/// The result from a successful [`get()`](Session::get) call.
13///
14/// This carries the response as a stream, plus the compression algorithm of the data.
15pub struct GetResponse {
16    /// The metadata attached to this object, including the compression algorithm used for the payload.
17    pub metadata: Metadata,
18    /// The response stream.
19    pub stream: ClientStream,
20}
21
22impl GetResponse {
23    /// Loads the object payload fully into memory.
24    pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25        let bytes: BytesMut = self.stream.try_collect().await?;
26        Ok(bytes.freeze())
27    }
28
29    /// Loads the object payload fully into memory and interprets it as UTF-8 text.
30    pub async fn text(self) -> crate::Result<String> {
31        let bytes = self.payload().await?;
32        Ok(String::from_utf8(bytes.to_vec())?)
33    }
34}
35
36impl fmt::Debug for GetResponse {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("GetResponse")
39            .field("metadata", &self.metadata)
40            .field("stream", &format_args!("[Stream]"))
41            .finish()
42    }
43}
44
45impl Session {
46    /// Retrieves the object with the given `key`.
47    pub fn get(&self, key: &str) -> GetBuilder {
48        GetBuilder {
49            session: self.clone(),
50            key: key.to_owned(),
51            decompress: true,
52            accept_encoding: vec![],
53        }
54    }
55}
56
57/// A [`get`](Session::get) request builder.
58#[derive(Debug)]
59pub struct GetBuilder {
60    pub(crate) session: Session,
61    pub(crate) key: ObjectKey,
62    pub(crate) decompress: bool,
63    pub(crate) accept_encoding: Vec<Compression>,
64}
65
66impl GetBuilder {
67    /// Indicates whether the request should automatically handle decompression of known algorithms,
68    /// or rather return the payload as it is stored, along with the compression algorithm it is stored in.
69    ///
70    /// By default, automatic decompression is enabled.
71    pub fn decompress(mut self, decompress: bool) -> Self {
72        self.decompress = decompress;
73        self
74    }
75
76    /// Specifies compression encodings the caller can handle natively.
77    ///
78    /// When the stored object's compression matches one of these, the payload
79    /// is returned still compressed and `metadata.compression` is preserved.
80    /// An empty list (the default) means the client does not accept any
81    /// compressed encoding, so automatic decompression applies as usual.
82    pub fn accept_encoding(mut self, encodings: impl IntoIterator<Item = Compression>) -> Self {
83        self.accept_encoding = encodings.into_iter().collect();
84        self
85    }
86
87    /// Sends the get request.
88    pub async fn send(self) -> crate::Result<Option<GetResponse>> {
89        let response = self
90            .session
91            .request(reqwest::Method::GET, &self.key)?
92            .send()
93            .await?;
94        if response.status() == StatusCode::NOT_FOUND {
95            return Ok(None);
96        }
97        let response = response.error_for_status()?;
98
99        let mut metadata = Metadata::from_headers(response.headers(), "")?;
100
101        let stream = response.bytes_stream().map_err(io::Error::other).boxed();
102        let stream = maybe_decompress(
103            stream,
104            &mut metadata,
105            self.decompress,
106            &self.accept_encoding,
107        );
108
109        Ok(Some(GetResponse { metadata, stream }))
110    }
111}
112
113/// Wraps a stream in a zstd decompression layer.
114///
115/// Decompresses if the metadata indicates zstd compression, `decompress` is `true`,
116/// and the stored encoding is not listed in `accept_encoding`. When the stored encoding
117/// is in `accept_encoding`, the payload is returned compressed and `metadata.compression`
118/// is preserved. Clears `metadata.compression` when decompression is applied.
119pub(crate) fn maybe_decompress(
120    stream: ClientStream,
121    metadata: &mut Metadata,
122    decompress: bool,
123    accept_encoding: &[Compression],
124) -> ClientStream {
125    let encoding_accepted = metadata
126        .compression
127        .is_some_and(|c| accept_encoding.contains(&c));
128    match (metadata.compression, decompress && !encoding_accepted) {
129        (Some(Compression::Zstd), true) => {
130            metadata.compression = None;
131            let mut decoder = ZstdDecoder::new(StreamReader::new(stream));
132            // Multipart uploads with compression, when each part is compressed individually,
133            // will consist of multiple concatenated zstd frames.
134            // This allows the client to handle automatic decompression for these objects transparently.
135            decoder.multiple_members(true);
136            ReaderStream::new(decoder).boxed()
137        }
138        _ => stream,
139    }
140}
141
#[cfg(test)]
mod tests {
    use futures_util::{StreamExt as _, TryStreamExt as _};
    use objectstore_types::metadata::{Compression, Metadata};

    use super::maybe_decompress;
    use crate::ClientStream;

    /// Plaintext payload shared by most tests.
    const PAYLOAD: &[u8] = b"hello world";

    /// Compresses `data` with zstd and returns the compressed bytes.
    fn zstd_frame(data: &[u8]) -> Vec<u8> {
        let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
        std::io::copy(&mut std::io::Cursor::new(data), &mut encoder).unwrap();
        encoder.finish().unwrap()
    }

    /// Builds a stream that yields each of `chunks` in order.
    fn stream_from_chunks(chunks: Vec<Vec<u8>>) -> ClientStream {
        futures_util::stream::iter(
            chunks
                .into_iter()
                .map(|chunk| Ok::<_, std::io::Error>(bytes::Bytes::from(chunk))),
        )
        .boxed()
    }

    /// Builds a single-chunk stream holding the zstd-compressed form of `data`.
    fn compressed_zstd_stream(data: &[u8]) -> ClientStream {
        stream_from_chunks(vec![zstd_frame(data)])
    }

    /// Builds a single-chunk stream holding `data` unchanged.
    fn raw_stream(data: &[u8]) -> ClientStream {
        stream_from_chunks(vec![data.to_vec()])
    }

    /// Drains `stream` into one contiguous buffer, panicking on any stream error.
    async fn collect(stream: ClientStream) -> Vec<u8> {
        let buffer: bytes::BytesMut = stream.try_collect().await.unwrap();
        buffer.to_vec()
    }

    /// Metadata describing a zstd-compressed object.
    fn zstd_metadata() -> Metadata {
        Metadata {
            compression: Some(Compression::Zstd),
            ..Default::default()
        }
    }

    /// Metadata describing an object stored without compression.
    fn no_compression_metadata() -> Metadata {
        Metadata::default()
    }

    /// Asserts that a zstd payload comes back still compressed, with its metadata
    /// untouched, for the given `decompress` / `accept_encoding` combination.
    async fn assert_returned_compressed(decompress: bool, accept_encoding: &[Compression]) {
        let expected = zstd_frame(PAYLOAD);
        let mut metadata = zstd_metadata();

        let out = maybe_decompress(
            compressed_zstd_stream(PAYLOAD),
            &mut metadata,
            decompress,
            accept_encoding,
        );

        assert_eq!(collect(out).await, expected);
        assert_eq!(metadata.compression, Some(Compression::Zstd));
    }

    #[tokio::test]
    async fn empty_accept_decompress_true_decompresses() {
        let mut metadata = zstd_metadata();

        let out = maybe_decompress(compressed_zstd_stream(PAYLOAD), &mut metadata, true, &[]);

        assert_eq!(collect(out).await, PAYLOAD);
        assert_eq!(metadata.compression, None);
    }

    #[tokio::test]
    async fn empty_accept_decompress_false_returns_compressed() {
        assert_returned_compressed(false, &[]).await;
    }

    #[tokio::test]
    async fn zstd_accept_decompress_true_skips_decompression() {
        assert_returned_compressed(true, &[Compression::Zstd]).await;
    }

    #[tokio::test]
    async fn zstd_accept_decompress_false_returns_compressed() {
        assert_returned_compressed(false, &[Compression::Zstd]).await;
    }

    #[tokio::test]
    async fn no_compression_returns_raw_regardless_of_accept() {
        let mut metadata = no_compression_metadata();

        let out = maybe_decompress(
            raw_stream(PAYLOAD),
            &mut metadata,
            true,
            &[Compression::Zstd],
        );

        assert_eq!(collect(out).await, PAYLOAD);
        assert_eq!(metadata.compression, None);
    }

    #[tokio::test]
    async fn zstd_concatenated_frames_decompress() {
        // Two independently compressed parts, delivered back to back.
        let stream = stream_from_chunks(vec![zstd_frame(b"hello "), zstd_frame(b"world")]);
        let mut metadata = zstd_metadata();

        let out = maybe_decompress(stream, &mut metadata, true, &[]);

        assert_eq!(collect(out).await, b"hello world");
        assert_eq!(metadata.compression, None);
    }
}