objectstore_client/
put.rs1use std::fmt;
2use std::io::Cursor;
3use std::{borrow::Cow, collections::BTreeMap};
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::fs::File;
12use tokio::io::{AsyncRead, BufReader};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
16
17use crate::{ClientStream, ObjectKey, Session};
18
19#[derive(Debug, Deserialize)]
21pub struct PutResponse {
22 pub key: ObjectKey,
24}
25
26pub(crate) enum PutBody {
27 Buffer(Bytes),
28 Stream(ClientStream),
29 File(File),
30}
31
32impl fmt::Debug for PutBody {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 f.debug_tuple("PutBody").finish_non_exhaustive()
35 }
36}
37
38impl Session {
39 fn put_body(&self, body: PutBody) -> PutBuilder {
40 let metadata = Metadata {
41 expiration_policy: self.scope.usecase().expiration_policy(),
42 compression: Some(self.scope.usecase().compression()),
43 ..Default::default()
44 };
45
46 PutBuilder {
47 session: self.clone(),
48 metadata,
49 key: None,
50 body,
51 }
52 }
53
54 pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
56 self.put_body(PutBody::Buffer(body.into()))
57 }
58
59 pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
61 self.put_body(PutBody::Stream(body))
62 }
63
64 pub fn put_read<R>(&self, body: R) -> PutBuilder
66 where
67 R: AsyncRead + Send + Sync + 'static,
68 {
69 let stream = ReaderStream::new(body).boxed();
70 self.put_body(PutBody::Stream(stream))
71 }
72
73 pub fn put_file(&self, file: File) -> PutBuilder {
80 self.put_body(PutBody::File(file))
81 }
82}
83
84#[derive(Debug)]
86pub struct PutBuilder {
87 pub(crate) session: Session,
88 pub(crate) metadata: Metadata,
89 pub(crate) key: Option<ObjectKey>,
90 pub(crate) body: PutBody,
91}
92
93impl PutBuilder {
94 pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
99 self.key = Some(key.into()).filter(|k| !k.is_empty());
100 self
101 }
102
103 pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
111 self.metadata.compression = compression.into();
112 self
113 }
114
115 pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
119 self.metadata.expiration_policy = expiration_policy;
120 self
121 }
122
123 pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
128 self.metadata.content_type = content_type.into();
129 self
130 }
131
132 pub fn origin(mut self, origin: impl Into<String>) -> Self {
150 self.metadata.origin = Some(origin.into());
151 self
152 }
153
154 pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
158 self.metadata.custom = metadata.into();
159 self
160 }
161
162 pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164 self.metadata.custom.insert(key.into(), value.into());
165 self
166 }
167}
168
169pub(crate) fn maybe_compress(body: PutBody, compression: Option<Compression>) -> Body {
171 match (compression, body) {
172 (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
173 let cursor = Cursor::new(bytes);
174 let encoder = ZstdEncoder::new(cursor);
175 let stream = ReaderStream::new(encoder);
176 Body::wrap_stream(stream)
177 }
178 (Some(Compression::Zstd), PutBody::Stream(stream)) => {
179 let stream = StreamReader::new(stream);
180 let encoder = ZstdEncoder::new(stream);
181 let stream = ReaderStream::new(encoder);
182 Body::wrap_stream(stream)
183 }
184 (Some(Compression::Zstd), PutBody::File(file)) => {
185 let reader = BufReader::new(file);
186 let encoder = ZstdEncoder::new(reader);
187 let stream = ReaderStream::new(encoder);
188 Body::wrap_stream(stream)
189 }
190 (None, PutBody::Buffer(bytes)) => bytes.into(),
191 (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
192 (None, PutBody::File(file)) => {
193 let stream = ReaderStream::new(file).boxed();
194 Body::wrap_stream(stream)
195 } }
197}
198
199impl PutBuilder {
203 pub async fn send(self) -> crate::Result<PutResponse> {
205 let method = match self.key {
206 Some(_) => reqwest::Method::PUT,
207 None => reqwest::Method::POST,
208 };
209
210 let mut builder = self
211 .session
212 .request(method, self.key.as_deref().unwrap_or_default())?;
213
214 let body = maybe_compress(self.body, self.metadata.compression);
215
216 builder = builder.headers(self.metadata.to_headers("")?);
217
218 let response = builder.body(body).send().await?;
219 Ok(response.error_for_status()?.json().await?)
220 }
221}