Skip to main content

objectstore_service/backend/
gcs.rs

1//! Google Cloud Storage backend for long-term storage of large objects.
2
3use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use reqwest::header::HeaderName;
14use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
15use serde::{Deserialize, Serialize};
16
17use crate::backend::common::{
18    self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
19    PutResponse,
20};
21use crate::error::{Error, Result};
22use crate::gcp_auth::PrefetchingTokenProvider;
23use crate::id::ObjectId;
24use crate::multipart::{
25    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
26    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
27};
28use crate::stream::{self, ClientStream};
29
/// Configuration for [`GcsBackend`].
///
/// Stores objects in [Google Cloud Storage]. Authentication uses Application Default Credentials
/// (ADC), which can be provided via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable or
/// the GCE/GKE metadata service.
///
/// **Note**: The bucket must be pre-created with the following lifecycle policy:
/// - `daysSinceCustomTime`: 1 day
/// - `action`: delete
///
/// [Google Cloud Storage]: https://cloud.google.com/storage
///
/// # Example
///
/// ```yaml
/// storage:
///   type: gcs
///   bucket: objectstore-bucket
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional custom GCS endpoint URL.
    ///
    /// Useful for testing with emulators. If `None`, uses the default GCS endpoint.
    ///
    /// NOTE(review): when this is set, requests are sent entirely unauthenticated —
    /// no token provider is created (see [`GcsBackend::new`]).
    ///
    /// # Default
    ///
    /// `None` (uses default GCS endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=gcs`
    /// - `OS__STORAGE__ENDPOINT=http://localhost:9000` (optional)
    pub endpoint: Option<String>,

    /// GCS bucket name.
    ///
    /// The bucket must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__BUCKET=my-gcs-bucket`
    pub bucket: String,
}
74
/// Default endpoint used to access the GCS JSON API.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
/// Permission scopes required for accessing GCS.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// Time to debounce bumping an object with configured TTI.
///
/// A read only rewrites the object's `customTime` when the stored expiry lags the freshly
/// computed one by more than this duration, avoiding a PATCH request on every read.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day
/// How many times to retry failed operations (in addition to the initial attempt).
const REQUEST_RETRY_COUNT: usize = 2;

/// Prefix for our built-in metadata stored in the GCS `metadata` field.
const BUILTIN_META_PREFIX: &str = "x-sn-";
/// Prefix for user custom metadata stored in the GCS `metadata` field.
const CUSTOM_META_PREFIX: &str = "x-snme-";
88
/// GCS object resource.
///
/// This is the representation of the object resource in GCS JSON API without its payload. Where no
/// dedicated fields are available, we encode both built-in and custom metadata in the `metadata`
/// field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// Content-Type of the object data. If an object is stored without a Content-Type, it is served
    /// as application/octet-stream.
    pub content_type: Cow<'static, str>,

    /// Content encoding, used to store [`Metadata::compression`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// Custom time stamp used for time-based expiration.
    ///
    /// Consumed by the bucket's `daysSinceCustomTime` lifecycle rule, which skips
    /// objects that do not have this field set.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// The `Content-Length` of the data in bytes. GCS returns this as a string.
    ///
    /// GCS sets this in metadata responses. We can use it to know the size of an object
    /// without having to stream it.
    pub size: Option<String>,

    /// Timestamp of when this object was created.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// User-provided metadata, including our built-in metadata.
    ///
    /// Keys are disambiguated by prefix; see [`GcsMetaKey`].
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
131
132impl GcsObject {
133    /// Converts our Metadata type to GCS JSON object metadata.
134    pub fn from_metadata(metadata: &Metadata) -> Self {
135        let mut gcs_object = GcsObject {
136            content_type: metadata.content_type.clone(),
137            size: metadata.size.map(|size| size.to_string()),
138            content_encoding: None,
139            custom_time: None,
140            time_created: metadata.time_created,
141            metadata: BTreeMap::new(),
142        };
143
144        // For time-based expiration, set the `customTime` field. The bucket must have a
145        // `daysSinceCustomTime` lifecycle rule configured to delete objects with this field set.
146        // This rule automatically skips objects without `customTime` set.
147        if let Some(expires_in) = metadata.expiration_policy.expires_in() {
148            gcs_object.custom_time = Some(SystemTime::now() + expires_in);
149        }
150
151        if let Some(compression) = metadata.compression {
152            gcs_object.content_encoding = Some(compression.to_string());
153        }
154
155        if metadata.expiration_policy != ExpirationPolicy::default() {
156            gcs_object.metadata.insert(
157                GcsMetaKey::Expiration,
158                metadata.expiration_policy.to_string(),
159            );
160        }
161
162        if let Some(origin) = &metadata.origin {
163            gcs_object
164                .metadata
165                .insert(GcsMetaKey::Origin, origin.clone());
166        }
167
168        for (key, value) in &metadata.custom {
169            gcs_object
170                .metadata
171                .insert(GcsMetaKey::Custom(key.clone()), value.clone());
172        }
173
174        gcs_object
175    }
176
177    /// Converts GCS JSON object metadata to our Metadata type.
178    pub fn into_metadata(mut self) -> Result<Metadata> {
179        // Remove ignored metadata keys that are set by the GCS emulator.
180        self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
181
182        let expiration_policy = self
183            .metadata
184            .remove(&GcsMetaKey::Expiration)
185            .map(|s| s.parse())
186            .transpose()?
187            .unwrap_or_default();
188
189        let origin = self.metadata.remove(&GcsMetaKey::Origin);
190
191        let content_type = self.content_type;
192        let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
193        let size = self
194            .size
195            .map(|size| size.parse())
196            .transpose()
197            .map_err(|e| Error::Generic {
198                context: "GCS: failed to parse size from object metadata".to_string(),
199                cause: Some(Box::new(e)),
200            })?;
201        let time_created = self.time_created;
202
203        // At this point, all built-in metadata should have been removed from self.metadata.
204        let mut custom = BTreeMap::new();
205        for (key, value) in self.metadata {
206            if let GcsMetaKey::Custom(custom_key) = key {
207                custom.insert(custom_key, value);
208            } else {
209                return Err(Error::Generic {
210                    context: format!(
211                        "GCS: unexpected built-in metadata key in object metadata: {}",
212                        key
213                    ),
214                    cause: None,
215                });
216            }
217        }
218
219        Ok(Metadata {
220            content_type,
221            expiration_policy,
222            compression,
223            origin,
224            size,
225            custom,
226            time_created,
227            time_expires: self.custom_time,
228        })
229    }
230}
231
/// Key for [`GcsObject::metadata`].
///
/// The on-the-wire form is prefixed (`x-sn-` for built-ins, `x-snme-` for custom keys)
/// so built-in and user metadata can share the single GCS `metadata` map.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Built-in metadata key for [`Metadata::expiration_policy`].
    Expiration,
    /// Built-in metadata key for [`Metadata::origin`].
    Origin,
    /// Ignored metadata set by the GCS emulator.
    ///
    /// Parsed so reads from the emulator don't fail, but never serialized back.
    EmulatorIgnored,
    /// User-defined custom metadata key.
    Custom(String),
}
244
245impl std::str::FromStr for GcsMetaKey {
246    type Err = anyhow::Error;
247
248    fn from_str(s: &str) -> Result<Self, Self::Err> {
249        if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
250            return Ok(GcsMetaKey::EmulatorIgnored);
251        }
252
253        Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
254            Some("expiration") => GcsMetaKey::Expiration,
255            Some("origin") => GcsMetaKey::Origin,
256            Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
257            None => match s.strip_prefix(CUSTOM_META_PREFIX) {
258                Some(key) => GcsMetaKey::Custom(key.to_string()),
259                None => anyhow::bail!("invalid GCS metadata key format: {s}"),
260            },
261        })
262    }
263}
264
265impl fmt::Display for GcsMetaKey {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        match self {
268            Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
269            Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
270            Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
271            Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
272        }
273    }
274}
275
276impl<'de> serde::Deserialize<'de> for GcsMetaKey {
277    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
278    where
279        D: serde::Deserializer<'de>,
280    {
281        let s = Cow::<'de, str>::deserialize(deserializer)?;
282        s.parse().map_err(serde::de::Error::custom)
283    }
284}
285
impl serde::Serialize for GcsMetaKey {
    /// Serializes the key via its [`fmt::Display`] form, i.e. with the
    /// appropriate `x-sn-`/`x-snme-` prefix and without an intermediate `String`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}
294
295/// Builds HTTP headers that encode `metadata` for a GCS XML API request.
296fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
297    let mut headers = header::HeaderMap::new();
298
299    if let Some(expires_in) = metadata.expiration_policy.expires_in() {
300        let custom_time = SystemTime::now() + expires_in;
301        let formatted = humantime::format_rfc3339_seconds(custom_time);
302        headers.insert(
303            HeaderName::from_static("x-goog-custom-time"),
304            formatted.to_string().parse().map_err(|e| Error::Generic {
305                context: "GCS: invalid custom-time header value".into(),
306                cause: Some(Box::new(e)),
307            })?,
308        );
309    }
310
311    if let Some(compression) = metadata.compression {
312        headers.insert(
313            header::CONTENT_ENCODING,
314            compression
315                .to_string()
316                .parse()
317                .map_err(|e| Error::Generic {
318                    context: "GCS: invalid content-encoding header value".into(),
319                    cause: Some(Box::new(e)),
320                })?,
321        );
322    }
323
324    if metadata.expiration_policy != ExpirationPolicy::default() {
325        insert_gcs_meta_header(
326            &mut headers,
327            &GcsMetaKey::Expiration,
328            &metadata.expiration_policy.to_string(),
329        )?;
330    }
331
332    if let Some(origin) = &metadata.origin {
333        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
334    }
335
336    for (key, value) in &metadata.custom {
337        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
338    }
339
340    Ok(headers)
341}
342
343fn insert_gcs_meta_header(
344    headers: &mut header::HeaderMap,
345    key: &GcsMetaKey,
346    value: &str,
347) -> Result<()> {
348    let header_name = format!("x-goog-meta-{key}");
349    headers.insert(
350        HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
351            context: format!("GCS: invalid header name: {header_name}"),
352            cause: Some(Box::new(e)),
353        })?,
354        value.parse().map_err(|e| Error::Generic {
355            context: format!("GCS: invalid header value for {header_name}"),
356            cause: Some(Box::new(e)),
357        })?,
358    );
359    Ok(())
360}
361
362/// Returns `true` if the error is a transient reqwest failure worth retrying.
363fn is_retryable(error: &Error) -> bool {
364    let Error::Reqwest { cause, .. } = error else {
365        return false;
366    };
367    if cause.is_timeout() || cause.is_connect() || cause.is_request() {
368        return true;
369    }
370    let Some(status) = cause.status() else {
371        return false;
372    };
373    // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
374    matches!(
375        status,
376        StatusCode::REQUEST_TIMEOUT
377            | StatusCode::TOO_MANY_REQUESTS
378            | StatusCode::INTERNAL_SERVER_ERROR
379            | StatusCode::BAD_GATEWAY
380            | StatusCode::SERVICE_UNAVAILABLE
381            | StatusCode::GATEWAY_TIMEOUT
382    )
383}
384
/// GCS JSON API backend for long-term storage of large objects.
pub struct GcsBackend {
    /// Shared HTTP client used for all API requests.
    client: reqwest::Client,
    /// Base URL of the GCS API (default endpoint or emulator override).
    endpoint: Url,
    /// Name of the bucket all objects are stored in.
    bucket: String,
    /// Token provider resolving Application Default Credentials.
    ///
    /// `None` when a custom endpoint is configured; requests are then sent
    /// unauthenticated (emulator use).
    token_provider: Option<PrefetchingTokenProvider>,
}
392
393impl GcsBackend {
394    /// Creates an authenticated GCS JSON API backend bound to the bucket in `config`.
395    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
396        let GcsConfig { endpoint, bucket } = config;
397
398        let token_provider = if endpoint.is_none() {
399            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
400        } else {
401            None
402        };
403
404        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);
405
406        Ok(Self {
407            client: common::reqwest_client(),
408            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
409            bucket,
410            token_provider,
411        })
412    }
413
414    /// Formats the GCS object (metadata) URL for the given key.
415    fn object_url(&self, id: &ObjectId) -> Result<Url> {
416        let mut url = self.endpoint.clone();
417
418        let path = id.as_storage_path().to_string();
419        url.path_segments_mut()
420            .map_err(|()| Error::Generic {
421                context: format!(
422                    "GCS: invalid endpoint URL, {} cannot be a base",
423                    self.endpoint
424                ),
425                cause: None,
426            })?
427            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
428
429        Ok(url)
430    }
431
432    /// Formats the GCS upload URL for the given upload type.
433    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
434        let mut url = self.endpoint.clone();
435
436        url.path_segments_mut()
437            .map_err(|()| Error::Generic {
438                context: format!(
439                    "GCS: invalid endpoint URL, {} cannot be a base",
440                    self.endpoint
441                ),
442                cause: None,
443            })?
444            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
445
446        url.query_pairs_mut()
447            .append_pair("uploadType", upload_type)
448            .append_pair("name", &id.as_storage_path().to_string());
449
450        Ok(url)
451    }
452
453    /// Formats a GCS XML API URL for the given object.
454    ///
455    /// Unlike [`object_url`](Self::object_url) (JSON API at
456    /// `/storage/v1/b/{bucket}/o/{name}`), this produces
457    /// `/{bucket}/{path_segments}` for the S3-compatible XML API used by
458    /// multipart uploads.
459    fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
460        let mut url = self.endpoint.clone();
461        {
462            let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
463                context: format!(
464                    "GCS: invalid endpoint URL, {} cannot be a base",
465                    self.endpoint
466                ),
467                cause: None,
468            })?;
469            segments.push(&self.bucket);
470            for part in id.as_storage_path().to_string().split('/') {
471                segments.push(part);
472            }
473        }
474        Ok(url)
475    }
476
477    /// Creates a request builder with the appropriate authentication.
478    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
479        let mut builder = self.client.request(method, url);
480        if let Some(provider) = &self.token_provider {
481            let token = provider.token(TOKEN_SCOPES).await?;
482            builder = builder.bearer_auth(token.as_str());
483        }
484        Ok(builder)
485    }
486
487    /// Retries a GCS request on transient errors.
488    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
489    where
490        F: Future<Output = Result<T>> + Send,
491    {
492        let mut retry_count = 0usize;
493        loop {
494            match f().await {
495                Ok(res) => return Ok(res),
496                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
497                    retry_count += 1;
498                    objectstore_metrics::count!("gcs.retries", action = action);
499                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
500                }
501                Err(e) => {
502                    objectstore_metrics::count!("gcs.failures", action = action);
503                    return Err(e);
504                }
505            }
506        }
507    }
508
509    /// Fetches the GCS object metadata (without the payload), bumps TTI if
510    /// needed, and returns the parsed [`Metadata`].
511    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
512        let metadata_opt = self
513            .with_retry("get_metadata", || async {
514                let resp = self
515                    .request(Method::GET, object_url.clone())
516                    .await?
517                    .send()
518                    .await
519                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;
520
521                if resp.status() == StatusCode::NOT_FOUND {
522                    return Ok(None);
523                }
524
525                let metadata: GcsObject = resp
526                    .error_for_status()
527                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
528                    .json()
529                    .await
530                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
531
532                Ok(Some(metadata))
533            })
534            .await?;
535
536        let Some(gcs_metadata) = metadata_opt else {
537            objectstore_log::debug!("Object not found");
538            return Ok(None);
539        };
540
541        let expire_at = gcs_metadata.custom_time;
542        let metadata = gcs_metadata.into_metadata()?;
543
544        // TODO: Inject the access time from the request.
545        let access_time = SystemTime::now();
546
547        // Filter already expired objects but leave them to garbage collection
548        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
549            objectstore_log::debug!("Object found but past expiry");
550            return Ok(None);
551        }
552
553        // TODO: Schedule into background persistently so this doesn't get lost on restarts
554        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
555            let new_expire_at = access_time + tti;
556            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
557                self.update_custom_time(object_url.clone(), new_expire_at)
558                    .await?;
559            }
560        }
561
562        Ok(Some(metadata))
563    }
564
565    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
566        #[derive(Debug, Serialize)]
567        #[serde(rename_all = "camelCase")]
568        struct CustomTimeRequest {
569            #[serde(with = "humantime_serde")]
570            custom_time: SystemTime,
571        }
572
573        self.with_retry("update_custom_time", || async {
574            self.request(Method::PATCH, object_url.clone())
575                .await?
576                .json(&CustomTimeRequest { custom_time })
577                .send()
578                .await
579                .and_then(|r| r.error_for_status())
580                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
581            Ok(())
582        })
583        .await
584    }
585}
586
587impl fmt::Debug for GcsBackend {
588    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
589        f.debug_struct("GcsJsonApi")
590            .field("endpoint", &self.endpoint)
591            .field("bucket", &self.bucket)
592            .finish_non_exhaustive()
593    }
594}
595
#[async_trait::async_trait]
impl Backend for GcsBackend {
    /// Backend identifier used in logs and metrics.
    fn name(&self) -> &'static str {
        "gcs"
    }

    /// Uploads an object and its metadata in a single JSON API request.
    ///
    /// Uses the `multipart` upload type, which carries the metadata document and the
    /// payload as the two parts of one `multipart/related` body.
    ///
    /// NOTE(review): unlike the read/delete paths below, this is not wrapped in
    /// `with_retry` — presumably because the payload stream cannot be replayed
    /// after a failed attempt; confirm before adding retries here.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        objectstore_log::debug!("Writing to GCS backend");
        let gcs_metadata = GcsObject::from_metadata(metadata);

        // NB: Ensure the order of these fields and that a content-type is attached to them. Both
        // are required by the GCS API.
        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata for GCS upload".to_string(),
            cause,
        })?;

        let multipart = multipart::Form::new()
            .part(
                "metadata",
                multipart::Part::text(metadata_json)
                    .mime_str("application/json")
                    .expect("application/json is a valid mime type"),
            )
            .part(
                "media",
                multipart::Part::stream(Body::wrap_stream(stream))
                    .mime_str(&metadata.content_type)
                    .map_err(|e| Error::Generic {
                        context: format!("invalid mime type: {}", &metadata.content_type),
                        cause: Some(Box::new(e)),
                    })?,
            );

        // GCS requires a multipart/related request. Its body looks identical to
        // multipart/form-data, but the Content-Type header is different. Hence, we have to manually
        // set the header *after* writing the multipart form into the request.
        let content_type = format!("multipart/related; boundary={}", multipart.boundary());

        self.request(Method::POST, self.upload_url(id, "multipart")?)
            .await?
            .multipart(multipart)
            .header(header::CONTENT_TYPE, content_type)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            // Distinguish failures originating from the client's own stream from GCS errors.
            .map_err(|e| match stream::unpack_client_error(&e) {
                Some(ce) => Error::Client(ce),
                _ => Error::reqwest("GCS: upload object", e),
            })?;

        Ok(())
    }

    /// Fetches an object's metadata and payload stream.
    ///
    /// Metadata is fetched first (which also filters expired objects and bumps TTI);
    /// the payload is then streamed via the `alt=media` variant of the same URL.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        objectstore_log::debug!("Reading from GCS backend");
        let object_url = self.object_url(id)?;

        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
            return Ok(None);
        };

        // `alt=media` switches the object resource endpoint to return the payload.
        let mut download_url = object_url;
        download_url.query_pairs_mut().append_pair("alt", "media");

        let payload_response = self
            .with_retry("get_payload", || async {
                self.request(Method::GET, download_url.clone())
                    .await?
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .map_err(|e| Error::reqwest("GCS: get payload", e))
            })
            .await?;

        let stream = payload_response
            .bytes_stream()
            .map_err(io::Error::other)
            .boxed();

        Ok(Some((metadata, stream)))
    }

    /// Fetches only an object's metadata (no payload transfer).
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        objectstore_log::debug!("Reading metadata from GCS backend");
        let object_url = self.object_url(id)?;
        self.fetch_gcs_metadata(&object_url).await
    }

    /// Deletes an object; deleting a missing object is treated as success.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        objectstore_log::debug!("Deleting from GCS backend");
        let object_url = self.object_url(id)?;

        self.with_retry("delete", || async {
            let resp = self
                .request(Method::DELETE, object_url.clone())
                .await?
                .send()
                .await
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            // Do not error for objects that do not exist
            if resp.status() == StatusCode::NOT_FOUND {
                return Ok(());
            }

            resp.error_for_status()
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            Ok(())
        })
        .await
    }
}
720
/// XML response body of the XML API's InitiateMultipartUpload call.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlInitiateMultipartUploadResponse {
    /// Server-assigned identifier for the new multipart upload session.
    upload_id: String,
}

impl From<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
    fn from(r: XmlInitiateMultipartUploadResponse) -> Self {
        // `InitiateMultipartResponse` is just the upload id.
        r.upload_id
    }
}
732
/// XML response body of the XML API's ListParts call.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlListPartsResponse {
    /// Whether more parts exist beyond this page.
    #[serde(default)]
    is_truncated: bool,
    /// Marker for fetching the next page, if any.
    next_part_number_marker: Option<u32>,
    /// Parts in this page; each `<Part>` element maps to one entry.
    #[serde(default, rename = "Part")]
    parts: Vec<XmlPart>,
}

impl From<XmlListPartsResponse> for ListPartsResponse {
    fn from(xml: XmlListPartsResponse) -> Self {
        Self {
            parts: xml.parts.into_iter().map(Into::into).collect(),
            is_truncated: xml.is_truncated,
            next_part_number_marker: xml.next_part_number_marker,
        }
    }
}
752
/// A single `<Part>` element in a ListParts response.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlPart {
    part_number: u32,
    // `ETag` does not follow PascalCase, hence the explicit rename.
    #[serde(rename = "ETag")]
    e_tag: String,
    #[serde(with = "humantime_serde")]
    last_modified: SystemTime,
    size: u64,
}

impl From<XmlPart> for crate::multipart::Part {
    fn from(p: XmlPart) -> Self {
        Self {
            part_number: p.part_number,
            etag: p.e_tag,
            last_modified: p.last_modified,
            size: p.size,
        }
    }
}
774
/// XML request body of the XML API's CompleteMultipartUpload call.
#[derive(Debug, Serialize)]
#[serde(rename = "CompleteMultipartUpload")]
struct XmlCompleteMultipartUpload {
    /// Each entry serializes as a `<Part>` element.
    #[serde(rename = "Part")]
    parts: Vec<XmlCompletePart>,
}

impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
    fn from(parts: Vec<CompletedPart>) -> Self {
        Self {
            parts: parts.into_iter().map(Into::into).collect(),
        }
    }
}
789
/// A single `<Part>` element in a CompleteMultipartUpload request.
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct XmlCompletePart {
    part_number: PartNumber,
    // `ETag` does not follow PascalCase, hence the explicit rename.
    #[serde(rename = "ETag")]
    e_tag: String,
}

impl From<CompletedPart> for XmlCompletePart {
    fn from(p: CompletedPart) -> Self {
        Self {
            part_number: p.part_number,
            e_tag: p.etag,
        }
    }
}
806
/// XML `<Error>` document returned by the XML API on failure.
#[derive(Debug, Deserialize)]
#[serde(rename = "Error", rename_all = "PascalCase")]
struct XmlError {
    /// Machine-readable error code.
    code: String,
    /// Human-readable error description.
    message: String,
}

impl From<XmlError> for crate::multipart::CompleteMultipartError {
    fn from(e: XmlError) -> Self {
        Self {
            code: e.code,
            message: e.message,
        }
    }
}
822
/// XXX: Any change that affects this implementation should be manually tested against real GCS.
/// That's because the fork of [storage-testbench](https://github.com/googleapis/storage-testbench)
/// that we test against has an incomplete implementation of the XML multipart API that likely doesn't match GCS's behavior in many cases.
///
/// Implements multipart uploads via GCS's XML (S3-compatible) API:
/// - `POST <object>?uploads` — initiate
/// - `PUT <object>?partNumber=N&uploadId=ID` — upload one part
/// - `GET <object>?uploadId=ID` — list uploaded parts
/// - `DELETE <object>?uploadId=ID` — abort
/// - `POST <object>?uploadId=ID` — complete, with an XML manifest of parts
#[async_trait::async_trait]
impl MultipartUploadBackend for GcsBackend {
    /// Initiates a multipart upload for `id` and returns the server-assigned upload ID.
    ///
    /// The object's content type and the headers derived from `metadata` (via
    /// [`metadata_to_gcs_headers`]) are attached to this initiate request; the tests in this
    /// file confirm they end up on the completed object.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        objectstore_log::debug!("Initiating multipart upload on GCS backend");
        // A bare `?uploads` query (no value) selects the "initiate" operation.
        let mut url = self.xml_object_url(id)?;
        url.set_query(Some("uploads"));

        // The initiate request itself carries no payload, hence the explicit zero
        // Content-Length alongside the final object's content type.
        let mut builder = self
            .request(Method::POST, url)
            .await?
            .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
            .header(header::CONTENT_LENGTH, "0");

        // Attach expiration/custom metadata as GCS headers.
        let meta_headers = metadata_to_gcs_headers(metadata)?;
        for (name, value) in &meta_headers {
            builder = builder.header(name, value);
        }

        let resp = builder
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;

        // The response body is an XML document containing the upload ID.
        let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
            .map_err(|e| Error::Generic {
                context: "GCS: failed to parse initiate multipart response".to_owned(),
                cause: Some(Box::new(e)),
            })?;

        Ok(xml.into())
    }

    /// Uploads one part of a multipart upload, streaming `body`, and returns the part's ETag.
    ///
    /// The returned ETag must be echoed back in [`Self::complete_multipart`].
    /// `content_md5`, when given, is forwarded as a `Content-MD5` header so the server can
    /// verify payload integrity.
    #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        content_length: u64,
        content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        objectstore_log::debug!("Uploading part to GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut()
            .append_pair("partNumber", &part_number.to_string())
            .append_pair("uploadId", upload_id);

        // The body is streamed, so Content-Length must be supplied explicitly by the caller.
        let mut builder = self
            .request(Method::PUT, url)
            .await?
            .header(header::CONTENT_LENGTH, content_length)
            .body(Body::wrap_stream(body));

        if let Some(md5) = content_md5 {
            builder = builder.header("content-md5", md5);
        }

        let resp = builder
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: upload part", e))?;

        // The part's identity for the later `complete` call is its ETag response header;
        // a 2xx response without one is treated as an error.
        let etag = resp
            .headers()
            .get(header::ETAG)
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_owned())
            .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;

        Ok(etag)
    }

    /// Lists the parts uploaded so far for an in-progress multipart upload.
    ///
    /// `max_parts` and `part_number_marker` map onto the XML API's `max-parts` /
    /// `part-number-marker` pagination parameters.
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        objectstore_log::debug!("Listing parts on GCS backend");
        let mut url = self.xml_object_url(id)?;
        // Scope the query-pairs builder so its mutable borrow of `url` ends before
        // `url` is moved into the request below.
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("uploadId", upload_id);
            if let Some(max) = max_parts {
                pairs.append_pair("max-parts", &max.to_string());
            }
            if let Some(marker) = part_number_marker {
                pairs.append_pair("part-number-marker", &marker.to_string());
            }
        }

        let resp = self
            .request(Method::GET, url)
            .await?
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: list parts", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;

        let xml: XmlListPartsResponse =
            quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
                context: "GCS: failed to parse list parts response".to_owned(),
                cause: Some(Box::new(e)),
            })?;

        Ok(xml.into())
    }

    /// Aborts an in-progress multipart upload via `DELETE <object>?uploadId=ID`.
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        objectstore_log::debug!("Aborting multipart upload on GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut().append_pair("uploadId", upload_id);

        self.request(Method::DELETE, url)
            .await?
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;

        Ok(())
    }

    /// Completes a multipart upload by POSTing an XML manifest of `(part_number, etag)` pairs.
    ///
    /// Returns `Ok(Some(error))` when the response body deserializes as an error document,
    /// and `Ok(None)` otherwise. This is because the complete operation can report failures
    /// in the body even under a 2xx HTTP status (presumably a success body does not
    /// deserialize as `XmlError`; the happy-path tests in this file rely on that).
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        objectstore_log::debug!("Completing multipart upload on GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut().append_pair("uploadId", upload_id);

        // Serialize the part manifest into the XML request body.
        let body = XmlCompleteMultipartUpload::from(parts);
        let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
            context: "GCS: failed to serialize complete multipart request".into(),
            cause: Some(Box::new(e)),
        })?;

        let resp = self
            .request(Method::POST, url)
            .await?
            .header(header::CONTENT_TYPE, "application/xml")
            .body(xml)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;

        // Speculatively parse the body as an error document; a parse failure (`.ok()`)
        // means there is no error to report.
        let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
            .ok()
            .map(Into::into);

        Ok(error)
    }
}
1013
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::multipart::CompletedPart;
    use crate::stream;

    // NB: To run any of these tests, you need to have a GCS emulator running. This is done
    // automatically in CI.
    //
    // Refer to the readme for how to set up the emulator.

    /// Creates a backend wired to the local GCS emulator (see module note above).
    async fn create_test_backend() -> Result<GcsBackend> {
        GcsBackend::new(GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        })
        .await
    }

    /// Creates a fresh, random object ID scoped to the test use case.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    /// Put followed by get returns the same payload and metadata.
    #[tokio::test]
    async fn test_roundtrip() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::Manual,
            compression: None,
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            size: None,
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "hello, world");
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);
        // Check the *retrieved* metadata, not the input (which trivially has
        // `time_created` set above).
        assert!(meta.time_created.is_some());

        Ok(())
    }

    /// Getting an object that was never written yields `None`.
    #[tokio::test]
    async fn test_get_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// Deleting an object that was never written is a no-op, not an error.
    #[tokio::test]
    async fn test_delete_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        backend.delete_object(&id).await?;

        Ok(())
    }

    /// A second put to the same ID replaces both payload and metadata.
    #[tokio::test]
    async fn test_overwrite() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello"))
            .await?;

        let metadata = Metadata {
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "world");
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    /// A deleted object is no longer readable.
    #[tokio::test]
    async fn test_read_after_delete() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        backend.delete_object(&id).await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// An object written with an already-expired TTL is not readable.
    #[tokio::test]
    async fn test_ttl_immediate() -> Result<()> {
        // NB: We create a TTL that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// An object written with an already-expired TTI is not readable.
    #[tokio::test]
    async fn test_tti_immediate() -> Result<()> {
        // NB: We create a TTI that immediately expires in this test. This might be optimized away
        // in a future implementation, so we will have to update this test accordingly.

        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// `get_metadata` returns the metadata that was written with the object.
    #[tokio::test]
    async fn test_get_metadata_returns_metadata() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let meta = backend.get_metadata(&id).await?.unwrap();
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    /// `get_metadata` on an unknown ID yields `None`.
    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_metadata(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// Reading metadata of a TTI object inside the bump window extends its expiry.
    #[tokio::test]
    async fn test_get_metadata_bumps_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // Manually set custom_time to just inside the bump window.
        // The bump condition is: expire_at < now + tti - TTI_DEBOUNCE.
        let object_url = backend.object_url(&id)?;
        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
        backend.update_custom_time(object_url, old_deadline).await?;

        // First get_metadata sees the old timestamp and triggers a TTI bump.
        let pre_meta = backend.get_metadata(&id).await?.unwrap();
        let pre_expiry = pre_meta.time_expires.unwrap();

        // Second get_metadata sees the bumped timestamp.
        let post_meta = backend.get_metadata(&id).await?.unwrap();
        let post_expiry = post_meta.time_expires.unwrap();
        assert!(
            post_expiry > pre_expiry,
            "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
        );

        // Verify the payload is still intact after the bump.
        let (_, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(&payload, b"hello, world");

        Ok(())
    }

    /// Reading metadata of a freshly-written TTI object does NOT extend its expiry.
    #[tokio::test]
    async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // A freshly written object has time_expires ≈ now + 2d, which is well outside
        // the bump window (now + 2d - 1d = now + 1d). No bump should occur.
        let first = backend.get_metadata(&id).await?.unwrap();
        let first_expiry = first.time_expires.unwrap();

        let second = backend.get_metadata(&id).await?.unwrap();
        let second_expiry = second.time_expires.unwrap();

        assert_eq!(
            first_expiry, second_expiry,
            "Fresh TTI object should not have its expiry bumped"
        );

        Ok(())
    }

    /// A zstd-compressed payload is stored and returned as-is (no transparent decompression).
    #[tokio::test]
    async fn test_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single(compressed.clone()))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }

    /// Multipart upload with a single part preserves payload and metadata set at initiate.
    #[tokio::test]
    async fn test_multipart_single_part() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        let data = b"hello, multipart world!";
        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await?;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(payload, data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToLive(Duration::from_mins(33))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(
            meta.custom,
            BTreeMap::from_iter([("hello".into(), "world".into())])
        );

        Ok(())
    }

    /// Three parts uploaded in order reassemble into the concatenated payload.
    #[tokio::test]
    async fn test_multipart_multiple_parts() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        // Non-final parts must be >= 5 MiB.
        const MIN_PART: usize = 5 * 1024 * 1024;
        let part1 = vec![b'a'; MIN_PART];
        let part2 = vec![b'b'; MIN_PART];
        let part3 = b"cccc".to_vec();

        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await?;
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                2,
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await?;
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                3,
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await?;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: 1,
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: 2,
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: 3,
                        etag: etag3,
                    },
                ],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        // Object exists after complete
        let (_meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        let mut expected = Vec::new();
        expected.extend_from_slice(&part1);
        expected.extend_from_slice(&part2);
        expected.extend_from_slice(&part3);
        assert_eq!(payload, expected);

        Ok(())
    }

    /// Parts uploaded out of order are reassembled by part number, not upload order.
    #[tokio::test]
    async fn test_multipart_out_of_order_upload() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        // Non-final parts must be >= 5 MiB.
        const MIN_PART: usize = 5 * 1024 * 1024;
        let part1 = vec![b'a'; MIN_PART];
        let part2 = vec![b'b'; MIN_PART];
        let part3 = b"cccc".to_vec();

        // Upload parts out of order: 2, 3, 1.
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                2,
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await?;
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                3,
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await?;
        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await?;

        // Complete with parts listed in order.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: 1,
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: 2,
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: 3,
                        etag: etag3,
                    },
                ],
            )
            .await?;
        assert!(result.is_none(), "expected no error on complete");

        // Verify reassembly order matches part numbers, not upload order.
        let (_meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        let mut expected = Vec::new();
        expected.extend_from_slice(&part1);
        expected.extend_from_slice(&part2);
        expected.extend_from_slice(&part3);
        assert_eq!(payload, expected);

        Ok(())
    }

    /// `list_parts` returns uploaded parts and paginates via max-parts / marker.
    #[tokio::test]
    async fn test_multipart_list_parts() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        let etag1 = backend
            .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec()))
            .await?;
        let etag2 = backend
            .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec()))
            .await?;

        // List all parts.
        let list = backend.list_parts(&id, &upload_id, None, None).await?;
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number, 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number, 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // List with max_parts=1 to test pagination.
        let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number, 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await?;
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number, 2);

        // Clean up.
        backend.abort_multipart(&id, &upload_id).await?;

        Ok(())
    }

    /// An aborted multipart upload never materializes as an object.
    #[tokio::test]
    async fn test_multipart_abort() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await?;

        backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await?;

        backend.abort_multipart(&id, &upload_id).await?;

        // Object should not exist after abort.
        let result = backend.get_object(&id).await?;
        assert!(result.is_none(), "object should not exist after abort");

        Ok(())
    }

    /// Helper: writes `payload` via a single-part multipart upload, asserting success.
    async fn multipart_put(
        backend: &GcsBackend,
        id: &ObjectId,
        metadata: &Metadata,
        payload: impl Into<bytes::Bytes>,
    ) -> Result<()> {
        let payload: bytes::Bytes = payload.into();
        let upload_id = backend.initiate_multipart(id, metadata).await?;
        let etag = backend
            .upload_part(
                id,
                &upload_id,
                1,
                payload.len() as u64,
                None,
                stream::single(payload),
            )
            .await?;
        let error = backend
            .complete_multipart(
                id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await?;
        assert!(
            error.is_none(),
            "complete_multipart returned error: {error:?}"
        );
        Ok(())
    }

    /// A multipart-written object with an already-expired TTL is not readable.
    #[tokio::test]
    async fn test_multipart_ttl_immediate() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, "hello, world").await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// A multipart-written object with an already-expired TTI is not readable.
    #[tokio::test]
    async fn test_multipart_tti_immediate() -> Result<()> {
        let backend = create_test_backend().await?;
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, "hello, world").await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    /// A compressed payload written via multipart comes back byte-identical.
    #[tokio::test]
    async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        multipart_put(&backend, &id, &metadata, compressed.clone()).await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }
}