objectstore_client/
put.rs

1use std::fmt;
2use std::io::Cursor;
3use std::{borrow::Cow, collections::BTreeMap};
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::fs::File;
12use tokio::io::{AsyncRead, BufReader};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
16
17use crate::{ClientStream, ObjectKey, Session};
18
19/// The response returned from the service after uploading an object.
20#[derive(Debug, Deserialize)]
21pub struct PutResponse {
22    /// The key of the object, as stored.
23    pub key: ObjectKey,
24}
25
26pub(crate) enum PutBody {
27    Buffer(Bytes),
28    Stream(ClientStream),
29    File(File),
30}
31
32impl fmt::Debug for PutBody {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_tuple("PutBody").finish_non_exhaustive()
35    }
36}
37
38impl Session {
39    fn put_body(&self, body: PutBody) -> PutBuilder {
40        let metadata = Metadata {
41            expiration_policy: self.scope.usecase().expiration_policy(),
42            compression: Some(self.scope.usecase().compression()),
43            ..Default::default()
44        };
45
46        PutBuilder {
47            session: self.clone(),
48            metadata,
49            key: None,
50            body,
51        }
52    }
53
54    /// Creates or replaces an object using a [`Bytes`]-like payload.
55    pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
56        self.put_body(PutBody::Buffer(body.into()))
57    }
58
59    /// Creates or replaces an object using a streaming payload.
60    pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
61        self.put_body(PutBody::Stream(body))
62    }
63
64    /// Creates or replaces an object using an [`AsyncRead`] payload.
65    pub fn put_read<R>(&self, body: R) -> PutBuilder
66    where
67        R: AsyncRead + Send + Sync + 'static,
68    {
69        let stream = ReaderStream::new(body).boxed();
70        self.put_body(PutBody::Stream(stream))
71    }
72
73    /// Creates or replaces an object using the contents of a file.
74    ///
75    /// The file is attempted to be read when the request is sent.
76    /// It's therefore possible for the file to be moved or changed in the meantime.
77    /// If you want to avoid this possibility, use one of the other functions to supply
78    /// a payload directly instead.
79    pub fn put_file(&self, file: File) -> PutBuilder {
80        self.put_body(PutBody::File(file))
81    }
82}
83
84/// A [`put`](Session::put) request builder.
85#[derive(Debug)]
86pub struct PutBuilder {
87    pub(crate) session: Session,
88    pub(crate) metadata: Metadata,
89    pub(crate) key: Option<ObjectKey>,
90    pub(crate) body: PutBody,
91}
92
93impl PutBuilder {
94    /// Sets an explicit object key.
95    ///
96    /// If a key is specified, the object will be stored under that key. Otherwise, the Objectstore
97    /// server will automatically assign a random key, which is then returned from this request.
98    pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
99        self.key = Some(key.into()).filter(|k| !k.is_empty());
100        self
101    }
102
103    /// Sets an explicit compression algorithm to be used for this payload.
104    ///
105    /// [`None`] should be used if no compression should be performed by the client,
106    /// either because the payload is uncompressible (such as a media format), or if the user
107    /// will handle any kind of compression, without the clients knowledge.
108    ///
109    /// By default, the compression algorithm set on this Session's Usecase is used.
110    pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
111        self.metadata.compression = compression.into();
112        self
113    }
114
115    /// Sets the expiration policy of the object to be uploaded.
116    ///
117    /// By default, the expiration policy set on this Session's Usecase is used.
118    pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
119        self.metadata.expiration_policy = expiration_policy;
120        self
121    }
122
123    /// Sets the content type of the object to be uploaded.
124    ///
125    /// You can use the utility function [`crate::utils::guess_mime_type`] to attempt to guess a
126    /// `content_type` based on magic bytes.
127    pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
128        self.metadata.content_type = content_type.into();
129        self
130    }
131
132    /// Sets the origin of the object, typically the IP address of the original source.
133    ///
134    /// This is an optional but encouraged field that tracks where the payload was
135    /// originally obtained from. For example, the IP address of the Sentry SDK or CLI
136    /// that uploaded the data.
137    ///
138    /// # Example
139    ///
140    /// ```no_run
141    /// # async fn example(session: objectstore_client::Session) {
142    /// session.put("data")
143    ///     .origin("203.0.113.42")
144    ///     .send()
145    ///     .await
146    ///     .unwrap();
147    /// # }
148    /// ```
149    pub fn origin(mut self, origin: impl Into<String>) -> Self {
150        self.metadata.origin = Some(origin.into());
151        self
152    }
153
154    /// This sets the custom metadata to the provided map.
155    ///
156    /// It will clear any previously set metadata.
157    pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
158        self.metadata.custom = metadata.into();
159        self
160    }
161
162    /// Appends they `key`/`value` to the custom metadata of this object.
163    pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.metadata.custom.insert(key.into(), value.into());
165        self
166    }
167}
168
169/// Compresses the body if compression is specified.
170pub(crate) fn maybe_compress(body: PutBody, compression: Option<Compression>) -> Body {
171    match (compression, body) {
172        (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
173            let cursor = Cursor::new(bytes);
174            let encoder = ZstdEncoder::new(cursor);
175            let stream = ReaderStream::new(encoder);
176            Body::wrap_stream(stream)
177        }
178        (Some(Compression::Zstd), PutBody::Stream(stream)) => {
179            let stream = StreamReader::new(stream);
180            let encoder = ZstdEncoder::new(stream);
181            let stream = ReaderStream::new(encoder);
182            Body::wrap_stream(stream)
183        }
184        (Some(Compression::Zstd), PutBody::File(file)) => {
185            let reader = BufReader::new(file);
186            let encoder = ZstdEncoder::new(reader);
187            let stream = ReaderStream::new(encoder);
188            Body::wrap_stream(stream)
189        }
190        (None, PutBody::Buffer(bytes)) => bytes.into(),
191        (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
192        (None, PutBody::File(file)) => {
193            let stream = ReaderStream::new(file).boxed();
194            Body::wrap_stream(stream)
195        } // _ => todo!("compression algorithms other than `zstd` are currently not supported"),
196    }
197}
198
199// TODO: instead of a separate `send` method, it would be nice to just implement `IntoFuture`.
200// However, `IntoFuture` needs to define the resulting future as an associated type,
201// and "impl trait in associated type position" is not yet stable :-(
202impl PutBuilder {
203    /// Sends the built put request to the upstream service.
204    pub async fn send(self) -> crate::Result<PutResponse> {
205        let method = match self.key {
206            Some(_) => reqwest::Method::PUT,
207            None => reqwest::Method::POST,
208        };
209
210        let mut builder = self
211            .session
212            .request(method, self.key.as_deref().unwrap_or_default())?;
213
214        let body = maybe_compress(self.body, self.metadata.compression);
215
216        builder = builder.headers(self.metadata.to_headers("")?);
217
218        let response = builder.body(body).send().await?;
219        Ok(response.error_for_status()?.json().await?)
220    }
221}