objectstore_client/
put.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::io::Cursor;
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::{Compression, ExpirationPolicy};
15
16use crate::{Client, ClientStream};
17
18impl Client {
19    fn put_body(&self, body: PutBody) -> PutBuilder<'_> {
20        let metadata = Metadata {
21            expiration_policy: self.default_expiration_policy,
22            compression: Some(self.default_compression),
23            ..Default::default()
24        };
25
26        PutBuilder {
27            client: self,
28            metadata,
29            key: None,
30            body,
31        }
32    }
33
34    /// Creates a PUT request for a [`Bytes`]-like type.
35    pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder<'_> {
36        self.put_body(PutBody::Buffer(body.into()))
37    }
38
39    /// Creates a PUT request with a stream.
40    pub fn put_stream(&self, body: ClientStream) -> PutBuilder<'_> {
41        self.put_body(PutBody::Stream(body))
42    }
43
44    /// Creates a PUT request with an [`AsyncRead`] type.
45    pub fn put_read<R>(&self, body: R) -> PutBuilder<'_>
46    where
47        R: AsyncRead + Send + Sync + 'static,
48    {
49        let stream = ReaderStream::new(body).boxed();
50        self.put_body(PutBody::Stream(stream))
51    }
52}
53
54/// A PUT request builder.
55#[derive(Debug)]
56pub struct PutBuilder<'a> {
57    pub(crate) client: &'a Client,
58    pub(crate) metadata: Metadata,
59    pub(crate) key: Option<String>,
60    pub(crate) body: PutBody,
61}
62
/// The payload of a PUT request, in one of the forms accepted by the `Client::put*` methods.
pub(crate) enum PutBody {
    /// A payload that is already fully available in memory.
    Buffer(Bytes),
    /// A payload that is produced incrementally by a stream.
    Stream(ClientStream),
}
67impl fmt::Debug for PutBody {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_tuple("PutBody").finish_non_exhaustive()
70    }
71}
72
73impl PutBuilder<'_> {
74    /// Sets an explicit object key.
75    ///
76    /// If a key is specified, the object will be stored under that key. Otherwise, the objectstore
77    /// server will automatically assign a random key, which is then returned from this request.
78    pub fn key(mut self, key: impl Into<String>) -> Self {
79        self.key = Some(key.into());
80        self
81    }
82
83    /// Sets an explicit compression algorithm to be used for this payload.
84    ///
85    /// [`None`] should be used if no compression should be performed by the client,
86    /// either because the payload is uncompressible (such as a media format), or if the user
87    /// will handle any kind of compression, without the clients knowledge.
88    pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
89        self.metadata.compression = compression.into();
90        self
91    }
92
93    /// Sets the expiration policy of the object to be uploaded.
94    pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
95        self.metadata.expiration_policy = expiration_policy;
96        self
97    }
98
99    /// This sets the custom metadata to the provided map.
100    ///
101    /// It will clear any previously set metadata.
102    pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
103        self.metadata.custom = metadata.into();
104        self
105    }
106
107    /// Appends they `key`/`value` to the custom metadata of this object.
108    pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
109        self.metadata.custom.insert(key.into(), value.into());
110        self
111    }
112}
113
/// The response returned from the service after uploading an object.
///
/// Deserialized from the JSON body of a successful PUT response.
#[derive(Debug, Deserialize)]
pub struct PutResponse {
    /// The key of the object, as stored.
    ///
    /// This is the key reported back by the service; use it to refer to the object in
    /// later requests, in particular when no explicit key was set on the builder.
    pub key: String,
}
120
121// TODO: instead of a separate `send` method, it would be nice to just implement `IntoFuture`.
122// However, `IntoFuture` needs to define the resulting future as an associated type,
123// and "impl trait in associated type position" is not yet stable :-(
124impl PutBuilder<'_> {
125    /// Sends the built PUT request to the upstream service.
126    pub async fn send(self) -> crate::Result<PutResponse> {
127        let put_url = format!(
128            "{}/v1/{}",
129            self.client.service_url,
130            self.key.as_deref().unwrap_or_default()
131        );
132        let mut builder = self.client.request(reqwest::Method::PUT, put_url)?;
133
134        let body = match (self.metadata.compression, self.body) {
135            (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
136                let cursor = Cursor::new(bytes);
137                let encoder = ZstdEncoder::new(cursor);
138                let stream = ReaderStream::new(encoder);
139                Body::wrap_stream(stream)
140            }
141            (Some(Compression::Zstd), PutBody::Stream(stream)) => {
142                let stream = StreamReader::new(stream);
143                let encoder = ZstdEncoder::new(stream);
144                let stream = ReaderStream::new(encoder);
145                Body::wrap_stream(stream)
146            }
147            (None, PutBody::Buffer(bytes)) => bytes.into(),
148            (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
149            // _ => todo!("compression algorithms other than `zstd` are currently not supported"),
150        };
151
152        builder = builder.headers(self.metadata.to_headers("", false)?);
153
154        let response = builder.body(body).send().await?;
155        Ok(response.error_for_status()?.json().await?)
156    }
157}