objectstore_service/backend/
s3_compatible.rs

//! S3-compatible storage backend with pluggable bearer-token authentication and GCS-style (`x-goog-*`) metadata headers.
2
3use std::time::{Duration, SystemTime};
4use std::{fmt, io};
5
6use futures_util::{StreamExt, TryStreamExt};
7use objectstore_types::metadata::{ExpirationPolicy, Metadata};
8use reqwest::header::HeaderMap;
9use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode};
10
11use crate::backend::common::{
12    self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
13};
14use crate::error::{Error, Result};
15use crate::id::ObjectId;
16use crate::stream::{self, ClientStream};
17
18/// Configuration for [`S3CompatibleBackend`].
19///
20/// Supports [Amazon S3] and other S3-compatible services. Authentication is handled via
21/// environment variables (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`) or IAM roles.
22///
23/// [Amazon S3]: https://aws.amazon.com/s3/
24///
25/// # Example
26///
27/// ```yaml
28/// storage:
29///   type: s3compatible
30///   endpoint: https://s3.amazonaws.com
31///   bucket: my-bucket
32/// ```
33#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
34pub struct S3CompatibleConfig {
35    /// S3 endpoint URL.
36    ///
37    /// Examples: `https://s3.amazonaws.com`, `http://localhost:9000` (for MinIO)
38    ///
39    /// # Environment Variables
40    ///
41    /// - `OS__STORAGE__TYPE=s3compatible`
42    /// - `OS__STORAGE__ENDPOINT=https://s3.amazonaws.com`
43    pub endpoint: String,
44
45    /// S3 bucket name.
46    ///
47    /// The bucket must exist before starting the server.
48    ///
49    /// # Environment Variables
50    ///
51    /// - `OS__STORAGE__BUCKET=my-bucket`
52    pub bucket: String,
53}
54
/// Header prefix under which custom object metadata is stored for the GCS backend.
///
/// See: <https://cloud.google.com/storage/docs/xml-api/reference-headers#xgoogmeta>
const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-";

/// Header carrying the object's expiration time, consumed by the GCS `daysSinceCustomTime`
/// lifecycle condition.
///
/// See: <https://cloud.google.com/storage/docs/xml-api/reference-headers#xgoogcustomtime>
const GCS_CUSTOM_TIME: &str = "x-goog-custom-time";

/// Slack allowed before an object with a time-to-idle policy has its expiration bumped.
///
/// A read only rewrites the expiration once the stored value lags the ideal one by more than
/// this, so frequently accessed objects are not rewritten on every access.
const TTI_DEBOUNCE: Duration = Duration::from_secs(60 * 60 * 24);
66
/// A credential that is attached to outgoing requests as a bearer token.
pub trait Token: Sync + Send {
    /// Borrows the raw token value.
    fn as_str(&self) -> &str;
}
72
73/// Provides authentication tokens for S3-compatible requests.
74pub trait TokenProvider: Send + Sync + 'static {
75    /// Returns a fresh token, fetching or refreshing it as needed.
76    fn get_token(&self) -> impl Future<Output = anyhow::Result<impl Token>> + Send;
77}
78
79/// Placeholder [`TokenProvider`] for unauthenticated backends.
80#[derive(Debug)]
81pub struct NoToken;
82
83impl TokenProvider for NoToken {
84    #[allow(refining_impl_trait)]
85    async fn get_token(&self) -> anyhow::Result<NoToken> {
86        unimplemented!()
87    }
88}
89impl Token for NoToken {
90    fn as_str(&self) -> &str {
91        unimplemented!()
92    }
93}
94
95/// S3-compatible storage backend with pluggable authentication.
96pub struct S3CompatibleBackend<T> {
97    client: reqwest::Client,
98
99    endpoint: String,
100    bucket: String,
101
102    token_provider: Option<T>,
103}
104
105impl<T> S3CompatibleBackend<T> {
106    /// Creates a new S3-compatible backend bound to the given bucket.
107    pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self {
108        Self {
109            client: common::reqwest_client(),
110            endpoint: endpoint.into(),
111            bucket: bucket.into(),
112            token_provider: Some(token_provider),
113        }
114    }
115
116    /// Formats the S3 object URL for the given key.
117    fn object_url(&self, id: &ObjectId) -> String {
118        format!("{}/{}/{}", self.endpoint, self.bucket, id.as_storage_path())
119    }
120}
121
122/// Wraps [`Metadata::to_headers`] with GCS-specific concerns (tombstone + custom-time).
123fn metadata_to_gcs_headers(
124    metadata: &Metadata,
125    prefix: &str,
126) -> Result<HeaderMap, objectstore_types::metadata::Error> {
127    let mut headers = metadata.to_headers(prefix)?;
128    // GCS custom-time for lifecycle expiration
129    if let Some(expires_in) = metadata.expiration_policy.expires_in() {
130        let expires_at =
131            humantime::format_rfc3339_seconds(std::time::SystemTime::now() + expires_in);
132        headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?);
133    }
134    Ok(headers)
135}
136
137impl<T> S3CompatibleBackend<T>
138where
139    T: TokenProvider,
140{
141    /// Creates a request builder with the appropriate authentication.
142    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
143        let mut builder = self.client.request(method, url);
144        if let Some(provider) = &self.token_provider {
145            builder = builder.bearer_auth(
146                provider
147                    .get_token()
148                    .await
149                    .map_err(|err| Error::Generic {
150                        context: "S3: failed to get authentication token".to_owned(),
151                        cause: Some(err.into()),
152                    })?
153                    .as_str(),
154            );
155        }
156        Ok(builder)
157    }
158
159    /// Fetches object metadata using the given HTTP method (GET or HEAD),
160    /// bumps TTI if needed, and returns the parsed metadata along with the
161    /// response (so `get_object` can read the body from a GET).
162    async fn request_object(
163        &self,
164        method: Method,
165        id: &ObjectId,
166    ) -> Result<Option<(Metadata, reqwest::Response)>> {
167        let object_url = self.object_url(id);
168
169        let response = self
170            .request(method, &object_url)
171            .await?
172            .send()
173            .await
174            .map_err(|cause| Error::Reqwest {
175                context: "S3: failed to send request".to_string(),
176                cause,
177            })?;
178
179        if response.status() == StatusCode::NOT_FOUND {
180            objectstore_log::debug!("Object not found");
181            return Ok(None);
182        }
183
184        let response = response
185            .error_for_status()
186            .map_err(|cause| Error::Reqwest {
187                context: "S3: failed to get object".to_string(),
188                cause,
189            })?;
190
191        let headers = response.headers();
192        let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?;
193        metadata.size = response.content_length().map(|len| len as usize);
194
195        // TODO: Schedule into background persistently so this doesn't get lost on restarts
196        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
197            // TODO: Inject the access time from the request.
198            let access_time = SystemTime::now();
199
200            let expire_at = headers
201                .get(GCS_CUSTOM_TIME)
202                .and_then(|s| s.to_str().ok())
203                .and_then(|s| humantime::parse_rfc3339(s).ok())
204                .unwrap_or(access_time);
205
206            if expire_at < access_time + tti - TTI_DEBOUNCE {
207                self.update_metadata(id, &metadata).await?;
208            }
209        }
210
211        Ok(Some((metadata, response)))
212    }
213
214    /// Issues a request to update the metadata for the given object.
215    async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> {
216        // NB: Meta updates require copy + REPLACE along with *all* metadata. See
217        // https://cloud.google.com/storage/docs/xml-api/put-object-copy
218        self.request(Method::PUT, self.object_url(id))
219            .await?
220            .header(
221                "x-goog-copy-source",
222                format!("/{}/{}", self.bucket, id.as_storage_path()),
223            )
224            .header("x-goog-metadata-directive", "REPLACE")
225            .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
226            .send()
227            .await
228            .map_err(|cause| Error::Reqwest {
229                context: "S3: failed to send TTI update request".to_string(),
230                cause,
231            })?
232            .error_for_status()
233            .map_err(|cause| Error::Reqwest {
234                context: "S3: failed to update expiration time for object with TTI".to_string(),
235                cause,
236            })?;
237
238        Ok(())
239    }
240}
241
242impl<T> fmt::Debug for S3CompatibleBackend<T> {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        f.debug_struct("S3Compatible")
245            .field("client", &self.client)
246            .field("endpoint", &self.endpoint)
247            .field("bucket", &self.bucket)
248            .finish_non_exhaustive()
249    }
250}
251
252impl S3CompatibleBackend<NoToken> {
253    /// Creates a new S3-compatible backend that sends unauthenticated requests.
254    pub fn without_token(config: S3CompatibleConfig) -> Self {
255        Self {
256            client: common::reqwest_client(),
257            endpoint: config.endpoint,
258            bucket: config.bucket,
259            token_provider: None,
260        }
261    }
262}
263
264#[async_trait::async_trait]
265impl<T: TokenProvider> Backend for S3CompatibleBackend<T> {
266    fn name(&self) -> &'static str {
267        "s3-compatible"
268    }
269
270    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
271    async fn put_object(
272        &self,
273        id: &ObjectId,
274        metadata: &Metadata,
275        stream: ClientStream,
276    ) -> Result<PutResponse> {
277        objectstore_log::debug!("Writing to s3_compatible backend");
278        self.request(Method::PUT, self.object_url(id))
279            .await?
280            .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?)
281            .body(Body::wrap_stream(stream))
282            .send()
283            .await
284            .and_then(|response| response.error_for_status())
285            .map_err(|cause| match stream::unpack_client_error(&cause) {
286                Some(ce) => Error::Client(ce),
287                _ => Error::Reqwest {
288                    context: "S3: failed to put object".to_string(),
289                    cause,
290                },
291            })?;
292
293        Ok(())
294    }
295
296    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
297    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
298        objectstore_log::debug!("Reading from s3_compatible backend");
299
300        let Some((metadata, response)) = self.request_object(Method::GET, id).await? else {
301            return Ok(None);
302        };
303
304        let stream = response.bytes_stream().map_err(io::Error::other);
305        Ok(Some((metadata, stream.boxed())))
306    }
307
308    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
309    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
310        objectstore_log::debug!("Reading metadata from s3_compatible backend");
311        let response = self.request_object(Method::HEAD, id).await?;
312        Ok(response.map(|(metadata, _)| metadata))
313    }
314
315    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
316    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
317        objectstore_log::debug!("Deleting from s3_compatible backend");
318        let response = self
319            .request(Method::DELETE, self.object_url(id))
320            .await?
321            .send()
322            .await
323            .map_err(|cause| Error::Reqwest {
324                context: "S3: failed to send delete request".to_string(),
325                cause,
326            })?;
327
328        // Do not error for objects that do not exist.
329        if response.status() != StatusCode::NOT_FOUND {
330            objectstore_log::debug!("Object not found");
331            response
332                .error_for_status()
333                .map_err(|cause| Error::Reqwest {
334                    context: "S3: failed to delete object".to_string(),
335                    cause,
336                })?;
337        }
338
339        Ok(())
340    }
341}