Skip to main content

objectstore_service/backend/
gcs.rs

1//! Google Cloud Storage backend for long-term storage of large objects.
2
3use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8use std::{fmt, io};
9
10use anyhow::Context;
11use futures_util::{StreamExt, TryStreamExt};
12use gcp_auth::TokenProvider;
13use objectstore_types::metadata::{ExpirationPolicy, Metadata};
14use reqwest::header::HeaderName;
15use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
16use serde::{Deserialize, Serialize};
17
18use crate::backend::common::{
19    self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
20    PutResponse,
21};
22use crate::error::{Error, Result};
23use crate::gcp_auth::PrefetchingTokenProvider;
24use crate::id::ObjectId;
25use crate::multipart::{
26    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
27    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
28};
29use crate::stream::{self, ClientStream};
30
/// Configuration for [`GcsBackend`].
///
/// Stores objects in [Google Cloud Storage]. Authentication uses Application Default Credentials
/// (ADC), which can be provided via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable or
/// the GCE/GKE metadata service. Credentials are only used with the default endpoint; see
/// [`GcsConfig::endpoint`].
///
/// **Note**: The bucket must be pre-created with the following lifecycle policy:
/// - `daysSinceCustomTime`: 1 day
/// - `action`: delete
///
/// For time-based expiration policies the backend stores the expiry in the object's
/// `customTime`, so this rule is what eventually deletes expired objects from the bucket. Objects
/// without a `customTime` are skipped by the rule.
///
/// [Google Cloud Storage]: https://cloud.google.com/storage
///
/// # Example
///
/// ```yaml
/// storage:
///   type: gcs
///   bucket: objectstore-bucket
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional custom GCS endpoint URL.
    ///
    /// Useful for testing with emulators. If `None`, uses the default GCS endpoint
    /// (`https://storage.googleapis.com`). When a custom endpoint is set, no access token is
    /// obtained and requests are sent without authentication.
    ///
    /// # Default
    ///
    /// `None` (uses default GCS endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=gcs`
    /// - `OS__STORAGE__ENDPOINT=http://localhost:9000` (optional)
    pub endpoint: Option<String>,

    /// GCS bucket name.
    ///
    /// The bucket must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__BUCKET=my-gcs-bucket`
    pub bucket: String,
}
75
/// Base URL of the GCS APIs, used when no custom endpoint is configured.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
/// OAuth scopes requested for access tokens (read/write access to GCS objects).
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// How far a TTI object's stored expiry may lag behind before an access bumps it (one day).
const TTI_DEBOUNCE: Duration = Duration::from_secs(60 * 60 * 24);
/// Number of additional attempts made after a retryable failure.
const REQUEST_RETRY_COUNT: usize = 2;

/// Key prefix for our built-in metadata inside the GCS `metadata` map.
const BUILTIN_META_PREFIX: &str = "x-sn-";
/// Key prefix for user-provided custom metadata inside the GCS `metadata` map.
const CUSTOM_META_PREFIX: &str = "x-snme-";
89
90/// GCS object resource.
91///
92/// This is the representation of the object resource in GCS JSON API without its payload. Where no
93/// dedicated fields are available, we encode both built-in and custom metadata in the `metadata`
94/// field.
95#[derive(Debug, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97struct GcsObject {
98    /// Content-Type of the object data. If an object is stored without a Content-Type, it is served
99    /// as application/octet-stream.
100    pub content_type: Cow<'static, str>,
101
102    /// Content encoding, used to store [`Metadata::compression`].
103    #[serde(default, skip_serializing_if = "Option::is_none")]
104    pub content_encoding: Option<String>,
105
106    /// Custom time stamp used for time-based expiration.
107    #[serde(
108        default,
109        skip_serializing_if = "Option::is_none",
110        with = "humantime_serde"
111    )]
112    pub custom_time: Option<SystemTime>,
113
114    /// The `Content-Length` of the data in bytes. GCS returns this as a string.
115    ///
116    /// GCS sets this in metadata responses. We can use it to know the size of an object
117    /// without having to stream it.
118    pub size: Option<String>,
119
120    /// Timestamp of when this object was created.
121    #[serde(
122        default,
123        skip_serializing_if = "Option::is_none",
124        with = "humantime_serde"
125    )]
126    pub time_created: Option<SystemTime>,
127
128    /// User-provided metadata, including our built-in metadata.
129    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
130    pub metadata: BTreeMap<GcsMetaKey, String>,
131}
132
133impl GcsObject {
134    /// Converts our Metadata type to GCS JSON object metadata.
135    pub fn from_metadata(metadata: &Metadata) -> Self {
136        let mut gcs_object = GcsObject {
137            content_type: metadata.content_type.clone(),
138            size: metadata.size.map(|size| size.to_string()),
139            content_encoding: None,
140            custom_time: None,
141            time_created: metadata.time_created,
142            metadata: BTreeMap::new(),
143        };
144
145        // For time-based expiration, set the `customTime` field. The bucket must have a
146        // `daysSinceCustomTime` lifecycle rule configured to delete objects with this field set.
147        // This rule automatically skips objects without `customTime` set.
148        if let Some(expires_in) = metadata.expiration_policy.expires_in() {
149            gcs_object.custom_time = Some(SystemTime::now() + expires_in);
150        }
151
152        if let Some(compression) = metadata.compression {
153            gcs_object.content_encoding = Some(compression.to_string());
154        }
155
156        if metadata.expiration_policy != ExpirationPolicy::default() {
157            gcs_object.metadata.insert(
158                GcsMetaKey::Expiration,
159                metadata.expiration_policy.to_string(),
160            );
161        }
162
163        if let Some(origin) = &metadata.origin {
164            gcs_object
165                .metadata
166                .insert(GcsMetaKey::Origin, origin.clone());
167        }
168
169        for (key, value) in &metadata.custom {
170            gcs_object
171                .metadata
172                .insert(GcsMetaKey::Custom(key.clone()), value.clone());
173        }
174
175        gcs_object
176    }
177
178    /// Converts GCS JSON object metadata to our Metadata type.
179    pub fn into_metadata(mut self) -> Result<Metadata> {
180        // Remove ignored metadata keys that are set by the GCS emulator.
181        self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
182
183        let expiration_policy = self
184            .metadata
185            .remove(&GcsMetaKey::Expiration)
186            .map(|s| s.parse())
187            .transpose()?
188            .unwrap_or_default();
189
190        let origin = self.metadata.remove(&GcsMetaKey::Origin);
191
192        let content_type = self.content_type;
193        let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
194        let size = self
195            .size
196            .map(|size| size.parse())
197            .transpose()
198            .map_err(|e| Error::Generic {
199                context: "GCS: failed to parse size from object metadata".to_string(),
200                cause: Some(Box::new(e)),
201            })?;
202        let time_created = self.time_created;
203
204        // At this point, all built-in metadata should have been removed from self.metadata.
205        let mut custom = BTreeMap::new();
206        for (key, value) in self.metadata {
207            if let GcsMetaKey::Custom(custom_key) = key {
208                custom.insert(custom_key, value);
209            } else {
210                return Err(Error::Generic {
211                    context: format!(
212                        "GCS: unexpected built-in metadata key in object metadata: {}",
213                        key
214                    ),
215                    cause: None,
216                });
217            }
218        }
219
220        Ok(Metadata {
221            content_type,
222            expiration_policy,
223            compression,
224            origin,
225            size,
226            custom,
227            time_created,
228            time_expires: self.custom_time,
229        })
230    }
231}
232
/// Key for [`GcsObject::metadata`].
///
/// Keys are (de)serialized through their string form (see the `FromStr` and `Display` impls):
/// built-in keys carry the `x-sn-` prefix and custom keys the `x-snme-` prefix.
///
/// NB: The variant order defines the derived `Ord`. That in turn fixes the order in which
/// entries of the `BTreeMap` in [`GcsObject::metadata`] are iterated and serialized.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Built-in metadata key for [`Metadata::expiration_policy`] (`x-sn-expiration`).
    Expiration,
    /// Built-in metadata key for [`Metadata::origin`] (`x-sn-origin`).
    Origin,
    /// Ignored metadata set by the GCS emulator (`x_emulator_upload` or `x_testbench_upload`).
    ///
    /// This key is only ever parsed; formatting it panics.
    EmulatorIgnored,
    /// User-defined custom metadata key, stored here without the `x-snme-` prefix.
    Custom(String),
}
245
246impl std::str::FromStr for GcsMetaKey {
247    type Err = anyhow::Error;
248
249    fn from_str(s: &str) -> Result<Self, Self::Err> {
250        if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
251            return Ok(GcsMetaKey::EmulatorIgnored);
252        }
253
254        Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
255            Some("expiration") => GcsMetaKey::Expiration,
256            Some("origin") => GcsMetaKey::Origin,
257            Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
258            None => match s.strip_prefix(CUSTOM_META_PREFIX) {
259                Some(key) => GcsMetaKey::Custom(key.to_string()),
260                None => anyhow::bail!("invalid GCS metadata key format: {s}"),
261            },
262        })
263    }
264}
265
266impl fmt::Display for GcsMetaKey {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        match self {
269            Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
270            Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
271            Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
272            Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
273        }
274    }
275}
276
277impl<'de> serde::Deserialize<'de> for GcsMetaKey {
278    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
279    where
280        D: serde::Deserializer<'de>,
281    {
282        let s = Cow::<'de, str>::deserialize(deserializer)?;
283        s.parse().map_err(serde::de::Error::custom)
284    }
285}
286
287impl serde::Serialize for GcsMetaKey {
288    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
289    where
290        S: serde::Serializer,
291    {
292        serializer.collect_str(self)
293    }
294}
295
296/// Builds HTTP headers that encode `metadata` for a GCS XML API request.
297fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
298    let mut headers = header::HeaderMap::new();
299
300    if let Some(expires_in) = metadata.expiration_policy.expires_in() {
301        let custom_time = SystemTime::now() + expires_in;
302        let formatted = humantime::format_rfc3339_seconds(custom_time);
303        headers.insert(
304            HeaderName::from_static("x-goog-custom-time"),
305            formatted.to_string().parse().map_err(|e| Error::Generic {
306                context: "GCS: invalid custom-time header value".into(),
307                cause: Some(Box::new(e)),
308            })?,
309        );
310    }
311
312    if let Some(compression) = metadata.compression {
313        headers.insert(
314            header::CONTENT_ENCODING,
315            compression
316                .to_string()
317                .parse()
318                .map_err(|e| Error::Generic {
319                    context: "GCS: invalid content-encoding header value".into(),
320                    cause: Some(Box::new(e)),
321                })?,
322        );
323    }
324
325    if metadata.expiration_policy != ExpirationPolicy::default() {
326        insert_gcs_meta_header(
327            &mut headers,
328            &GcsMetaKey::Expiration,
329            &metadata.expiration_policy.to_string(),
330        )?;
331    }
332
333    if let Some(origin) = &metadata.origin {
334        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
335    }
336
337    for (key, value) in &metadata.custom {
338        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
339    }
340
341    Ok(headers)
342}
343
344fn insert_gcs_meta_header(
345    headers: &mut header::HeaderMap,
346    key: &GcsMetaKey,
347    value: &str,
348) -> Result<()> {
349    let header_name = format!("x-goog-meta-{key}");
350    headers.insert(
351        HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
352            context: format!("GCS: invalid header name: {header_name}"),
353            cause: Some(Box::new(e)),
354        })?,
355        value.parse().map_err(|e| Error::Generic {
356            context: format!("GCS: invalid header value for {header_name}"),
357            cause: Some(Box::new(e)),
358        })?,
359    );
360    Ok(())
361}
362
363/// Returns `true` if the error is a transient reqwest failure worth retrying.
364fn is_retryable(error: &Error) -> bool {
365    let Error::Reqwest { cause, .. } = error else {
366        return false;
367    };
368    if cause.is_timeout() || cause.is_connect() || cause.is_request() {
369        return true;
370    }
371    let Some(status) = cause.status() else {
372        return false;
373    };
374    // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
375    matches!(
376        status,
377        StatusCode::REQUEST_TIMEOUT
378            | StatusCode::TOO_MANY_REQUESTS
379            | StatusCode::INTERNAL_SERVER_ERROR
380            | StatusCode::BAD_GATEWAY
381            | StatusCode::SERVICE_UNAVAILABLE
382            | StatusCode::GATEWAY_TIMEOUT
383    )
384}
385
/// GCS backend for long-term storage of large objects.
///
/// Regular object operations go through the GCS JSON API; multipart uploads use the
/// S3-compatible XML API.
pub struct GcsBackend {
    /// HTTP client used for all requests.
    client: reqwest::Client,
    /// Base URL of the GCS APIs: the configured endpoint, or `https://storage.googleapis.com`.
    endpoint: Url,
    /// Name of the bucket that all objects are stored in.
    bucket: String,
    /// Source of bearer tokens. `None` when a custom endpoint is configured, in which case
    /// requests are sent unauthenticated.
    token_provider: Option<PrefetchingTokenProvider>,
}
393
394impl GcsBackend {
395    /// Creates an authenticated GCS JSON API backend bound to the bucket in `config`.
396    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
397        let GcsConfig { endpoint, bucket } = config;
398
399        let token_provider = if endpoint.is_none() {
400            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
401        } else {
402            None
403        };
404
405        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);
406
407        Ok(Self {
408            client: common::reqwest_client(),
409            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
410            bucket,
411            token_provider,
412        })
413    }
414
415    /// Formats the GCS object (metadata) URL for the given key.
416    fn object_url(&self, id: &ObjectId) -> Result<Url> {
417        let mut url = self.endpoint.clone();
418
419        let path = id.as_storage_path().to_string();
420        url.path_segments_mut()
421            .map_err(|()| Error::Generic {
422                context: format!(
423                    "GCS: invalid endpoint URL, {} cannot be a base",
424                    self.endpoint
425                ),
426                cause: None,
427            })?
428            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
429
430        Ok(url)
431    }
432
433    /// Formats the GCS upload URL for the given upload type.
434    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
435        let mut url = self.endpoint.clone();
436
437        url.path_segments_mut()
438            .map_err(|()| Error::Generic {
439                context: format!(
440                    "GCS: invalid endpoint URL, {} cannot be a base",
441                    self.endpoint
442                ),
443                cause: None,
444            })?
445            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
446
447        url.query_pairs_mut()
448            .append_pair("uploadType", upload_type)
449            .append_pair("name", &id.as_storage_path().to_string());
450
451        Ok(url)
452    }
453
454    /// Formats a GCS XML API URL for the given object.
455    ///
456    /// Unlike [`object_url`](Self::object_url) (JSON API at
457    /// `/storage/v1/b/{bucket}/o/{name}`), this produces
458    /// `/{bucket}/{path_segments}` for the S3-compatible XML API used by
459    /// multipart uploads.
460    fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
461        let mut url = self.endpoint.clone();
462        {
463            let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
464                context: format!(
465                    "GCS: invalid endpoint URL, {} cannot be a base",
466                    self.endpoint
467                ),
468                cause: None,
469            })?;
470            segments.push(&self.bucket);
471            for part in id.as_storage_path().to_string().split('/') {
472                segments.push(part);
473            }
474        }
475        Ok(url)
476    }
477
478    /// Creates a request builder with the appropriate authentication.
479    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
480        let mut builder = self.client.request(method, url);
481        if let Some(provider) = &self.token_provider {
482            let token = provider.token(TOKEN_SCOPES).await?;
483            builder = builder.bearer_auth(token.as_str());
484        }
485        Ok(builder)
486    }
487
488    /// Retries a GCS request on transient errors.
489    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
490    where
491        F: Future<Output = Result<T>> + Send,
492    {
493        let mut retry_count = 0usize;
494        loop {
495            match f().await {
496                Ok(res) => return Ok(res),
497                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
498                    retry_count += 1;
499                    objectstore_metrics::count!("gcs.retries", action = action);
500                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
501                }
502                Err(e) => {
503                    objectstore_metrics::count!("gcs.failures", action = action);
504                    return Err(e);
505                }
506            }
507        }
508    }
509
510    /// Fetches the GCS object metadata (without the payload), bumps TTI if
511    /// needed, and returns the parsed [`Metadata`].
512    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
513        let metadata_opt = self
514            .with_retry("get_metadata", || async {
515                let resp = self
516                    .request(Method::GET, object_url.clone())
517                    .await?
518                    .send()
519                    .await
520                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;
521
522                if resp.status() == StatusCode::NOT_FOUND {
523                    return Ok(None);
524                }
525
526                let metadata: GcsObject = resp
527                    .error_for_status()
528                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
529                    .json()
530                    .await
531                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
532
533                Ok(Some(metadata))
534            })
535            .await?;
536
537        let Some(gcs_metadata) = metadata_opt else {
538            objectstore_log::debug!("Object not found");
539            return Ok(None);
540        };
541
542        let expire_at = gcs_metadata.custom_time;
543        let metadata = gcs_metadata.into_metadata()?;
544
545        // TODO: Inject the access time from the request.
546        let access_time = SystemTime::now();
547
548        // Filter already expired objects but leave them to garbage collection
549        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
550            objectstore_log::debug!("Object found but past expiry");
551            return Ok(None);
552        }
553
554        // TODO: Schedule into background persistently so this doesn't get lost on restarts
555        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
556            let new_expire_at = access_time + tti;
557            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
558                self.update_custom_time(object_url.clone(), new_expire_at)
559                    .await?;
560            }
561        }
562
563        Ok(Some(metadata))
564    }
565
566    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
567        #[derive(Debug, Serialize)]
568        #[serde(rename_all = "camelCase")]
569        struct CustomTimeRequest {
570            #[serde(with = "humantime_serde")]
571            custom_time: SystemTime,
572        }
573
574        self.with_retry("update_custom_time", || async {
575            self.request(Method::PATCH, object_url.clone())
576                .await?
577                .json(&CustomTimeRequest { custom_time })
578                .send()
579                .await
580                .and_then(|r| r.error_for_status())
581                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
582            Ok(())
583        })
584        .await
585    }
586}
587
588impl fmt::Debug for GcsBackend {
589    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
590        f.debug_struct("GcsJsonApi")
591            .field("endpoint", &self.endpoint)
592            .field("bucket", &self.bucket)
593            .finish_non_exhaustive()
594    }
595}
596
597#[async_trait::async_trait]
598impl Backend for GcsBackend {
599    fn name(&self) -> &'static str {
600        "gcs"
601    }
602
603    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
604        Ok(self)
605    }
606
607    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
608    async fn put_object(
609        &self,
610        id: &ObjectId,
611        metadata: &Metadata,
612        stream: ClientStream,
613    ) -> Result<PutResponse> {
614        objectstore_log::debug!("Writing to GCS backend");
615        let gcs_metadata = GcsObject::from_metadata(metadata);
616
617        // NB: Ensure the order of these fields and that a content-type is attached to them. Both
618        // are required by the GCS API.
619        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
620            context: "failed to serialize metadata for GCS upload".to_string(),
621            cause,
622        })?;
623
624        let multipart = multipart::Form::new()
625            .part(
626                "metadata",
627                multipart::Part::text(metadata_json)
628                    .mime_str("application/json")
629                    .expect("application/json is a valid mime type"),
630            )
631            .part(
632                "media",
633                multipart::Part::stream(Body::wrap_stream(stream))
634                    .mime_str(&metadata.content_type)
635                    .map_err(|e| Error::Generic {
636                        context: format!("invalid mime type: {}", &metadata.content_type),
637                        cause: Some(Box::new(e)),
638                    })?,
639            );
640
641        // GCS requires a multipart/related request. Its body looks identical to
642        // multipart/form-data, but the Content-Type header is different. Hence, we have to manually
643        // set the header *after* writing the multipart form into the request.
644        let content_type = format!("multipart/related; boundary={}", multipart.boundary());
645
646        self.request(Method::POST, self.upload_url(id, "multipart")?)
647            .await?
648            .multipart(multipart)
649            .header(header::CONTENT_TYPE, content_type)
650            .send()
651            .await
652            .and_then(|r| r.error_for_status())
653            .map_err(|e| match stream::unpack_client_error(&e) {
654                Some(ce) => Error::Client(ce),
655                _ => Error::reqwest("GCS: upload object", e),
656            })?;
657
658        Ok(())
659    }
660
661    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
662    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
663        objectstore_log::debug!("Reading from GCS backend");
664        let object_url = self.object_url(id)?;
665
666        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
667            return Ok(None);
668        };
669
670        let mut download_url = object_url;
671        download_url.query_pairs_mut().append_pair("alt", "media");
672
673        let payload_response = self
674            .with_retry("get_payload", || async {
675                self.request(Method::GET, download_url.clone())
676                    .await?
677                    .send()
678                    .await
679                    .and_then(|r| r.error_for_status())
680                    .map_err(|e| Error::reqwest("GCS: get payload", e))
681            })
682            .await?;
683
684        let stream = payload_response
685            .bytes_stream()
686            .map_err(io::Error::other)
687            .boxed();
688
689        Ok(Some((metadata, stream)))
690    }
691
692    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
693    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
694        objectstore_log::debug!("Reading metadata from GCS backend");
695        let object_url = self.object_url(id)?;
696        self.fetch_gcs_metadata(&object_url).await
697    }
698
699    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
700    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
701        objectstore_log::debug!("Deleting from GCS backend");
702        let object_url = self.object_url(id)?;
703
704        self.with_retry("delete", || async {
705            let resp = self
706                .request(Method::DELETE, object_url.clone())
707                .await?
708                .send()
709                .await
710                .map_err(|e| Error::reqwest("GCS: delete object", e))?;
711
712            // Do not error for objects that do not exist
713            if resp.status() == StatusCode::NOT_FOUND {
714                return Ok(());
715            }
716
717            resp.error_for_status()
718                .map_err(|e| Error::reqwest("GCS: delete object", e))?;
719
720            Ok(())
721        })
722        .await
723    }
724}
725
726#[derive(Debug, Deserialize)]
727#[serde(rename_all = "PascalCase")]
728struct XmlInitiateMultipartUploadResponse {
729    upload_id: String,
730}
731
732impl TryFrom<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
733    type Error = crate::error::Error;
734
735    fn try_from(r: XmlInitiateMultipartUploadResponse) -> crate::error::Result<Self> {
736        Ok(UploadId::new(r.upload_id)?)
737    }
738}
739
740#[derive(Debug, Deserialize)]
741#[serde(rename_all = "PascalCase")]
742struct XmlListPartsResponse {
743    #[serde(default)]
744    is_truncated: bool,
745    next_part_number_marker: Option<PartNumber>,
746    #[serde(default, rename = "Part")]
747    parts: Vec<XmlPart>,
748}
749
750impl From<XmlListPartsResponse> for ListPartsResponse {
751    fn from(xml: XmlListPartsResponse) -> Self {
752        Self {
753            parts: xml.parts.into_iter().map(Into::into).collect(),
754            is_truncated: xml.is_truncated,
755            next_part_number_marker: xml.next_part_number_marker,
756        }
757    }
758}
759
760#[derive(Debug, Deserialize)]
761#[serde(rename_all = "PascalCase")]
762struct XmlPart {
763    part_number: PartNumber,
764    #[serde(rename = "ETag")]
765    e_tag: String,
766    #[serde(with = "humantime_serde")]
767    last_modified: SystemTime,
768    size: u64,
769}
770
771impl From<XmlPart> for crate::multipart::Part {
772    fn from(p: XmlPart) -> Self {
773        Self {
774            part_number: p.part_number,
775            etag: p.e_tag,
776            last_modified: p.last_modified,
777            size: p.size,
778        }
779    }
780}
781
782#[derive(Debug, Serialize)]
783#[serde(rename = "CompleteMultipartUpload")]
784struct XmlCompleteMultipartUpload {
785    #[serde(rename = "Part")]
786    parts: Vec<XmlCompletePart>,
787}
788
789impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
790    fn from(parts: Vec<CompletedPart>) -> Self {
791        Self {
792            parts: parts.into_iter().map(Into::into).collect(),
793        }
794    }
795}
796
797#[derive(Debug, Serialize)]
798#[serde(rename_all = "PascalCase")]
799struct XmlCompletePart {
800    part_number: PartNumber,
801    #[serde(rename = "ETag")]
802    e_tag: String,
803}
804
805impl From<CompletedPart> for XmlCompletePart {
806    fn from(p: CompletedPart) -> Self {
807        Self {
808            part_number: p.part_number,
809            e_tag: p.etag,
810        }
811    }
812}
813
814#[derive(Debug, Deserialize)]
815#[serde(rename = "Error", rename_all = "PascalCase")]
816struct XmlError {
817    code: String,
818    message: String,
819}
820
821impl From<XmlError> for crate::multipart::CompleteMultipartError {
822    fn from(e: XmlError) -> Self {
823        Self {
824            code: e.code,
825            message: e.message,
826        }
827    }
828}
829
830/// XXX: Any change that affects this implementation should be manually tested against real GCS.
831/// That's because the fork of [storage-testbench](https://github.com/googleapis/storage-testbench)
832/// that we test against has an incomplete implementation of the XML multipart API that likely doesn't match GCS's behavior in many cases.
833#[async_trait::async_trait]
834impl MultipartUploadBackend for GcsBackend {
835    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
836    async fn initiate_multipart(
837        &self,
838        id: &ObjectId,
839        metadata: &Metadata,
840    ) -> Result<InitiateMultipartResponse> {
841        objectstore_log::debug!("Initiating multipart upload on GCS backend");
842        let mut url = self.xml_object_url(id)?;
843        url.set_query(Some("uploads"));
844
845        let mut builder = self
846            .request(Method::POST, url)
847            .await?
848            .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
849            .header(header::CONTENT_LENGTH, "0");
850
851        let meta_headers = metadata_to_gcs_headers(metadata)?;
852        for (name, value) in &meta_headers {
853            builder = builder.header(name, value);
854        }
855
856        let resp = builder
857            .send()
858            .await
859            .and_then(|r| r.error_for_status())
860            .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;
861
862        let body = resp
863            .bytes()
864            .await
865            .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;
866
867        let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
868            .map_err(|e| Error::Generic {
869                context: "GCS: failed to parse initiate multipart response".to_owned(),
870                cause: Some(Box::new(e)),
871            })?;
872
873        Ok(xml.try_into()?)
874    }
875
876    #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
877    async fn upload_part(
878        &self,
879        id: &ObjectId,
880        upload_id: &UploadId,
881        part_number: PartNumber,
882        content_length: u64,
883        content_md5: Option<&str>,
884        body: ClientStream,
885    ) -> Result<UploadPartResponse> {
886        objectstore_log::debug!("Uploading part to GCS backend");
887        let mut url = self.xml_object_url(id)?;
888        url.query_pairs_mut()
889            .append_pair("partNumber", &part_number.to_string())
890            .append_pair("uploadId", upload_id);
891
892        let mut builder = self
893            .request(Method::PUT, url)
894            .await?
895            .header(header::CONTENT_LENGTH, content_length)
896            .body(Body::wrap_stream(body));
897
898        if let Some(md5) = content_md5 {
899            builder = builder.header("content-md5", md5);
900        }
901
902        let resp = builder
903            .send()
904            .await
905            .and_then(|r| r.error_for_status())
906            .map_err(|e| Error::reqwest("GCS: upload part", e))?;
907
908        let etag = resp
909            .headers()
910            .get(header::ETAG)
911            .and_then(|v| v.to_str().ok())
912            .map(|s| s.to_owned())
913            .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;
914
915        Ok(etag)
916    }
917
918    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
919    async fn list_parts(
920        &self,
921        id: &ObjectId,
922        upload_id: &UploadId,
923        max_parts: Option<u32>,
924        part_number_marker: Option<PartNumber>,
925    ) -> Result<ListPartsResponse> {
926        objectstore_log::debug!("Listing parts on GCS backend");
927        let mut url = self.xml_object_url(id)?;
928        {
929            let mut pairs = url.query_pairs_mut();
930            pairs.append_pair("uploadId", upload_id);
931            if let Some(max) = max_parts {
932                pairs.append_pair("max-parts", &max.to_string());
933            }
934            if let Some(marker) = part_number_marker {
935                pairs.append_pair("part-number-marker", &marker.to_string());
936            }
937        }
938
939        let resp = self
940            .request(Method::GET, url)
941            .await?
942            .send()
943            .await
944            .and_then(|r| r.error_for_status())
945            .map_err(|e| Error::reqwest("GCS: list parts", e))?;
946
947        let body = resp
948            .bytes()
949            .await
950            .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;
951
952        let xml: XmlListPartsResponse =
953            quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
954                context: "GCS: failed to parse list parts response".to_owned(),
955                cause: Some(Box::new(e)),
956            })?;
957
958        Ok(xml.into())
959    }
960
961    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
962    async fn abort_multipart(
963        &self,
964        id: &ObjectId,
965        upload_id: &UploadId,
966    ) -> Result<AbortMultipartResponse> {
967        objectstore_log::debug!("Aborting multipart upload on GCS backend");
968        let mut url = self.xml_object_url(id)?;
969        url.query_pairs_mut().append_pair("uploadId", upload_id);
970
971        self.request(Method::DELETE, url)
972            .await?
973            .send()
974            .await
975            .and_then(|r| r.error_for_status())
976            .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;
977
978        Ok(())
979    }
980
981    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
982    async fn complete_multipart(
983        &self,
984        id: &ObjectId,
985        upload_id: &UploadId,
986        parts: Vec<CompletedPart>,
987    ) -> Result<CompleteMultipartResponse> {
988        objectstore_log::debug!("Completing multipart upload on GCS backend");
989        let mut url = self.xml_object_url(id)?;
990        url.query_pairs_mut().append_pair("uploadId", upload_id);
991
992        let body = XmlCompleteMultipartUpload::from(parts);
993        let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
994            context: "GCS: failed to serialize complete multipart request".into(),
995            cause: Some(Box::new(e)),
996        })?;
997
998        let resp = self
999            .request(Method::POST, url)
1000            .await?
1001            .header(header::CONTENT_TYPE, "application/xml")
1002            .body(xml)
1003            .send()
1004            .await
1005            .and_then(|r| r.error_for_status())
1006            .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;
1007
1008        let body = resp
1009            .bytes()
1010            .await
1011            .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;
1012
1013        let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
1014            .ok()
1015            .map(Into::into);
1016
1017        Ok(error)
1018    }
1019}
1020
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::num::NonZeroU32;

    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::multipart::CompletedPart;
    use crate::stream;

    // NB: To run any of these tests, you need to have a GCS emulator running. This is done
    // automatically in CI.
    //
    // Refer to the readme for how to set up the emulator.

    /// Creates a backend that talks to the local emulator at `localhost:8087`, bucket `test-bucket`.
    async fn create_test_backend() -> Result<GcsBackend> {
        GcsBackend::new(GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        })
        .await
    }

    /// Returns a fresh random object ID in the `testing` usecase and scope.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    #[tokio::test]
    async fn test_roundtrip() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::Manual,
            compression: None,
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            size: None,
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "hello, world");
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);
        // Check the metadata read back from the backend, not the input we constructed above.
        assert!(meta.time_created.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        backend.delete_object(&id).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_overwrite() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello"))
            .await?;

        let metadata = Metadata {
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "world");
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_read_after_delete() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        backend.delete_object(&id).await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_ttl_immediate() -> Result<()> {
        // NB: We create a TTL that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_tti_immediate() -> Result<()> {
        // NB: We create a TTI that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_returns_metadata() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let meta = backend.get_metadata(&id).await?.unwrap();
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_metadata(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_bumps_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // Manually set custom_time to just inside the bump window.
        // The bump condition is: expire_at < now + tti - TTI_DEBOUNCE.
        let object_url = backend.object_url(&id)?;
        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
        backend.update_custom_time(object_url, old_deadline).await?;

        // First get_metadata sees the old timestamp and triggers a TTI bump.
        let pre_meta = backend.get_metadata(&id).await?.unwrap();
        let pre_expiry = pre_meta.time_expires.unwrap();

        // Second get_metadata sees the bumped timestamp.
        let post_meta = backend.get_metadata(&id).await?.unwrap();
        let post_expiry = post_meta.time_expires.unwrap();
        assert!(
            post_expiry > pre_expiry,
            "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
        );

        // Verify the payload is still intact after the bump.
        let (_, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(&payload, b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // A freshly written object has time_expires ≈ now + 2d, which is well outside
        // the bump window (now + 2d - 1d = now + 1d). No bump should occur.
        let first = backend.get_metadata(&id).await?.unwrap();
        let first_expiry = first.time_expires.unwrap();

        let second = backend.get_metadata(&id).await?.unwrap();
        let second_expiry = second.time_expires.unwrap();

        assert_eq!(
            first_expiry, second_expiry,
            "Fresh TTI object should not have its expiry bumped"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single(compressed.clone()))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_single_part() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        let data = b"hello, multipart world!";
        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await?;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(payload, data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToLive(Duration::from_mins(33))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(
            meta.custom,
            BTreeMap::from_iter([("hello".into(), "world".into())])
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_multiple_parts() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        // Non-final parts must be >= 5 MiB.
        const MIN_PART: usize = 5 * 1024 * 1024;
        let part1 = vec![b'a'; MIN_PART];
        let part2 = vec![b'b'; MIN_PART];
        let part3 = b"cccc".to_vec();

        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await?;
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(2).unwrap(),
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await?;
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(3).unwrap(),
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await?;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: NonZeroU32::new(1).unwrap(),
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(2).unwrap(),
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(3).unwrap(),
                        etag: etag3,
                    },
                ],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        // Object exists after complete
        let (_meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        let mut expected = Vec::new();
        expected.extend_from_slice(&part1);
        expected.extend_from_slice(&part2);
        expected.extend_from_slice(&part3);
        assert_eq!(payload, expected);

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_out_of_order_upload() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        // Non-final parts must be >= 5 MiB.
        const MIN_PART: usize = 5 * 1024 * 1024;
        let part1 = vec![b'a'; MIN_PART];
        let part2 = vec![b'b'; MIN_PART];
        let part3 = b"cccc".to_vec();

        // Upload parts out of order: 2, 3, 1.
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(2).unwrap(),
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await?;
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(3).unwrap(),
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await?;
        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await?;

        // Complete with parts listed in order.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: NonZeroU32::new(1).unwrap(),
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(2).unwrap(),
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(3).unwrap(),
                        etag: etag3,
                    },
                ],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        // Verify reassembly order matches part numbers, not upload order.
        let (_meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        let mut expected = Vec::new();
        expected.extend_from_slice(&part1);
        expected.extend_from_slice(&part2);
        expected.extend_from_slice(&part3);
        assert_eq!(payload, expected);

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_list_parts() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                3,
                None,
                stream::single(b"aaa".to_vec()),
            )
            .await?;
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(2).unwrap(),
                3,
                None,
                stream::single(b"bbb".to_vec()),
            )
            .await?;

        // List all parts.
        let list = backend.list_parts(&id, &upload_id, None, None).await?;
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number.get(), 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number.get(), 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // List with max_parts=1 to test pagination.
        let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number.get(), 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await?;
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number.get(), 2);

        // Clean up.
        backend.abort_multipart(&id, &upload_id).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_abort() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        backend
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await?;

        backend.abort_multipart(&id, &upload_id).await?;

        // Object should not exist after abort.
        let result = backend.get_object(&id).await?;
        assert!(result.is_none(), "object should not exist after abort");

        Ok(())
    }

    /// Stores `payload` under `id` using a single-part multipart upload, asserting that
    /// completion reported no error.
    async fn multipart_put(
        backend: &GcsBackend,
        id: &ObjectId,
        metadata: &Metadata,
        payload: impl Into<bytes::Bytes>,
    ) -> Result<()> {
        let payload: bytes::Bytes = payload.into();
        let upload_id = backend.initiate_multipart(id, metadata).await?;
        let etag = backend
            .upload_part(
                id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload),
            )
            .await?;
        let error = backend
            .complete_multipart(
                id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await?;
        assert!(
            error.is_none(),
            "complete_multipart returned error: {error:?}"
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_ttl_immediate() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, "hello, world").await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_tti_immediate() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, "hello, world").await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, compressed.clone()).await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }
}