objectstore_client/
get.rs1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12pub struct GetResponse {
16 pub metadata: Metadata,
18 pub stream: ClientStream,
20}
21
22impl GetResponse {
23 pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25 let bytes: BytesMut = self.stream.try_collect().await?;
26 Ok(bytes.freeze())
27 }
28
29 pub async fn text(self) -> crate::Result<String> {
31 let bytes = self.payload().await?;
32 Ok(String::from_utf8(bytes.to_vec())?)
33 }
34}
35
36impl fmt::Debug for GetResponse {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("GetResponse")
39 .field("metadata", &self.metadata)
40 .field("stream", &format_args!("[Stream]"))
41 .finish()
42 }
43}
44
45impl Session {
46 pub fn get(&self, key: &str) -> GetBuilder {
48 GetBuilder {
49 session: self.clone(),
50 key: key.to_owned(),
51 decompress: true,
52 accept_encoding: vec![],
53 }
54 }
55}
56
57#[derive(Debug)]
59pub struct GetBuilder {
60 pub(crate) session: Session,
61 pub(crate) key: ObjectKey,
62 pub(crate) decompress: bool,
63 pub(crate) accept_encoding: Vec<Compression>,
64}
65
66impl GetBuilder {
67 pub fn decompress(mut self, decompress: bool) -> Self {
72 self.decompress = decompress;
73 self
74 }
75
76 pub fn accept_encoding(mut self, encodings: impl IntoIterator<Item = Compression>) -> Self {
83 self.accept_encoding = encodings.into_iter().collect();
84 self
85 }
86
87 pub async fn send(self) -> crate::Result<Option<GetResponse>> {
89 let response = self
90 .session
91 .request(reqwest::Method::GET, &self.key)?
92 .send()
93 .await?;
94 if response.status() == StatusCode::NOT_FOUND {
95 return Ok(None);
96 }
97 let response = response.error_for_status()?;
98
99 let mut metadata = Metadata::from_headers(response.headers(), "")?;
100
101 let stream = response.bytes_stream().map_err(io::Error::other).boxed();
102 let stream = maybe_decompress(
103 stream,
104 &mut metadata,
105 self.decompress,
106 &self.accept_encoding,
107 );
108
109 Ok(Some(GetResponse { metadata, stream }))
110 }
111}
112
113pub(crate) fn maybe_decompress(
120 stream: ClientStream,
121 metadata: &mut Metadata,
122 decompress: bool,
123 accept_encoding: &[Compression],
124) -> ClientStream {
125 let encoding_accepted = metadata
126 .compression
127 .is_some_and(|c| accept_encoding.contains(&c));
128 match (metadata.compression, decompress && !encoding_accepted) {
129 (Some(Compression::Zstd), true) => {
130 metadata.compression = None;
131 ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
132 }
133 _ => stream,
134 }
135}
136
#[cfg(test)]
mod tests {
    use futures_util::{StreamExt as _, TryStreamExt as _};
    use objectstore_types::metadata::{Compression, Metadata};

    use super::maybe_decompress;
    use crate::ClientStream;

    /// Plain payload shared by every test case.
    const PAYLOAD: &[u8] = b"hello world";

    /// Builds a stream that yields `chunk` exactly once.
    fn single_chunk_stream(chunk: bytes::Bytes) -> ClientStream {
        futures_util::stream::once(async move { Ok::<_, std::io::Error>(chunk) }).boxed()
    }

    /// Builds a single-chunk stream holding `data` compressed with zstd.
    fn compressed_zstd_stream(data: &[u8]) -> ClientStream {
        let mut source = data;
        let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
        std::io::copy(&mut source, &mut encoder).unwrap();
        single_chunk_stream(encoder.finish().unwrap().into())
    }

    /// Builds a single-chunk stream holding `data` unmodified.
    fn raw_stream(data: &[u8]) -> ClientStream {
        single_chunk_stream(bytes::Bytes::copy_from_slice(data))
    }

    /// Reads `stream` to the end and returns all of its bytes.
    async fn collect(stream: ClientStream) -> Vec<u8> {
        stream
            .try_collect::<bytes::BytesMut>()
            .await
            .unwrap()
            .to_vec()
    }

    /// Metadata describing a zstd-compressed object.
    fn zstd_metadata() -> Metadata {
        Metadata {
            compression: Some(Compression::Zstd),
            ..Default::default()
        }
    }

    /// Metadata describing an uncompressed object.
    fn no_compression_metadata() -> Metadata {
        Metadata::default()
    }

    /// Asserts that a zstd payload passes through byte-for-byte and stays labeled as zstd.
    async fn assert_zstd_passthrough(decompress: bool, accept_encoding: &[Compression]) {
        let expected = collect(compressed_zstd_stream(PAYLOAD)).await;
        let input = compressed_zstd_stream(PAYLOAD);

        let mut metadata = zstd_metadata();
        let out = maybe_decompress(input, &mut metadata, decompress, accept_encoding);

        assert_eq!(collect(out).await, expected);
        assert_eq!(metadata.compression, Some(Compression::Zstd));
    }

    #[tokio::test]
    async fn empty_accept_decompress_true_decompresses() {
        let mut metadata = zstd_metadata();
        let input = compressed_zstd_stream(PAYLOAD);

        let out = maybe_decompress(input, &mut metadata, true, &[]);

        assert_eq!(collect(out).await, PAYLOAD);
        assert_eq!(metadata.compression, None);
    }

    #[tokio::test]
    async fn empty_accept_decompress_false_returns_compressed() {
        assert_zstd_passthrough(false, &[]).await;
    }

    #[tokio::test]
    async fn zstd_accept_decompress_true_skips_decompression() {
        assert_zstd_passthrough(true, &[Compression::Zstd]).await;
    }

    #[tokio::test]
    async fn zstd_accept_decompress_false_returns_compressed() {
        assert_zstd_passthrough(false, &[Compression::Zstd]).await;
    }

    #[tokio::test]
    async fn no_compression_returns_raw_regardless_of_accept() {
        let mut metadata = no_compression_metadata();
        let input = raw_stream(PAYLOAD);

        let out = maybe_decompress(input, &mut metadata, true, &[Compression::Zstd]);

        assert_eq!(collect(out).await, PAYLOAD);
        assert_eq!(metadata.compression, None);
    }
}