objectstore_service/backend/
s3_compatible.rs1use std::time::{Duration, SystemTime};
4use std::{fmt, io};
5
6use futures_util::{StreamExt, TryStreamExt};
7use objectstore_types::metadata::{ExpirationPolicy, Metadata};
8use objectstore_types::range::{ByteRange, ContentRange};
9use reqwest::header::HeaderMap;
10use reqwest::{Body, IntoUrl, Method, RequestBuilder, Response, StatusCode};
11
12use crate::backend::common::{
13 self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
14};
15use crate::error::{Error, Result};
16use crate::id::ObjectId;
17use crate::stream::{self, ClientStream};
18
19#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
35pub struct S3CompatibleConfig {
36 pub endpoint: String,
45
46 pub bucket: String,
54}
55
/// Header-name prefix under which custom object metadata is sent and read back
/// (`x-goog-meta-*`).
const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-";

/// Header carrying the object's custom time; this backend stores the computed expiration
/// timestamp in it.
const GCS_CUSTOM_TIME: &str = "x-goog-custom-time";

/// How far a time-to-idle expiry may lag behind its ideal value before a read refreshes it.
///
/// This keeps frequently read objects from having their metadata rewritten on every access.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 60 * 60);
67
/// An authentication token that can be viewed as a string.
pub trait Token
where
    Self: Send + Sync,
{
    /// Returns the raw token value, which is sent as the bearer credential of a request.
    fn as_str(&self) -> &str;
}
73
74pub trait TokenProvider: Send + Sync + 'static {
76 fn get_token(&self) -> impl Future<Output = anyhow::Result<impl Token>> + Send;
78}
79
80#[derive(Debug)]
82pub struct NoToken;
83
84impl TokenProvider for NoToken {
85 #[allow(refining_impl_trait)]
86 async fn get_token(&self) -> anyhow::Result<NoToken> {
87 unimplemented!()
88 }
89}
90impl Token for NoToken {
91 fn as_str(&self) -> &str {
92 unimplemented!()
93 }
94}
95
96pub struct S3CompatibleBackend<T> {
98 client: reqwest::Client,
99
100 endpoint: String,
101 bucket: String,
102
103 token_provider: Option<T>,
104}
105
106impl<T> S3CompatibleBackend<T> {
107 pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self {
109 Self {
110 client: common::reqwest_client(),
111 endpoint: endpoint.into(),
112 bucket: bucket.into(),
113 token_provider: Some(token_provider),
114 }
115 }
116
117 fn object_url(&self, id: &ObjectId) -> String {
119 format!("{}/{}/{}", self.endpoint, self.bucket, id.as_storage_path())
120 }
121}
122
123fn metadata_to_gcs_headers(
125 metadata: &Metadata,
126 prefix: &str,
127) -> Result<HeaderMap, objectstore_types::metadata::Error> {
128 let mut headers = metadata.to_headers(prefix)?;
129 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
131 let expires_at = humantime::format_rfc3339_seconds(SystemTime::now() + expires_in);
132 headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?);
133 }
134 Ok(headers)
135}
136
137impl<T> S3CompatibleBackend<T>
138where
139 T: TokenProvider,
140{
141 async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
143 let mut builder = self.client.request(method, url);
144 if let Some(provider) = &self.token_provider {
145 builder = builder.bearer_auth(
146 provider
147 .get_token()
148 .await
149 .map_err(|err| Error::Generic {
150 context: "S3: failed to get authentication token".to_owned(),
151 cause: Some(err.into()),
152 })?
153 .as_str(),
154 );
155 }
156 Ok(builder)
157 }
158
159 async fn request_object(
163 &self,
164 method: Method,
165 id: &ObjectId,
166 range: Option<ByteRange>,
167 ) -> Result<Option<(Metadata, Option<ContentRange>, Response)>> {
168 let object_url = self.object_url(id);
169
170 let mut builder = self.request(method, &object_url).await?;
171 if let Some(r) = range {
172 builder = builder.header(reqwest::header::RANGE, r.to_header_value());
173 }
174 let response = builder.send().await.map_err(|cause| Error::Reqwest {
175 context: "S3: failed to send request".to_string(),
176 cause,
177 })?;
178
179 if response.status() == StatusCode::NOT_FOUND {
180 objectstore_log::debug!("Object not found");
181 return Ok(None);
182 }
183
184 if response.status() == StatusCode::RANGE_NOT_SATISFIABLE {
185 let raw = response
186 .headers()
187 .get(reqwest::header::CONTENT_RANGE)
188 .and_then(|v| v.to_str().ok());
189 let total = raw.and_then(ContentRange::parse_unsatisfiable_total);
190 match total {
191 Some(total) => return Err(Error::RangeNotSatisfiable { total }),
192 None => {
193 return Err(Error::Generic {
194 context: format!("S3: 416 response with invalid Content-Range: {raw:?}"),
195 cause: None,
196 });
197 }
198 }
199 }
200
201 let response = response
202 .error_for_status()
203 .map_err(|cause| Error::Reqwest {
204 context: "S3: failed to get object".to_string(),
205 cause,
206 })?;
207
208 let headers = response.headers();
209 let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?;
210
211 let content_range = if response.status() == StatusCode::PARTIAL_CONTENT {
212 let range = headers
213 .get(reqwest::header::CONTENT_RANGE)
214 .and_then(|v| v.to_str().ok())
215 .and_then(|s| s.parse::<ContentRange>().ok())
216 .ok_or_else(|| Error::Generic {
217 context: "S3: 206 response missing valid Content-Range header".to_owned(),
218 cause: None,
219 })?;
220 metadata.size = Some(range.total as usize);
221 Some(range)
222 } else {
223 if let Some(len) = response.content_length() {
224 metadata.size = Some(len as usize);
225 } else {
226 objectstore_log::warn!("S3: 200 response missing Content-Length header");
227 }
228 None
229 };
230
231 if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
233 let access_time = SystemTime::now();
235
236 let expire_at = headers
237 .get(GCS_CUSTOM_TIME)
238 .and_then(|s| s.to_str().ok())
239 .and_then(|s| humantime::parse_rfc3339(s).ok())
240 .unwrap_or(access_time);
241
242 if expire_at < access_time + tti - TTI_DEBOUNCE {
243 self.update_metadata(id, &metadata).await?;
244 }
245 }
246
247 Ok(Some((metadata, content_range, response)))
248 }
249
250 async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> {
252 self.request(Method::PUT, self.object_url(id))
255 .await?
256 .header(
257 "x-goog-copy-source",
258 format!("/{}/{}", self.bucket, id.as_storage_path()),
259 )
260 .header("x-goog-metadata-directive", "REPLACE")
261 .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
262 .send()
263 .await
264 .map_err(|cause| Error::Reqwest {
265 context: "S3: failed to send TTI update request".to_string(),
266 cause,
267 })?
268 .error_for_status()
269 .map_err(|cause| Error::Reqwest {
270 context: "S3: failed to update expiration time for object with TTI".to_string(),
271 cause,
272 })?;
273
274 Ok(())
275 }
276}
277
278impl<T> fmt::Debug for S3CompatibleBackend<T> {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 f.debug_struct("S3Compatible")
281 .field("client", &self.client)
282 .field("endpoint", &self.endpoint)
283 .field("bucket", &self.bucket)
284 .finish_non_exhaustive()
285 }
286}
287
288impl S3CompatibleBackend<NoToken> {
289 pub fn without_token(config: S3CompatibleConfig) -> Self {
291 Self {
292 client: common::reqwest_client(),
293 endpoint: config.endpoint,
294 bucket: config.bucket,
295 token_provider: None,
296 }
297 }
298}
299
300#[async_trait::async_trait]
301impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
302 fn name(&self) -> &'static str {
303 "s3-compatible"
304 }
305
306 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
307 async fn put_object(
308 &self,
309 id: &ObjectId,
310 metadata: &Metadata,
311 stream: ClientStream,
312 ) -> Result<PutResponse> {
313 objectstore_log::debug!("Writing to s3_compatible backend");
314 self.request(Method::PUT, self.object_url(id))
315 .await?
316 .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
317 .body(Body::wrap_stream(stream))
318 .send()
319 .await
320 .and_then(Response::error_for_status)
321 .map_err(|cause| match stream::unpack_client_error(&cause) {
322 Some(ce) => Error::Client(ce),
323 _ => Error::Reqwest {
324 context: "S3: failed to put object".to_string(),
325 cause,
326 },
327 })?;
328
329 Ok(())
330 }
331
332 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
333 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
334 objectstore_log::debug!("Reading from s3_compatible backend");
335
336 let Some((metadata, content_range, response)) =
337 self.request_object(Method::GET, id, range).await?
338 else {
339 return Ok(None);
340 };
341
342 let stream = response.bytes_stream().map_err(io::Error::other);
343 Ok(Some((metadata, content_range, stream.boxed())))
344 }
345
346 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
347 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
348 objectstore_log::debug!("Reading metadata from s3_compatible backend");
349 let response = self.request_object(Method::HEAD, id, None).await?;
350 Ok(response.map(|(metadata, _, _)| metadata))
351 }
352
353 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
354 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
355 objectstore_log::debug!("Deleting from s3_compatible backend");
356 let response = self
357 .request(Method::DELETE, self.object_url(id))
358 .await?
359 .send()
360 .await
361 .map_err(|cause| Error::Reqwest {
362 context: "S3: failed to send delete request".to_string(),
363 cause,
364 })?;
365
366 if response.status() != StatusCode::NOT_FOUND {
368 objectstore_log::debug!("Object not found");
369 response
370 .error_for_status()
371 .map_err(|cause| Error::Reqwest {
372 context: "S3: failed to delete object".to_string(),
373 cause,
374 })?;
375 }
376
377 Ok(())
378 }
379}
380
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;

    /// Builds an unauthenticated backend pointed at a local S3-compatible endpoint.
    ///
    /// NOTE(review): requires a server on `http://localhost:8089` serving `test-bucket` —
    /// confirm how it is provisioned for test runs.
    fn create_test_backend() -> S3CompatibleBackend<NoToken> {
        let config = S3CompatibleConfig {
            endpoint: "http://localhost:8089".into(),
            bucket: "test-bucket".into(),
        };
        S3CompatibleBackend::without_token(config)
    }

    /// Creates a random object id in the `testing` use case with a single `testing` scope.
    fn make_id() -> ObjectId {
        let scope = Scope::create("testing", "value").unwrap();
        let context = ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([scope]),
        };
        ObjectId::random(context)
    }

    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend();
        let missing = make_id();

        let metadata = backend.get_metadata(&missing).await?;
        assert!(metadata.is_none());

        Ok(())
    }
}