objectstore_service/backend/
s3_compatible.rs1use std::time::{Duration, SystemTime};
4use std::{fmt, io};
5
6use futures_util::{StreamExt, TryStreamExt};
7use objectstore_types::metadata::{ExpirationPolicy, Metadata};
8use reqwest::header::HeaderMap;
9use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode};
10
11use crate::backend::common::{
12 self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
13};
14use crate::error::{Error, Result};
15use crate::id::ObjectId;
16use crate::stream::{self, ClientStream};
17
18#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
34pub struct S3CompatibleConfig {
35 pub endpoint: String,
44
45 pub bucket: String,
53}
54
/// Header prefix under which user-defined metadata fields are sent and read back.
const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-";

/// Header carrying the object's absolute expiry time (RFC 3339, second precision).
const GCS_CUSTOM_TIME: &str = "x-goog-custom-time";

/// Slack applied before refreshing a time-to-idle expiry.
///
/// A read only rewrites the stored expiry when it is earlier than `now + tti - TTI_DEBOUNCE`, so
/// frequently read objects do not trigger a metadata update on every access.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 60 * 60);

/// A credential that can be attached to a request as a bearer token.
pub trait Token: Send + Sync {
    /// Returns the raw token text used as the bearer `Authorization` value.
    fn as_str(&self) -> &str;
}
72
73pub trait TokenProvider: Send + Sync + 'static {
75 fn get_token(&self) -> impl Future<Output = anyhow::Result<impl Token>> + Send;
77}
78
79#[derive(Debug)]
81pub struct NoToken;
82
83impl TokenProvider for NoToken {
84 #[allow(refining_impl_trait)]
85 async fn get_token(&self) -> anyhow::Result<NoToken> {
86 unimplemented!()
87 }
88}
89impl Token for NoToken {
90 fn as_str(&self) -> &str {
91 unimplemented!()
92 }
93}
94
95pub struct S3CompatibleBackend<T> {
97 client: reqwest::Client,
98
99 endpoint: String,
100 bucket: String,
101
102 token_provider: Option<T>,
103}
104
105impl<T> S3CompatibleBackend<T> {
106 pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self {
108 Self {
109 client: common::reqwest_client(),
110 endpoint: endpoint.into(),
111 bucket: bucket.into(),
112 token_provider: Some(token_provider),
113 }
114 }
115
116 fn object_url(&self, id: &ObjectId) -> String {
118 format!("{}/{}/{}", self.endpoint, self.bucket, id.as_storage_path())
119 }
120}
121
122fn metadata_to_gcs_headers(
124 metadata: &Metadata,
125 prefix: &str,
126) -> Result<HeaderMap, objectstore_types::metadata::Error> {
127 let mut headers = metadata.to_headers(prefix)?;
128 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
130 let expires_at =
131 humantime::format_rfc3339_seconds(std::time::SystemTime::now() + expires_in);
132 headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?);
133 }
134 Ok(headers)
135}
136
137impl<T> S3CompatibleBackend<T>
138where
139 T: TokenProvider,
140{
141 async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
143 let mut builder = self.client.request(method, url);
144 if let Some(provider) = &self.token_provider {
145 builder = builder.bearer_auth(
146 provider
147 .get_token()
148 .await
149 .map_err(|err| Error::Generic {
150 context: "S3: failed to get authentication token".to_owned(),
151 cause: Some(err.into()),
152 })?
153 .as_str(),
154 );
155 }
156 Ok(builder)
157 }
158
159 async fn request_object(
163 &self,
164 method: Method,
165 id: &ObjectId,
166 ) -> Result<Option<(Metadata, reqwest::Response)>> {
167 let object_url = self.object_url(id);
168
169 let response = self
170 .request(method, &object_url)
171 .await?
172 .send()
173 .await
174 .map_err(|cause| Error::Reqwest {
175 context: "S3: failed to send request".to_string(),
176 cause,
177 })?;
178
179 if response.status() == StatusCode::NOT_FOUND {
180 objectstore_log::debug!("Object not found");
181 return Ok(None);
182 }
183
184 let response = response
185 .error_for_status()
186 .map_err(|cause| Error::Reqwest {
187 context: "S3: failed to get object".to_string(),
188 cause,
189 })?;
190
191 let headers = response.headers();
192 let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?;
193 metadata.size = response.content_length().map(|len| len as usize);
194
195 if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
197 let access_time = SystemTime::now();
199
200 let expire_at = headers
201 .get(GCS_CUSTOM_TIME)
202 .and_then(|s| s.to_str().ok())
203 .and_then(|s| humantime::parse_rfc3339(s).ok())
204 .unwrap_or(access_time);
205
206 if expire_at < access_time + tti - TTI_DEBOUNCE {
207 self.update_metadata(id, &metadata).await?;
208 }
209 }
210
211 Ok(Some((metadata, response)))
212 }
213
214 async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> {
216 self.request(Method::PUT, self.object_url(id))
219 .await?
220 .header(
221 "x-goog-copy-source",
222 format!("/{}/{}", self.bucket, id.as_storage_path()),
223 )
224 .header("x-goog-metadata-directive", "REPLACE")
225 .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
226 .send()
227 .await
228 .map_err(|cause| Error::Reqwest {
229 context: "S3: failed to send TTI update request".to_string(),
230 cause,
231 })?
232 .error_for_status()
233 .map_err(|cause| Error::Reqwest {
234 context: "S3: failed to update expiration time for object with TTI".to_string(),
235 cause,
236 })?;
237
238 Ok(())
239 }
240}
241
242impl<T> fmt::Debug for S3CompatibleBackend<T> {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 f.debug_struct("S3Compatible")
245 .field("client", &self.client)
246 .field("endpoint", &self.endpoint)
247 .field("bucket", &self.bucket)
248 .finish_non_exhaustive()
249 }
250}
251
252impl S3CompatibleBackend<NoToken> {
253 pub fn without_token(config: S3CompatibleConfig) -> Self {
255 Self {
256 client: common::reqwest_client(),
257 endpoint: config.endpoint,
258 bucket: config.bucket,
259 token_provider: None,
260 }
261 }
262}
263
264#[async_trait::async_trait]
265impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
266 fn name(&self) -> &'static str {
267 "s3-compatible"
268 }
269
270 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
271 async fn put_object(
272 &self,
273 id: &ObjectId,
274 metadata: &Metadata,
275 stream: ClientStream,
276 ) -> Result<PutResponse> {
277 objectstore_log::debug!("Writing to s3_compatible backend");
278 self.request(Method::PUT, self.object_url(id))
279 .await?
280 .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
281 .body(Body::wrap_stream(stream))
282 .send()
283 .await
284 .and_then(|response| response.error_for_status())
285 .map_err(|cause| match stream::unpack_client_error(&cause) {
286 Some(ce) => Error::Client(ce),
287 _ => Error::Reqwest {
288 context: "S3: failed to put object".to_string(),
289 cause,
290 },
291 })?;
292
293 Ok(())
294 }
295
296 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
297 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
298 objectstore_log::debug!("Reading from s3_compatible backend");
299
300 let Some((metadata, response)) = self.request_object(Method::GET, id).await? else {
301 return Ok(None);
302 };
303
304 let stream = response.bytes_stream().map_err(io::Error::other);
305 Ok(Some((metadata, stream.boxed())))
306 }
307
308 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
309 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
310 objectstore_log::debug!("Reading metadata from s3_compatible backend");
311 let response = self.request_object(Method::HEAD, id).await?;
312 Ok(response.map(|(metadata, _)| metadata))
313 }
314
315 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
316 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
317 objectstore_log::debug!("Deleting from s3_compatible backend");
318 let response = self
319 .request(Method::DELETE, self.object_url(id))
320 .await?
321 .send()
322 .await
323 .map_err(|cause| Error::Reqwest {
324 context: "S3: failed to send delete request".to_string(),
325 cause,
326 })?;
327
328 if response.status() != StatusCode::NOT_FOUND {
330 objectstore_log::debug!("Object not found");
331 response
332 .error_for_status()
333 .map_err(|cause| Error::Reqwest {
334 context: "S3: failed to delete object".to_string(),
335 cause,
336 })?;
337 }
338
339 Ok(())
340 }
341}