objectstore_client/
put.rs1use std::collections::BTreeMap;
2use std::fmt;
3use std::io::Cursor;
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::{Compression, ExpirationPolicy};
15
16use crate::{Client, ClientStream};
17
18impl Client {
19 fn put_body(&self, body: PutBody) -> PutBuilder<'_> {
20 let metadata = Metadata {
21 expiration_policy: self.default_expiration_policy,
22 compression: Some(self.default_compression),
23 ..Default::default()
24 };
25
26 PutBuilder {
27 client: self,
28 metadata,
29 key: None,
30 body,
31 }
32 }
33
34 pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder<'_> {
36 self.put_body(PutBody::Buffer(body.into()))
37 }
38
39 pub fn put_stream(&self, body: ClientStream) -> PutBuilder<'_> {
41 self.put_body(PutBody::Stream(body))
42 }
43
44 pub fn put_read<R>(&self, body: R) -> PutBuilder<'_>
46 where
47 R: AsyncRead + Send + Sync + 'static,
48 {
49 let stream = ReaderStream::new(body).boxed();
50 self.put_body(PutBody::Stream(stream))
51 }
52}
53
54#[derive(Debug)]
56pub struct PutBuilder<'a> {
57 pub(crate) client: &'a Client,
58 pub(crate) metadata: Metadata,
59 pub(crate) key: Option<String>,
60 pub(crate) body: PutBody,
61}
62
63pub(crate) enum PutBody {
64 Buffer(Bytes),
65 Stream(ClientStream),
66}
67impl fmt::Debug for PutBody {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_tuple("PutBody").finish_non_exhaustive()
70 }
71}
72
73impl PutBuilder<'_> {
74 pub fn key(mut self, key: impl Into<String>) -> Self {
79 self.key = Some(key.into());
80 self
81 }
82
83 pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
89 self.metadata.compression = compression.into();
90 self
91 }
92
93 pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
95 self.metadata.expiration_policy = expiration_policy;
96 self
97 }
98
99 pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
103 self.metadata.custom = metadata.into();
104 self
105 }
106
107 pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
109 self.metadata.custom.insert(key.into(), value.into());
110 self
111 }
112}
113
114#[derive(Debug, Deserialize)]
116pub struct PutResponse {
117 pub key: String,
119}
120
121impl PutBuilder<'_> {
125 pub async fn send(self) -> crate::Result<PutResponse> {
127 let put_url = format!(
128 "{}/v1/{}",
129 self.client.service_url,
130 self.key.as_deref().unwrap_or_default()
131 );
132 let mut builder = self.client.request(reqwest::Method::PUT, put_url)?;
133
134 let body = match (self.metadata.compression, self.body) {
135 (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
136 let cursor = Cursor::new(bytes);
137 let encoder = ZstdEncoder::new(cursor);
138 let stream = ReaderStream::new(encoder);
139 Body::wrap_stream(stream)
140 }
141 (Some(Compression::Zstd), PutBody::Stream(stream)) => {
142 let stream = StreamReader::new(stream);
143 let encoder = ZstdEncoder::new(stream);
144 let stream = ReaderStream::new(encoder);
145 Body::wrap_stream(stream)
146 }
147 (None, PutBody::Buffer(bytes)) => bytes.into(),
148 (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
149 };
151
152 builder = builder.headers(self.metadata.to_headers("", false)?);
153
154 let response = builder.body(body).send().await?;
155 Ok(response.error_for_status()?.json().await?)
156 }
157}