Skip to main content

objectstore_service/backend/
s3_compatible.rs

1//! S3-compatible backend with generic protocol support.
2
3use std::time::{Duration, SystemTime};
4use std::{fmt, io};
5
6use futures_util::{StreamExt, TryStreamExt};
7use objectstore_types::metadata::{ExpirationPolicy, Metadata};
8use objectstore_types::range::{ByteRange, ContentRange};
9use reqwest::header::HeaderMap;
10use reqwest::{Body, IntoUrl, Method, RequestBuilder, Response, StatusCode};
11
12use crate::backend::common::{
13    self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
14};
15use crate::error::{Error, Result};
16use crate::id::ObjectId;
17use crate::stream::{self, ClientStream};
18
19/// Configuration for [`S3CompatibleBackend`].
20///
21/// Supports [Amazon S3] and other S3-compatible services. Authentication is handled via
22/// environment variables (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`) or IAM roles.
23///
24/// [Amazon S3]: https://aws.amazon.com/s3/
25///
26/// # Example
27///
28/// ```yaml
29/// storage:
30///   type: s3compatible
31///   endpoint: https://s3.amazonaws.com
32///   bucket: my-bucket
33/// ```
34#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
35pub struct S3CompatibleConfig {
36    /// S3 endpoint URL.
37    ///
38    /// Examples: `https://s3.amazonaws.com`, `http://localhost:9000` (for MinIO)
39    ///
40    /// # Environment Variables
41    ///
42    /// - `OS__STORAGE__TYPE=s3compatible`
43    /// - `OS__STORAGE__ENDPOINT=https://s3.amazonaws.com`
44    pub endpoint: String,
45
46    /// S3 bucket name.
47    ///
48    /// The bucket must exist before starting the server.
49    ///
50    /// # Environment Variables
51    ///
52    /// - `OS__STORAGE__BUCKET=my-bucket`
53    pub bucket: String,
54}
55
/// Header-name prefix under which GCS carries custom object metadata.
///
/// See: <https://cloud.google.com/storage/docs/xml-api/reference-headers#xgoogmeta>
const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-";

/// Header holding an object's expiration time, consumed by the GCS `daysSinceCustomTime`
/// lifecycle condition.
///
/// See: <https://cloud.google.com/storage/docs/xml-api/reference-headers#xgoogcustomtime>
const GCS_CUSTOM_TIME: &str = "x-goog-custom-time";

/// Debounce window (one day) for TTI refreshes.
///
/// An object's stored expiration is only rewritten once it lags the freshly computed
/// expiration by more than this amount.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 60 * 60);
67
/// A credential that is sent to the storage service as an HTTP bearer token.
pub trait Token: Send + Sync {
    /// Borrows the raw token value, i.e. the text that follows `Bearer ` in the
    /// `Authorization` header.
    fn as_str(&self) -> &str;
}
73
74/// Provides authentication tokens for S3-compatible requests.
75pub trait TokenProvider: Send + Sync + 'static {
76    /// Returns a fresh token, fetching or refreshing it as needed.
77    fn get_token(&self) -> impl Future<Output = anyhow::Result<impl Token>> + Send;
78}
79
80/// Placeholder [`TokenProvider`] for unauthenticated backends.
81#[derive(Debug)]
82pub struct NoToken;
83
84impl TokenProvider for NoToken {
85    #[allow(refining_impl_trait)]
86    async fn get_token(&self) -> anyhow::Result<NoToken> {
87        unimplemented!()
88    }
89}
90impl Token for NoToken {
91    fn as_str(&self) -> &str {
92        unimplemented!()
93    }
94}
95
96/// S3-compatible storage backend with pluggable authentication.
97pub struct S3CompatibleBackend<T> {
98    client: reqwest::Client,
99
100    endpoint: String,
101    bucket: String,
102
103    token_provider: Option<T>,
104}
105
106impl<T> S3CompatibleBackend<T> {
107    /// Creates a new S3-compatible backend bound to the given bucket.
108    pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self {
109        Self {
110            client: common::reqwest_client(),
111            endpoint: endpoint.into(),
112            bucket: bucket.into(),
113            token_provider: Some(token_provider),
114        }
115    }
116
117    /// Formats the S3 object URL for the given key.
118    fn object_url(&self, id: &ObjectId) -> String {
119        format!("{}/{}/{}", self.endpoint, self.bucket, id.as_storage_path())
120    }
121}
122
123/// Wraps [`Metadata::to_headers`] with GCS-specific concerns (tombstone + custom-time).
124fn metadata_to_gcs_headers(
125    metadata: &Metadata,
126    prefix: &str,
127) -> Result<HeaderMap, objectstore_types::metadata::Error> {
128    let mut headers = metadata.to_headers(prefix)?;
129    // GCS custom-time for lifecycle expiration
130    if let Some(expires_in) = metadata.expiration_policy.expires_in() {
131        let expires_at = humantime::format_rfc3339_seconds(SystemTime::now() + expires_in);
132        headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?);
133    }
134    Ok(headers)
135}
136
137impl<T> S3CompatibleBackend<T>
138where
139    T: TokenProvider,
140{
141    /// Creates a request builder with the appropriate authentication.
142    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
143        let mut builder = self.client.request(method, url);
144        if let Some(provider) = &self.token_provider {
145            builder = builder.bearer_auth(
146                provider
147                    .get_token()
148                    .await
149                    .map_err(|err| Error::Generic {
150                        context: "S3: failed to get authentication token".to_owned(),
151                        cause: Some(err.into()),
152                    })?
153                    .as_str(),
154            );
155        }
156        Ok(builder)
157    }
158
159    /// Fetches object metadata using the given HTTP method (GET or HEAD),
160    /// bumps TTI if needed, and returns the parsed metadata along with the
161    /// response (so `get_object` can read the body from a GET).
162    async fn request_object(
163        &self,
164        method: Method,
165        id: &ObjectId,
166        range: Option<ByteRange>,
167    ) -> Result<Option<(Metadata, Option<ContentRange>, Response)>> {
168        let object_url = self.object_url(id);
169
170        let mut builder = self.request(method, &object_url).await?;
171        if let Some(r) = range {
172            builder = builder.header(reqwest::header::RANGE, r.to_header_value());
173        }
174        let response = builder.send().await.map_err(|cause| Error::Reqwest {
175            context: "S3: failed to send request".to_string(),
176            cause,
177        })?;
178
179        if response.status() == StatusCode::NOT_FOUND {
180            objectstore_log::debug!("Object not found");
181            return Ok(None);
182        }
183
184        if response.status() == StatusCode::RANGE_NOT_SATISFIABLE {
185            let raw = response
186                .headers()
187                .get(reqwest::header::CONTENT_RANGE)
188                .and_then(|v| v.to_str().ok());
189            let total = raw.and_then(ContentRange::parse_unsatisfiable_total);
190            match total {
191                Some(total) => return Err(Error::RangeNotSatisfiable { total }),
192                None => {
193                    return Err(Error::Generic {
194                        context: format!("S3: 416 response with invalid Content-Range: {raw:?}"),
195                        cause: None,
196                    });
197                }
198            }
199        }
200
201        let response = response
202            .error_for_status()
203            .map_err(|cause| Error::Reqwest {
204                context: "S3: failed to get object".to_string(),
205                cause,
206            })?;
207
208        let headers = response.headers();
209        let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?;
210
211        let content_range = if response.status() == StatusCode::PARTIAL_CONTENT {
212            let range = headers
213                .get(reqwest::header::CONTENT_RANGE)
214                .and_then(|v| v.to_str().ok())
215                .and_then(|s| s.parse::<ContentRange>().ok())
216                .ok_or_else(|| Error::Generic {
217                    context: "S3: 206 response missing valid Content-Range header".to_owned(),
218                    cause: None,
219                })?;
220            metadata.size = Some(range.total as usize);
221            Some(range)
222        } else {
223            if let Some(len) = response.content_length() {
224                metadata.size = Some(len as usize);
225            } else {
226                objectstore_log::warn!("S3: 200 response missing Content-Length header");
227            }
228            None
229        };
230
231        // TODO: Schedule into background persistently so this doesn't get lost on restarts
232        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
233            // TODO: Inject the access time from the request.
234            let access_time = SystemTime::now();
235
236            let expire_at = headers
237                .get(GCS_CUSTOM_TIME)
238                .and_then(|s| s.to_str().ok())
239                .and_then(|s| humantime::parse_rfc3339(s).ok())
240                .unwrap_or(access_time);
241
242            if expire_at < access_time + tti - TTI_DEBOUNCE {
243                self.update_metadata(id, &metadata).await?;
244            }
245        }
246
247        Ok(Some((metadata, content_range, response)))
248    }
249
250    /// Issues a request to update the metadata for the given object.
251    async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> {
252        // NB: Meta updates require copy + REPLACE along with *all* metadata. See
253        // https://cloud.google.com/storage/docs/xml-api/put-object-copy
254        self.request(Method::PUT, self.object_url(id))
255            .await?
256            .header(
257                "x-goog-copy-source",
258                format!("/{}/{}", self.bucket, id.as_storage_path()),
259            )
260            .header("x-goog-metadata-directive", "REPLACE")
261            .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
262            .send()
263            .await
264            .map_err(|cause| Error::Reqwest {
265                context: "S3: failed to send TTI update request".to_string(),
266                cause,
267            })?
268            .error_for_status()
269            .map_err(|cause| Error::Reqwest {
270                context: "S3: failed to update expiration time for object with TTI".to_string(),
271                cause,
272            })?;
273
274        Ok(())
275    }
276}
277
278impl<T> fmt::Debug for S3CompatibleBackend<T> {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        f.debug_struct("S3Compatible")
281            .field("client", &self.client)
282            .field("endpoint", &self.endpoint)
283            .field("bucket", &self.bucket)
284            .finish_non_exhaustive()
285    }
286}
287
288impl S3CompatibleBackend<NoToken> {
289    /// Creates a new S3-compatible backend that sends unauthenticated requests.
290    pub fn without_token(config: S3CompatibleConfig) -> Self {
291        Self {
292            client: common::reqwest_client(),
293            endpoint: config.endpoint,
294            bucket: config.bucket,
295            token_provider: None,
296        }
297    }
298}
299
300#[async_trait::async_trait]
301impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
302    fn name(&self) -> &'static str {
303        "s3-compatible"
304    }
305
306    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
307    async fn put_object(
308        &self,
309        id: &ObjectId,
310        metadata: &Metadata,
311        stream: ClientStream,
312    ) -> Result<PutResponse> {
313        objectstore_log::debug!("Writing to s3_compatible backend");
314        self.request(Method::PUT, self.object_url(id))
315            .await?
316            .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
317            .body(Body::wrap_stream(stream))
318            .send()
319            .await
320            .and_then(Response::error_for_status)
321            .map_err(|cause| match stream::unpack_client_error(&cause) {
322                Some(ce) => Error::Client(ce),
323                _ => Error::Reqwest {
324                    context: "S3: failed to put object".to_string(),
325                    cause,
326                },
327            })?;
328
329        Ok(())
330    }
331
332    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
333    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
334        objectstore_log::debug!("Reading from s3_compatible backend");
335
336        let Some((metadata, content_range, response)) =
337            self.request_object(Method::GET, id, range).await?
338        else {
339            return Ok(None);
340        };
341
342        let stream = response.bytes_stream().map_err(io::Error::other);
343        Ok(Some((metadata, content_range, stream.boxed())))
344    }
345
346    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
347    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
348        objectstore_log::debug!("Reading metadata from s3_compatible backend");
349        let response = self.request_object(Method::HEAD, id, None).await?;
350        Ok(response.map(|(metadata, _, _)| metadata))
351    }
352
353    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
354    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
355        objectstore_log::debug!("Deleting from s3_compatible backend");
356        let response = self
357            .request(Method::DELETE, self.object_url(id))
358            .await?
359            .send()
360            .await
361            .map_err(|cause| Error::Reqwest {
362                context: "S3: failed to send delete request".to_string(),
363                cause,
364            })?;
365
366        // Do not error for objects that do not exist.
367        if response.status() != StatusCode::NOT_FOUND {
368            objectstore_log::debug!("Object not found");
369            response
370                .error_for_status()
371                .map_err(|cause| Error::Reqwest {
372                    context: "S3: failed to delete object".to_string(),
373                    cause,
374                })?;
375        }
376
377        Ok(())
378    }
379}
380
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;

    // NB: These tests talk to a live MinIO server, which CI starts automatically.
    //
    // See the readme for running MinIO locally via devservices.

    /// Builds an unauthenticated backend pointing at the local test bucket.
    fn create_test_backend() -> S3CompatibleBackend<NoToken> {
        let config = S3CompatibleConfig {
            endpoint: "http://localhost:8089".into(),
            bucket: "test-bucket".into(),
        };
        S3CompatibleBackend::without_token(config)
    }

    /// Generates a fresh random object id in the `testing` use case.
    fn make_id() -> ObjectId {
        let scope = Scope::create("testing", "value").unwrap();
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([scope]),
        })
    }

    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let metadata = create_test_backend().get_metadata(&make_id()).await?;
        assert!(metadata.is_none());
        Ok(())
    }
}