objectstore_client/
get.rs

1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::Metadata;
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10pub use objectstore_types::Compression;
11
12use crate::{Client, ClientStream};
13
14/// The result from a successful [`get()`](Client::get) call.
15///
16/// This carries the response as a stream, plus the compression algorithm of the data.
17pub struct GetResult {
18    /// The metadata attached to this object, including the compression algorithm used for the payload.
19    pub metadata: Metadata,
20    /// The response stream.
21    pub stream: ClientStream,
22}
23
24impl GetResult {
25    /// Loads the object payload fully into memory.
26    pub async fn payload(self) -> crate::Result<bytes::Bytes> {
27        let bytes: BytesMut = self.stream.try_collect().await?;
28        Ok(bytes.freeze())
29    }
30
31    /// Loads the object payload fully into memory and interprets it as UTF-8 text.
32    pub async fn text(self) -> crate::Result<String> {
33        let bytes = self.payload().await?;
34        Ok(String::from_utf8(bytes.to_vec())?)
35    }
36}
37
38impl fmt::Debug for GetResult {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        f.debug_struct("GetResult")
41            .field("metadata", &self.metadata)
42            .field("stream", &format_args!("[Stream]"))
43            .finish()
44    }
45}
46
47/// A GET request builder.
48#[derive(Debug)]
49pub struct GetBuilder<'a> {
50    client: &'a Client,
51    id: &'a str,
52
53    decompress: bool,
54}
55
56impl Client {
57    /// Requests the object with the given `id`.
58    pub fn get<'a>(&'a self, id: &'a str) -> GetBuilder<'a> {
59        GetBuilder {
60            client: self,
61            id,
62            decompress: true,
63        }
64    }
65}
66
67impl GetBuilder<'_> {
68    /// Indicates whether the request should automatically handle decompression of known algorithms,
69    /// or rather return the payload as it is stored, along with the compression algorithm it is stored in.
70    pub fn decompress(mut self, decompress: bool) -> Self {
71        self.decompress = decompress;
72        self
73    }
74
75    /// Sends the `GET` request.
76    pub async fn send(self) -> crate::Result<Option<GetResult>> {
77        let get_url = format!("{}/v1/{}", self.client.service_url, self.id);
78
79        let response = self
80            .client
81            .request(reqwest::Method::GET, get_url)?
82            .send()
83            .await?;
84        if response.status() == StatusCode::NOT_FOUND {
85            return Ok(None);
86        }
87        let response = response.error_for_status()?;
88
89        let mut metadata = Metadata::from_headers(response.headers(), "")?;
90
91        let stream = response.bytes_stream().map_err(io::Error::other);
92        let stream = match (metadata.compression, self.decompress) {
93            (Some(Compression::Zstd), true) => {
94                metadata.compression = None;
95                ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
96            }
97            _ => stream.boxed(),
98        };
99
100        Ok(Some(GetResult { metadata, stream }))
101    }
102}