objectstore_service/backend/
gcs.rs

1//! Google Cloud Storage backend for long-term storage of large objects.
2
3use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
14use serde::{Deserialize, Serialize};
15
16use crate::backend::common::{
17    self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
18};
19use crate::error::{Error, Result};
20use crate::gcp_auth::PrefetchingTokenProvider;
21use crate::id::ObjectId;
22use crate::stream::{self, ClientStream};
23
24/// Configuration for [`GcsBackend`].
25///
26/// Stores objects in [Google Cloud Storage]. Authentication uses Application Default Credentials
27/// (ADC), which can be provided via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable or
28/// the GCE/GKE metadata service.
29///
30/// **Note**: The bucket must be pre-created with the following lifecycle policy:
31/// - `daysSinceCustomTime`: 1 day
32/// - `action`: delete
33///
34/// [Google Cloud Storage]: https://cloud.google.com/storage
35///
36/// # Example
37///
38/// ```yaml
39/// storage:
40///   type: gcs
41///   bucket: objectstore-bucket
42/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional custom GCS endpoint URL.
    ///
    /// Useful for testing with emulators. If `None`, uses the default GCS endpoint.
    ///
    /// NOTE: when a custom endpoint is set, requests are sent without authentication (see
    /// [`GcsBackend::new`], which only creates a token provider for the default endpoint).
    ///
    /// # Default
    ///
    /// `None` (uses default GCS endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=gcs`
    /// - `OS__STORAGE__ENDPOINT=http://localhost:9000` (optional)
    pub endpoint: Option<String>,

    /// GCS bucket name.
    ///
    /// The bucket must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__BUCKET=my-gcs-bucket`
    pub bucket: String,
}
68
/// Default endpoint used to access the GCS JSON API.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
/// Permission scopes required for accessing GCS.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// Time to debounce bumping an object with configured TTI.
///
/// A time-to-idle object only has its `customTime` pushed out again once the stored deadline
/// has fallen more than this far behind the newly computed deadline, so that not every read
/// triggers a metadata PATCH.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day
/// How many times to retry failed operations.
const REQUEST_RETRY_COUNT: usize = 2;

/// Prefix for our built-in metadata stored in GCS metadata field
const BUILTIN_META_PREFIX: &str = "x-sn-";
/// Prefix for user custom metadata stored in GCS metadata field
const CUSTOM_META_PREFIX: &str = "x-snme-";
82
83/// GCS object resource.
84///
85/// This is the representation of the object resource in GCS JSON API without its payload. Where no
86/// dedicated fields are available, we encode both built-in and custom metadata in the `metadata`
87/// field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// Content-Type of the object data. If an object is stored without a Content-Type, it is served
    /// as application/octet-stream.
    pub content_type: Cow<'static, str>,

    /// Content encoding, used to store [`Metadata::compression`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// Custom time stamp used for time-based expiration.
    ///
    /// Serialized through `humantime_serde`; the bucket's `daysSinceCustomTime` lifecycle rule
    /// acts on this field.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// The `Content-Length` of the data in bytes. GCS returns this as a string.
    ///
    /// GCS sets this in metadata responses. We can use it to know the size of an object
    /// without having to stream it.
    pub size: Option<String>,

    /// Timestamp of when this object was created.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// User-provided metadata, including our built-in metadata.
    ///
    /// Keys are namespaced with [`BUILTIN_META_PREFIX`] / [`CUSTOM_META_PREFIX`];
    /// see [`GcsMetaKey`].
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
125
126impl GcsObject {
127    /// Converts our Metadata type to GCS JSON object metadata.
128    pub fn from_metadata(metadata: &Metadata) -> Self {
129        let mut gcs_object = GcsObject {
130            content_type: metadata.content_type.clone(),
131            size: metadata.size.map(|size| size.to_string()),
132            content_encoding: None,
133            custom_time: None,
134            time_created: metadata.time_created,
135            metadata: BTreeMap::new(),
136        };
137
138        // For time-based expiration, set the `customTime` field. The bucket must have a
139        // `daysSinceCustomTime` lifecycle rule configured to delete objects with this field set.
140        // This rule automatically skips objects without `customTime` set.
141        if let Some(expires_in) = metadata.expiration_policy.expires_in() {
142            gcs_object.custom_time = Some(SystemTime::now() + expires_in);
143        }
144
145        if let Some(compression) = metadata.compression {
146            gcs_object.content_encoding = Some(compression.to_string());
147        }
148
149        if metadata.expiration_policy != ExpirationPolicy::default() {
150            gcs_object.metadata.insert(
151                GcsMetaKey::Expiration,
152                metadata.expiration_policy.to_string(),
153            );
154        }
155
156        if let Some(origin) = &metadata.origin {
157            gcs_object
158                .metadata
159                .insert(GcsMetaKey::Origin, origin.clone());
160        }
161
162        for (key, value) in &metadata.custom {
163            gcs_object
164                .metadata
165                .insert(GcsMetaKey::Custom(key.clone()), value.clone());
166        }
167
168        gcs_object
169    }
170
171    /// Converts GCS JSON object metadata to our Metadata type.
172    pub fn into_metadata(mut self) -> Result<Metadata> {
173        // Remove ignored metadata keys that are set by the GCS emulator.
174        self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
175
176        let expiration_policy = self
177            .metadata
178            .remove(&GcsMetaKey::Expiration)
179            .map(|s| s.parse())
180            .transpose()?
181            .unwrap_or_default();
182
183        let origin = self.metadata.remove(&GcsMetaKey::Origin);
184
185        let content_type = self.content_type;
186        let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
187        let size = self
188            .size
189            .map(|size| size.parse())
190            .transpose()
191            .map_err(|e| Error::Generic {
192                context: "GCS: failed to parse size from object metadata".to_string(),
193                cause: Some(Box::new(e)),
194            })?;
195        let time_created = self.time_created;
196
197        // At this point, all built-in metadata should have been removed from self.metadata.
198        let mut custom = BTreeMap::new();
199        for (key, value) in self.metadata {
200            if let GcsMetaKey::Custom(custom_key) = key {
201                custom.insert(custom_key, value);
202            } else {
203                return Err(Error::Generic {
204                    context: format!(
205                        "GCS: unexpected built-in metadata key in object metadata: {}",
206                        key
207                    ),
208                    cause: None,
209                });
210            }
211        }
212
213        Ok(Metadata {
214            content_type,
215            expiration_policy,
216            compression,
217            origin,
218            size,
219            custom,
220            time_created,
221            time_expires: self.custom_time,
222        })
223    }
224}
225
226/// Key for [`GcsObject::metadata`].
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Built-in metadata key for [`Metadata::expiration_policy`].
    ///
    /// Serialized with the [`BUILTIN_META_PREFIX`], i.e. `x-sn-expiration`.
    Expiration,
    /// Built-in metadata key for [`Metadata::origin`].
    ///
    /// Serialized with the [`BUILTIN_META_PREFIX`], i.e. `x-sn-origin`.
    Origin,
    /// Ignored metadata set by the GCS emulator.
    ///
    /// Parsed from `x_emulator_upload` / `x_testbench_upload`; never serialized back.
    EmulatorIgnored,
    /// User-defined custom metadata key.
    ///
    /// Serialized with the [`CUSTOM_META_PREFIX`], i.e. `x-snme-<key>`.
    Custom(String),
}
238
239impl std::str::FromStr for GcsMetaKey {
240    type Err = anyhow::Error;
241
242    fn from_str(s: &str) -> Result<Self, Self::Err> {
243        if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
244            return Ok(GcsMetaKey::EmulatorIgnored);
245        }
246
247        Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
248            Some("expiration") => GcsMetaKey::Expiration,
249            Some("origin") => GcsMetaKey::Origin,
250            Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
251            None => match s.strip_prefix(CUSTOM_META_PREFIX) {
252                Some(key) => GcsMetaKey::Custom(key.to_string()),
253                None => anyhow::bail!("invalid GCS metadata key format: {s}"),
254            },
255        })
256    }
257}
258
259impl fmt::Display for GcsMetaKey {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        match self {
262            Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
263            Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
264            Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
265            Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
266        }
267    }
268}
269
impl<'de> serde::Deserialize<'de> for GcsMetaKey {
    /// Deserializes a key by parsing the raw string via [`std::str::FromStr`].
    ///
    /// Deserializing into `Cow` lets borrowed input avoid an intermediate allocation.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s = Cow::<'de, str>::deserialize(deserializer)?;
        s.parse().map_err(serde::de::Error::custom)
    }
}
279
impl serde::Serialize for GcsMetaKey {
    /// Serializes the key via its [`fmt::Display`] representation (the prefixed form).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}
288
289/// Returns `true` if the error is a transient reqwest failure worth retrying.
290fn is_retryable(error: &Error) -> bool {
291    let Error::Reqwest { cause, .. } = error else {
292        return false;
293    };
294    if cause.is_timeout() || cause.is_connect() || cause.is_request() {
295        return true;
296    }
297    let Some(status) = cause.status() else {
298        return false;
299    };
300    // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
301    matches!(
302        status,
303        StatusCode::REQUEST_TIMEOUT
304            | StatusCode::TOO_MANY_REQUESTS
305            | StatusCode::INTERNAL_SERVER_ERROR
306            | StatusCode::BAD_GATEWAY
307            | StatusCode::SERVICE_UNAVAILABLE
308            | StatusCode::GATEWAY_TIMEOUT
309    )
310}
311
/// GCS JSON API backend for long-term storage of large objects.
pub struct GcsBackend {
    /// HTTP client shared by all requests.
    client: reqwest::Client,
    /// Base URL of the GCS JSON API (default endpoint or a custom/emulator endpoint).
    endpoint: Url,
    /// Name of the bucket all objects are stored in.
    bucket: String,
    /// Token provider for authentication; `None` when a custom endpoint is configured.
    token_provider: Option<PrefetchingTokenProvider>,
}
319
impl GcsBackend {
    /// Creates an authenticated GCS JSON API backend bound to the bucket in `config`.
    ///
    /// When a custom endpoint is configured (e.g. for an emulator), no token provider is
    /// created and requests are sent without authentication.
    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
        let GcsConfig { endpoint, bucket } = config;

        // Only fetch ADC tokens when talking to the real GCS endpoint.
        let token_provider = if endpoint.is_none() {
            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
        } else {
            None
        };

        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);

        Ok(Self {
            client: common::reqwest_client(),
            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
            bucket,
            token_provider,
        })
    }

    /// Formats the GCS object (metadata) URL for the given key.
    ///
    /// Shape: `<endpoint>/storage/v1/b/<bucket>/o/<object-path>`, with the object path
    /// appended as a single escaped path segment.
    fn object_url(&self, id: &ObjectId) -> Result<Url> {
        let mut url = self.endpoint.clone();

        let path = id.as_storage_path().to_string();
        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);

        Ok(url)
    }

    /// Formats the GCS upload URL for the given upload type.
    ///
    /// Shape: `<endpoint>/upload/storage/v1/b/<bucket>/o?uploadType=<type>&name=<object-path>`.
    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
        let mut url = self.endpoint.clone();

        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);

        url.query_pairs_mut()
            .append_pair("uploadType", upload_type)
            .append_pair("name", &id.as_storage_path().to_string());

        Ok(url)
    }

    /// Creates a request builder with the appropriate authentication.
    ///
    /// Attaches a bearer token only when a token provider exists (i.e. when talking to the
    /// real GCS endpoint rather than an emulator).
    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
        let mut builder = self.client.request(method, url);
        if let Some(provider) = &self.token_provider {
            let token = provider.token(TOKEN_SCOPES).await?;
            builder = builder.bearer_auth(token.as_str());
        }
        Ok(builder)
    }

    /// Retries a GCS request on transient errors.
    ///
    /// Runs `f` up to [`REQUEST_RETRY_COUNT`] additional times when [`is_retryable`]
    /// classifies the failure as transient; retries and terminal failures are counted
    /// as metrics under `action`.
    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
    where
        F: Future<Output = Result<T>> + Send,
    {
        let mut retry_count = 0usize;
        loop {
            match f().await {
                Ok(res) => return Ok(res),
                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
                    retry_count += 1;
                    objectstore_metrics::count!("gcs.retries", action = action);
                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
                }
                Err(e) => {
                    objectstore_metrics::count!("gcs.failures", action = action);
                    return Err(e);
                }
            }
        }
    }

    /// Fetches the GCS object metadata (without the payload), bumps TTI if
    /// needed, and returns the parsed [`Metadata`].
    ///
    /// Returns `Ok(None)` both when the object does not exist and when it exists but is
    /// already past its expiration deadline (actual deletion is left to the lifecycle rule).
    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
        let metadata_opt = self
            .with_retry("get_metadata", || async {
                let resp = self
                    .request(Method::GET, object_url.clone())
                    .await?
                    .send()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;

                // A missing object is not an error for callers; map it to `None`.
                if resp.status() == StatusCode::NOT_FOUND {
                    return Ok(None);
                }

                let metadata: GcsObject = resp
                    .error_for_status()
                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
                    .json()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;

                Ok(Some(metadata))
            })
            .await?;

        let Some(gcs_metadata) = metadata_opt else {
            objectstore_log::debug!("Object not found");
            return Ok(None);
        };

        let expire_at = gcs_metadata.custom_time;
        let metadata = gcs_metadata.into_metadata()?;

        // TODO: Inject the access time from the request.
        let access_time = SystemTime::now();

        // Filter already expired objects but leave them to garbage collection
        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
            objectstore_log::debug!("Object found but past expiry");
            return Ok(None);
        }

        // TODO: Schedule into background persistently so this doesn't get lost on restarts
        // Extend the deadline on access for TTI objects, but only once the stored deadline has
        // fallen more than `TTI_DEBOUNCE` behind the new one, to avoid a PATCH on every read.
        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
            let new_expire_at = access_time + tti;
            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
                self.update_custom_time(object_url.clone(), new_expire_at)
                    .await?;
            }
        }

        Ok(Some(metadata))
    }

    /// PATCHes the object's `customTime` field, which drives lifecycle-based expiration.
    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
        // Minimal PATCH body; only `customTime` is updated.
        #[derive(Debug, Serialize)]
        #[serde(rename_all = "camelCase")]
        struct CustomTimeRequest {
            #[serde(with = "humantime_serde")]
            custom_time: SystemTime,
        }

        self.with_retry("update_custom_time", || async {
            self.request(Method::PATCH, object_url.clone())
                .await?
                .json(&CustomTimeRequest { custom_time })
                .send()
                .await
                .and_then(|r| r.error_for_status())
                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
            Ok(())
        })
        .await
    }
}
489
490impl fmt::Debug for GcsBackend {
491    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492        f.debug_struct("GcsJsonApi")
493            .field("endpoint", &self.endpoint)
494            .field("bucket", &self.bucket)
495            .finish_non_exhaustive()
496    }
497}
498
#[async_trait::async_trait]
impl Backend for GcsBackend {
    fn name(&self) -> &'static str {
        "gcs"
    }

    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        objectstore_log::debug!("Writing to GCS backend");
        let gcs_metadata = GcsObject::from_metadata(metadata);

        // NB: Ensure the order of these fields and that a content-type is attached to them. Both
        // are required by the GCS API.
        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata for GCS upload".to_string(),
            cause,
        })?;

        let multipart = multipart::Form::new()
            .part(
                "metadata",
                multipart::Part::text(metadata_json)
                    .mime_str("application/json")
                    .expect("application/json is a valid mime type"),
            )
            .part(
                "media",
                multipart::Part::stream(Body::wrap_stream(stream))
                    .mime_str(&metadata.content_type)
                    .map_err(|e| Error::Generic {
                        context: format!("invalid mime type: {}", &metadata.content_type),
                        cause: Some(Box::new(e)),
                    })?,
            );

        // GCS requires a multipart/related request. Its body looks identical to
        // multipart/form-data, but the Content-Type header is different. Hence, we have to manually
        // set the header *after* writing the multipart form into the request.
        let content_type = format!("multipart/related; boundary={}", multipart.boundary());

        // NOTE(review): unlike the read/delete paths, uploads are not wrapped in `with_retry` —
        // presumably because the body stream can only be consumed once. Confirm before adding
        // retries here.
        self.request(Method::POST, self.upload_url(id, "multipart")?)
            .await?
            .multipart(multipart)
            .header(header::CONTENT_TYPE, content_type)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| match stream::unpack_client_error(&e) {
                Some(ce) => Error::Client(ce),
                _ => Error::reqwest("GCS: upload object", e),
            })?;

        Ok(())
    }

    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        objectstore_log::debug!("Reading from GCS backend");
        let object_url = self.object_url(id)?;

        // The metadata fetch also handles expiry filtering and TTI bumping.
        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
            return Ok(None);
        };

        // `alt=media` requests the raw payload instead of the JSON object resource.
        let mut download_url = object_url;
        download_url.query_pairs_mut().append_pair("alt", "media");

        let payload_response = self
            .with_retry("get_payload", || async {
                self.request(Method::GET, download_url.clone())
                    .await?
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .map_err(|e| Error::reqwest("GCS: get payload", e))
            })
            .await?;

        let stream = payload_response
            .bytes_stream()
            .map_err(io::Error::other)
            .boxed();

        Ok(Some((metadata, stream)))
    }

    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        objectstore_log::debug!("Reading metadata from GCS backend");
        let object_url = self.object_url(id)?;
        self.fetch_gcs_metadata(&object_url).await
    }

    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        objectstore_log::debug!("Deleting from GCS backend");
        let object_url = self.object_url(id)?;

        self.with_retry("delete", || async {
            let resp = self
                .request(Method::DELETE, object_url.clone())
                .await?
                .send()
                .await
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            // Do not error for objects that do not exist
            if resp.status() == StatusCode::NOT_FOUND {
                return Ok(());
            }

            resp.error_for_status()
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            Ok(())
        })
        .await
    }
}
623
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    // NB: To run any of these tests, you need to have a GCS emulator running. This is done
    // automatically in CI.
    //
    // Refer to the readme for how to set up the emulator.

    // Builds a backend pointing at the local GCS emulator. The custom endpoint disables
    // authentication.
    async fn create_test_backend() -> Result<GcsBackend> {
        GcsBackend::new(GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        })
        .await
    }

    // Creates a fresh random object id in the `testing` usecase so tests do not collide.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    #[tokio::test]
    async fn test_roundtrip() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::Manual,
            compression: None,
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            size: None,
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "hello, world");
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);
        assert!(metadata.time_created.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        // Deleting a missing object must succeed (idempotent delete).
        let id = make_id();
        backend.delete_object(&id).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_overwrite() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello"))
            .await?;

        let metadata = Metadata {
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("world"))
            .await?;

        // The second write must fully replace both payload and metadata.
        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "world");
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_read_after_delete() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        backend.delete_object(&id).await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_ttl_immediate() -> Result<()> {
        // NB: We create a TTL that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_tti_immediate() -> Result<()> {
        // NB: We create a TTI that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_returns_metadata() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let meta = backend.get_metadata(&id).await?.unwrap();
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_metadata(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_bumps_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // Manually set custom_time to just inside the bump window.
        // The bump condition is: expire_at < now + tti - TTI_DEBOUNCE.
        let object_url = backend.object_url(&id)?;
        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
        backend.update_custom_time(object_url, old_deadline).await?;

        // First get_metadata sees the old timestamp and triggers a TTI bump.
        let pre_meta = backend.get_metadata(&id).await?.unwrap();
        let pre_expiry = pre_meta.time_expires.unwrap();

        // Second get_metadata sees the bumped timestamp.
        let post_meta = backend.get_metadata(&id).await?.unwrap();
        let post_expiry = post_meta.time_expires.unwrap();
        assert!(
            post_expiry > pre_expiry,
            "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
        );

        // Verify the payload is still intact after the bump.
        let (_, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(&payload, b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // A freshly written object has time_expires ≈ now + 2d, which is well outside
        // the bump window (now + 2d - 1d = now + 1d). No bump should occur.
        let first = backend.get_metadata(&id).await?.unwrap();
        let first_expiry = first.time_expires.unwrap();

        let second = backend.get_metadata(&id).await?.unwrap();
        let second_expiry = second.time_expires.unwrap();

        assert_eq!(
            first_expiry, second_expiry,
            "Fresh TTI object should not have its expiry bumped"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single(compressed.clone()))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }
}