objectstore_client/
get.rs1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::Metadata;
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10pub use objectstore_types::Compression;
11
12use crate::{Client, ClientStream};
13
14pub struct GetResult {
18 pub metadata: Metadata,
20 pub stream: ClientStream,
22}
23
24impl GetResult {
25 pub async fn payload(self) -> crate::Result<bytes::Bytes> {
27 let bytes: BytesMut = self.stream.try_collect().await?;
28 Ok(bytes.freeze())
29 }
30
31 pub async fn text(self) -> crate::Result<String> {
33 let bytes = self.payload().await?;
34 Ok(String::from_utf8(bytes.to_vec())?)
35 }
36}
37
38impl fmt::Debug for GetResult {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 f.debug_struct("GetResult")
41 .field("metadata", &self.metadata)
42 .field("stream", &format_args!("[Stream]"))
43 .finish()
44 }
45}
46
47#[derive(Debug)]
49pub struct GetBuilder<'a> {
50 client: &'a Client,
51 id: &'a str,
52
53 decompress: bool,
54}
55
56impl Client {
57 pub fn get<'a>(&'a self, id: &'a str) -> GetBuilder<'a> {
59 GetBuilder {
60 client: self,
61 id,
62 decompress: true,
63 }
64 }
65}
66
67impl GetBuilder<'_> {
68 pub fn decompress(mut self, decompress: bool) -> Self {
71 self.decompress = decompress;
72 self
73 }
74
75 pub async fn send(self) -> crate::Result<Option<GetResult>> {
77 let get_url = format!("{}/v1/{}", self.client.service_url, self.id);
78
79 let response = self
80 .client
81 .request(reqwest::Method::GET, get_url)?
82 .send()
83 .await?;
84 if response.status() == StatusCode::NOT_FOUND {
85 return Ok(None);
86 }
87 let response = response.error_for_status()?;
88
89 let mut metadata = Metadata::from_headers(response.headers(), "")?;
90
91 let stream = response.bytes_stream().map_err(io::Error::other);
92 let stream = match (metadata.compression, self.decompress) {
93 (Some(Compression::Zstd), true) => {
94 metadata.compression = None;
95 ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
96 }
97 _ => stream.boxed(),
98 };
99
100 Ok(Some(GetResult { metadata, stream }))
101 }
102}