objectstore_client/
get.rs1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::Metadata;
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10pub use objectstore_types::Compression;
11
12use crate::{ClientStream, Session};
13
14pub struct GetResponse {
18 pub metadata: Metadata,
20 pub stream: ClientStream,
22}
23
24impl GetResponse {
25 pub async fn payload(self) -> crate::Result<bytes::Bytes> {
27 let bytes: BytesMut = self.stream.try_collect().await?;
28 Ok(bytes.freeze())
29 }
30
31 pub async fn text(self) -> crate::Result<String> {
33 let bytes = self.payload().await?;
34 Ok(String::from_utf8(bytes.to_vec())?)
35 }
36}
37
38impl fmt::Debug for GetResponse {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 f.debug_struct("GetResponse")
41 .field("metadata", &self.metadata)
42 .field("stream", &format_args!("[Stream]"))
43 .finish()
44 }
45}
46
47impl Session {
48 pub fn get(&self, key: &str) -> GetBuilder {
50 GetBuilder {
51 session: self.clone(),
52 key: key.to_owned(),
53 decompress: true,
54 }
55 }
56}
57
58#[derive(Debug)]
60pub struct GetBuilder {
61 session: Session,
62 key: String,
63 decompress: bool,
64}
65
66impl GetBuilder {
67 pub fn decompress(mut self, decompress: bool) -> Self {
72 self.decompress = decompress;
73 self
74 }
75
76 pub async fn send(self) -> crate::Result<Option<GetResponse>> {
78 let response = self
79 .session
80 .request(reqwest::Method::GET, &self.key)
81 .send()
82 .await?;
83 if response.status() == StatusCode::NOT_FOUND {
84 return Ok(None);
85 }
86 let response = response.error_for_status()?;
87
88 let mut metadata = Metadata::from_headers(response.headers(), "")?;
89
90 let stream = response.bytes_stream().map_err(io::Error::other);
91 let stream = match (metadata.compression, self.decompress) {
92 (Some(Compression::Zstd), true) => {
93 metadata.compression = None;
94 ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
95 }
96 _ => stream.boxed(),
97 };
98
99 Ok(Some(GetResponse { metadata, stream }))
100 }
101}