objectstore_client/
get.rs1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12pub struct GetResponse {
16 pub metadata: Metadata,
18 pub stream: ClientStream,
20}
21
22impl GetResponse {
23 pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25 let bytes: BytesMut = self.stream.try_collect().await?;
26 Ok(bytes.freeze())
27 }
28
29 pub async fn text(self) -> crate::Result<String> {
31 let bytes = self.payload().await?;
32 Ok(String::from_utf8(bytes.to_vec())?)
33 }
34}
35
36impl fmt::Debug for GetResponse {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("GetResponse")
39 .field("metadata", &self.metadata)
40 .field("stream", &format_args!("[Stream]"))
41 .finish()
42 }
43}
44
45impl Session {
46 pub fn get(&self, key: &str) -> GetBuilder {
48 GetBuilder {
49 session: self.clone(),
50 key: key.to_owned(),
51 decompress: true,
52 accept_encoding: vec![],
53 }
54 }
55}
56
57#[derive(Debug)]
59pub struct GetBuilder {
60 pub(crate) session: Session,
61 pub(crate) key: ObjectKey,
62 pub(crate) decompress: bool,
63 pub(crate) accept_encoding: Vec<Compression>,
64}
65
66impl GetBuilder {
67 pub fn decompress(mut self, decompress: bool) -> Self {
72 self.decompress = decompress;
73 self
74 }
75
76 pub fn accept_encoding(mut self, encodings: impl IntoIterator<Item = Compression>) -> Self {
83 self.accept_encoding = encodings.into_iter().collect();
84 self
85 }
86
87 pub async fn send(self) -> crate::Result<Option<GetResponse>> {
89 let response = self
90 .session
91 .request(reqwest::Method::GET, &self.key)?
92 .send()
93 .await?;
94 if response.status() == StatusCode::NOT_FOUND {
95 return Ok(None);
96 }
97 let response = response.error_for_status()?;
98
99 let mut metadata = Metadata::from_headers(response.headers(), "")?;
100
101 let stream = response.bytes_stream().map_err(io::Error::other).boxed();
102 let stream = maybe_decompress(
103 stream,
104 &mut metadata,
105 self.decompress,
106 &self.accept_encoding,
107 );
108
109 Ok(Some(GetResponse { metadata, stream }))
110 }
111}
112
113pub(crate) fn maybe_decompress(
120 stream: ClientStream,
121 metadata: &mut Metadata,
122 decompress: bool,
123 accept_encoding: &[Compression],
124) -> ClientStream {
125 let encoding_accepted = metadata
126 .compression
127 .is_some_and(|c| accept_encoding.contains(&c));
128 match (metadata.compression, decompress && !encoding_accepted) {
129 (Some(Compression::Zstd), true) => {
130 metadata.compression = None;
131 let mut decoder = ZstdDecoder::new(StreamReader::new(stream));
132 decoder.multiple_members(true);
136 ReaderStream::new(decoder).boxed()
137 }
138 _ => stream,
139 }
140}
141
#[cfg(test)]
mod tests {
    use futures_util::{StreamExt as _, TryStreamExt as _};
    use objectstore_types::metadata::{Compression, Metadata};

    use super::maybe_decompress;
    use crate::ClientStream;

    /// Plaintext used by most tests.
    const PAYLOAD: &[u8] = b"hello world";

    /// Compresses `data` into a single zstd frame at the default level.
    fn zstd_frame(data: &[u8]) -> Vec<u8> {
        zstd::encode_all(data, 0).unwrap()
    }

    /// Builds a stream that yields each element of `chunks` as one item.
    fn stream_of(chunks: Vec<Vec<u8>>) -> ClientStream {
        futures_util::stream::iter(
            chunks
                .into_iter()
                .map(|chunk| Ok::<_, std::io::Error>(bytes::Bytes::from(chunk))),
        )
        .boxed()
    }

    /// Reads `stream` to the end and returns all bytes it produced.
    async fn drain(stream: ClientStream) -> Vec<u8> {
        stream
            .try_collect::<bytes::BytesMut>()
            .await
            .unwrap()
            .to_vec()
    }

    /// Runs `maybe_decompress` over `chunks` and returns the resulting bytes
    /// together with the compression recorded in the metadata afterwards.
    async fn run(
        chunks: Vec<Vec<u8>>,
        compression: Option<Compression>,
        decompress: bool,
        accept: &[Compression],
    ) -> (Vec<u8>, Option<Compression>) {
        let mut metadata = Metadata {
            compression,
            ..Metadata::default()
        };
        let out = maybe_decompress(stream_of(chunks), &mut metadata, decompress, accept);
        (drain(out).await, metadata.compression)
    }

    #[tokio::test]
    async fn empty_accept_decompress_true_decompresses() {
        let (body, compression) = run(
            vec![zstd_frame(PAYLOAD)],
            Some(Compression::Zstd),
            true,
            &[],
        )
        .await;

        assert_eq!(body, PAYLOAD);
        assert_eq!(compression, None);
    }

    #[tokio::test]
    async fn empty_accept_decompress_false_returns_compressed() {
        let expected = zstd_frame(PAYLOAD);

        let (body, compression) = run(
            vec![zstd_frame(PAYLOAD)],
            Some(Compression::Zstd),
            false,
            &[],
        )
        .await;

        assert_eq!(body, expected);
        assert_eq!(compression, Some(Compression::Zstd));
    }

    #[tokio::test]
    async fn zstd_accept_decompress_true_skips_decompression() {
        let expected = zstd_frame(PAYLOAD);

        let (body, compression) = run(
            vec![zstd_frame(PAYLOAD)],
            Some(Compression::Zstd),
            true,
            &[Compression::Zstd],
        )
        .await;

        assert_eq!(body, expected);
        assert_eq!(compression, Some(Compression::Zstd));
    }

    #[tokio::test]
    async fn zstd_accept_decompress_false_returns_compressed() {
        let expected = zstd_frame(PAYLOAD);

        let (body, compression) = run(
            vec![zstd_frame(PAYLOAD)],
            Some(Compression::Zstd),
            false,
            &[Compression::Zstd],
        )
        .await;

        assert_eq!(body, expected);
        assert_eq!(compression, Some(Compression::Zstd));
    }

    #[tokio::test]
    async fn no_compression_returns_raw_regardless_of_accept() {
        let (body, compression) =
            run(vec![PAYLOAD.to_vec()], None, true, &[Compression::Zstd]).await;

        assert_eq!(body, PAYLOAD);
        assert_eq!(compression, None);
    }

    #[tokio::test]
    async fn zstd_concatenated_frames_decompress() {
        // Two independent frames delivered back to back must decode as one
        // continuous payload.
        let frames = vec![zstd_frame(b"hello "), zstd_frame(b"world")];

        let (body, compression) = run(frames, Some(Compression::Zstd), true, &[]).await;

        assert_eq!(body, b"hello world");
        assert_eq!(compression, None);
    }
}