objectstore_client/
get.rs

1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::Metadata;
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10pub use objectstore_types::Compression;
11
12use crate::{ClientStream, Session};
13
14/// The result from a successful [`get()`](Session::get) call.
15///
16/// This carries the response as a stream, plus the compression algorithm of the data.
17pub struct GetResponse {
18    /// The metadata attached to this object, including the compression algorithm used for the payload.
19    pub metadata: Metadata,
20    /// The response stream.
21    pub stream: ClientStream,
22}
23
24impl GetResponse {
25    /// Loads the object payload fully into memory.
26    pub async fn payload(self) -> crate::Result<bytes::Bytes> {
27        let bytes: BytesMut = self.stream.try_collect().await?;
28        Ok(bytes.freeze())
29    }
30
31    /// Loads the object payload fully into memory and interprets it as UTF-8 text.
32    pub async fn text(self) -> crate::Result<String> {
33        let bytes = self.payload().await?;
34        Ok(String::from_utf8(bytes.to_vec())?)
35    }
36}
37
38impl fmt::Debug for GetResponse {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        f.debug_struct("GetResponse")
41            .field("metadata", &self.metadata)
42            .field("stream", &format_args!("[Stream]"))
43            .finish()
44    }
45}
46
47impl Session {
48    /// Retrieves the object with the given `key`.
49    pub fn get(&self, key: &str) -> GetBuilder {
50        GetBuilder {
51            session: self.clone(),
52            key: key.to_owned(),
53            decompress: true,
54        }
55    }
56}
57
58/// A [`get`](Session::get) request builder.
59#[derive(Debug)]
60pub struct GetBuilder {
61    session: Session,
62    key: String,
63    decompress: bool,
64}
65
66impl GetBuilder {
67    /// Indicates whether the request should automatically handle decompression of known algorithms,
68    /// or rather return the payload as it is stored, along with the compression algorithm it is stored in.
69    ///
70    /// By default, automatic decompression is enabled.
71    pub fn decompress(mut self, decompress: bool) -> Self {
72        self.decompress = decompress;
73        self
74    }
75
76    /// Sends the get request.
77    pub async fn send(self) -> crate::Result<Option<GetResponse>> {
78        let response = self
79            .session
80            .request(reqwest::Method::GET, &self.key)
81            .send()
82            .await?;
83        if response.status() == StatusCode::NOT_FOUND {
84            return Ok(None);
85        }
86        let response = response.error_for_status()?;
87
88        let mut metadata = Metadata::from_headers(response.headers(), "")?;
89
90        let stream = response.bytes_stream().map_err(io::Error::other);
91        let stream = match (metadata.compression, self.decompress) {
92            (Some(Compression::Zstd), true) => {
93                metadata.compression = None;
94                ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
95            }
96            _ => stream.boxed(),
97        };
98
99        Ok(Some(GetResponse { metadata, stream }))
100    }
101}