Skip to main content

objectstore_service/backend/
gcs.rs

1//! Google Cloud Storage backend for long-term storage of large objects.
2
3use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use objectstore_types::range::{ByteRange, ContentRange};
14use reqwest::header::HeaderName;
15use reqwest::{
16    Body, IntoUrl, Method, RequestBuilder, Response, StatusCode, Url, header, multipart,
17};
18use serde::{Deserialize, Serialize};
19
20use crate::backend::common::{
21    self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
22    PutResponse,
23};
24use crate::error::{Error, Result};
25use crate::gcp_auth::PrefetchingTokenProvider;
26use crate::id::ObjectId;
27use crate::multipart::{
28    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
29    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
30};
31use crate::stream::{self, ClientStream};
32
/// Configuration for [`GcsBackend`].
///
/// Stores objects in [Google Cloud Storage]. Authentication uses Application Default Credentials
/// (ADC), which can be provided via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable or
/// the GCE/GKE metadata service. Credentials are only requested when no custom
/// [`endpoint`](Self::endpoint) is configured; with a custom endpoint, requests are sent
/// unauthenticated.
///
/// **Note**: The bucket must be pre-created with the following lifecycle policy:
/// - `daysSinceCustomTime`: 1 day
/// - `action`: delete
///
/// The backend stores the expiry timestamp of objects with a time-based expiration policy in the
/// GCS `customTime` field, which is what this lifecycle rule acts on.
///
/// [Google Cloud Storage]: https://cloud.google.com/storage
///
/// # Example
///
/// ```yaml
/// storage:
///   type: gcs
///   bucket: objectstore-bucket
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional custom GCS endpoint URL.
    ///
    /// Useful for testing with emulators. If `None`, uses the default GCS endpoint. Setting an
    /// endpoint also disables authentication for all requests.
    ///
    /// # Default
    ///
    /// `None` (uses default GCS endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=gcs`
    /// - `OS__STORAGE__ENDPOINT=http://localhost:9000` (optional)
    pub endpoint: Option<String>,

    /// GCS bucket name.
    ///
    /// The bucket must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__BUCKET=my-gcs-bucket`
    pub bucket: String,
}
77
/// Default endpoint used to access GCS when no custom endpoint is configured.
///
/// Both the JSON API URLs and the XML API URLs are built relative to this base.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
/// Permission scopes required for accessing GCS.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// Time to debounce bumping an object with configured TTI.
///
/// On access, the expiry of a time-to-idle object is only rewritten if the new expiry lies more
/// than this duration after the currently stored one.
const TTI_DEBOUNCE: Duration = Duration::from_hours(24);
/// How many times to retry failed operations.
///
/// This counts retries after the initial attempt, so a request is sent at most
/// `REQUEST_RETRY_COUNT + 1` times.
const REQUEST_RETRY_COUNT: usize = 2;

/// Prefix for our built-in metadata stored in GCS metadata field
const BUILTIN_META_PREFIX: &str = "x-sn-";
/// Prefix for user custom metadata stored in GCS metadata field
const CUSTOM_META_PREFIX: &str = "x-snme-";
91
/// GCS object resource.
///
/// This is the representation of the object resource in GCS JSON API without its payload. Where no
/// dedicated fields are available, we encode both built-in and custom metadata in the `metadata`
/// field.
///
/// Field names are serialized in camelCase (for example `contentType`, `customTime`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// Content-Type of the object data. If an object is stored without a Content-Type, it is served
    /// as application/octet-stream.
    pub content_type: Cow<'static, str>,

    /// Content encoding, used to store [`Metadata::compression`].
    ///
    /// Omitted from the serialized resource when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// Custom time stamp used for time-based expiration.
    ///
    /// Holds the point in time at which the object expires. Omitted from the serialized resource
    /// when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// The `Content-Length` of the data in bytes. GCS returns this as a string.
    ///
    /// GCS sets this in metadata responses. We can use it to know the size of an object
    /// without having to stream it.
    pub size: Option<String>,

    /// Timestamp of when this object was created.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// User-provided metadata, including our built-in metadata.
    ///
    /// Keys are typed as [`GcsMetaKey`], which distinguishes built-in entries from custom ones by
    /// their prefix. Omitted from the serialized resource when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
134
135impl GcsObject {
136    /// Converts our Metadata type to GCS JSON object metadata.
137    pub fn from_metadata(metadata: &Metadata) -> Self {
138        let mut gcs_object = GcsObject {
139            content_type: metadata.content_type.clone(),
140            size: metadata.size.map(|size| size.to_string()),
141            content_encoding: None,
142            custom_time: None,
143            time_created: metadata.time_created,
144            metadata: BTreeMap::new(),
145        };
146
147        // For time-based expiration, set the `customTime` field. The bucket must have a
148        // `daysSinceCustomTime` lifecycle rule configured to delete objects with this field set.
149        // This rule automatically skips objects without `customTime` set.
150        if let Some(expires_in) = metadata.expiration_policy.expires_in() {
151            gcs_object.custom_time = Some(SystemTime::now() + expires_in);
152        }
153
154        if let Some(compression) = metadata.compression {
155            gcs_object.content_encoding = Some(compression.to_string());
156        }
157
158        if metadata.expiration_policy != ExpirationPolicy::default() {
159            gcs_object.metadata.insert(
160                GcsMetaKey::Expiration,
161                metadata.expiration_policy.to_string(),
162            );
163        }
164
165        if let Some(origin) = &metadata.origin {
166            gcs_object
167                .metadata
168                .insert(GcsMetaKey::Origin, origin.clone());
169        }
170
171        for (key, value) in &metadata.custom {
172            gcs_object
173                .metadata
174                .insert(GcsMetaKey::Custom(key.clone()), value.clone());
175        }
176
177        gcs_object
178    }
179
180    /// Converts GCS JSON object metadata to our Metadata type.
181    pub fn into_metadata(mut self) -> Result<Metadata> {
182        // Remove ignored metadata keys that are set by the GCS emulator.
183        self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
184
185        let expiration_policy = self
186            .metadata
187            .remove(&GcsMetaKey::Expiration)
188            .map(|s| s.parse())
189            .transpose()?
190            .unwrap_or_default();
191
192        let origin = self.metadata.remove(&GcsMetaKey::Origin);
193
194        let content_type = self.content_type;
195        let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
196        let size = self
197            .size
198            .map(|size| size.parse())
199            .transpose()
200            .map_err(|e| Error::Generic {
201                context: "GCS: failed to parse size from object metadata".to_string(),
202                cause: Some(Box::new(e)),
203            })?;
204        let time_created = self.time_created;
205
206        // At this point, all built-in metadata should have been removed from self.metadata.
207        let mut custom = BTreeMap::new();
208        for (key, value) in self.metadata {
209            if let GcsMetaKey::Custom(custom_key) = key {
210                custom.insert(custom_key, value);
211            } else {
212                return Err(Error::Generic {
213                    context: format!(
214                        "GCS: unexpected built-in metadata key in object metadata: {key}"
215                    ),
216                    cause: None,
217                });
218            }
219        }
220
221        Ok(Metadata {
222            content_type,
223            expiration_policy,
224            compression,
225            origin,
226            size,
227            custom,
228            time_created,
229            time_expires: self.custom_time,
230        })
231    }
232}
233
/// Key for [`GcsObject::metadata`].
///
/// Built-in keys are written with the [`BUILTIN_META_PREFIX`] and custom keys with the
/// [`CUSTOM_META_PREFIX`]; parsing uses the same prefixes to tell them apart.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Built-in metadata key for [`Metadata::expiration_policy`].
    Expiration,
    /// Built-in metadata key for [`Metadata::origin`].
    Origin,
    /// Ignored metadata set by the GCS emulator.
    ///
    /// Parsed from the keys `x_emulator_upload` and `x_testbench_upload`. This variant must never
    /// be formatted or serialized; doing so panics.
    EmulatorIgnored,
    /// User-defined custom metadata key.
    ///
    /// Holds the key without the custom prefix.
    Custom(String),
}
246
247impl std::str::FromStr for GcsMetaKey {
248    type Err = anyhow::Error;
249
250    fn from_str(s: &str) -> Result<Self, Self::Err> {
251        if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
252            return Ok(GcsMetaKey::EmulatorIgnored);
253        }
254
255        Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
256            Some("expiration") => GcsMetaKey::Expiration,
257            Some("origin") => GcsMetaKey::Origin,
258            Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
259            None => match s.strip_prefix(CUSTOM_META_PREFIX) {
260                Some(key) => GcsMetaKey::Custom(key.to_string()),
261                None => anyhow::bail!("invalid GCS metadata key format: {s}"),
262            },
263        })
264    }
265}
266
267impl fmt::Display for GcsMetaKey {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        match self {
270            Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
271            Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
272            Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
273            Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
274        }
275    }
276}
277
278impl<'de> Deserialize<'de> for GcsMetaKey {
279    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
280    where
281        D: serde::Deserializer<'de>,
282    {
283        let s = Cow::<'de, str>::deserialize(deserializer)?;
284        s.parse().map_err(serde::de::Error::custom)
285    }
286}
287
impl Serialize for GcsMetaKey {
    /// Serializes the key as a string using its [`Display`](fmt::Display) representation.
    ///
    /// Since `Display` panics for [`GcsMetaKey::EmulatorIgnored`], that variant must not be
    /// serialized.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}
296
297/// Builds HTTP headers that encode `metadata` for a GCS XML API request.
298fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
299    let mut headers = header::HeaderMap::new();
300
301    if let Some(expires_in) = metadata.expiration_policy.expires_in() {
302        let custom_time = SystemTime::now() + expires_in;
303        let formatted = humantime::format_rfc3339_seconds(custom_time);
304        headers.insert(
305            HeaderName::from_static("x-goog-custom-time"),
306            formatted.to_string().parse().map_err(|e| Error::Generic {
307                context: "GCS: invalid custom-time header value".into(),
308                cause: Some(Box::new(e)),
309            })?,
310        );
311    }
312
313    if let Some(compression) = metadata.compression {
314        headers.insert(
315            header::CONTENT_ENCODING,
316            compression
317                .to_string()
318                .parse()
319                .map_err(|e| Error::Generic {
320                    context: "GCS: invalid content-encoding header value".into(),
321                    cause: Some(Box::new(e)),
322                })?,
323        );
324    }
325
326    if metadata.expiration_policy != ExpirationPolicy::default() {
327        insert_gcs_meta_header(
328            &mut headers,
329            &GcsMetaKey::Expiration,
330            &metadata.expiration_policy.to_string(),
331        )?;
332    }
333
334    if let Some(origin) = &metadata.origin {
335        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
336    }
337
338    for (key, value) in &metadata.custom {
339        insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
340    }
341
342    Ok(headers)
343}
344
345fn insert_gcs_meta_header(
346    headers: &mut header::HeaderMap,
347    key: &GcsMetaKey,
348    value: &str,
349) -> Result<()> {
350    let header_name = format!("x-goog-meta-{key}");
351    headers.insert(
352        HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
353            context: format!("GCS: invalid header name: {header_name}"),
354            cause: Some(Box::new(e)),
355        })?,
356        value.parse().map_err(|e| Error::Generic {
357            context: format!("GCS: invalid header value for {header_name}"),
358            cause: Some(Box::new(e)),
359        })?,
360    );
361    Ok(())
362}
363
364/// Returns `true` if the error is a transient reqwest failure worth retrying.
365fn is_retryable(error: &Error) -> bool {
366    let Error::Reqwest { cause, .. } = error else {
367        return false;
368    };
369    if cause.is_timeout() || cause.is_connect() || cause.is_request() {
370        return true;
371    }
372    let Some(status) = cause.status() else {
373        return false;
374    };
375    // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
376    matches!(
377        status,
378        StatusCode::REQUEST_TIMEOUT
379            | StatusCode::TOO_MANY_REQUESTS
380            | StatusCode::INTERNAL_SERVER_ERROR
381            | StatusCode::BAD_GATEWAY
382            | StatusCode::SERVICE_UNAVAILABLE
383            | StatusCode::GATEWAY_TIMEOUT
384    )
385}
386
/// GCS JSON API backend for long-term storage of large objects.
///
/// Regular object operations use the JSON API; multipart uploads use the XML API at the same
/// endpoint.
pub struct GcsBackend {
    /// HTTP client used for all requests.
    client: reqwest::Client,
    /// Base URL that all request URLs are derived from.
    endpoint: Url,
    /// Name of the bucket that all objects are stored in.
    bucket: String,
    /// Provider for bearer tokens. `None` when a custom endpoint is configured, in which case
    /// requests are sent without authentication.
    token_provider: Option<PrefetchingTokenProvider>,
}
394
395impl GcsBackend {
396    /// Creates an authenticated GCS JSON API backend bound to the bucket in `config`.
397    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
398        let GcsConfig { endpoint, bucket } = config;
399
400        let token_provider = if endpoint.is_none() {
401            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
402        } else {
403            None
404        };
405
406        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);
407
408        Ok(Self {
409            client: common::reqwest_client(),
410            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
411            bucket,
412            token_provider,
413        })
414    }
415
416    /// Formats the GCS object (metadata) URL for the given key.
417    fn object_url(&self, id: &ObjectId) -> Result<Url> {
418        let mut url = self.endpoint.clone();
419
420        let path = id.as_storage_path().to_string();
421        url.path_segments_mut()
422            .map_err(|()| Error::Generic {
423                context: format!(
424                    "GCS: invalid endpoint URL, {} cannot be a base",
425                    self.endpoint
426                ),
427                cause: None,
428            })?
429            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
430
431        Ok(url)
432    }
433
434    /// Formats the GCS upload URL for the given upload type.
435    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
436        let mut url = self.endpoint.clone();
437
438        url.path_segments_mut()
439            .map_err(|()| Error::Generic {
440                context: format!(
441                    "GCS: invalid endpoint URL, {} cannot be a base",
442                    self.endpoint
443                ),
444                cause: None,
445            })?
446            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
447
448        url.query_pairs_mut()
449            .append_pair("uploadType", upload_type)
450            .append_pair("name", &id.as_storage_path().to_string());
451
452        Ok(url)
453    }
454
455    /// Formats a GCS XML API URL for the given object.
456    ///
457    /// Unlike [`object_url`](Self::object_url) (JSON API at
458    /// `/storage/v1/b/{bucket}/o/{name}`), this produces
459    /// `/{bucket}/{path_segments}` for the S3-compatible XML API used by
460    /// multipart uploads.
461    fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
462        let mut url = self.endpoint.clone();
463        {
464            let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
465                context: format!(
466                    "GCS: invalid endpoint URL, {} cannot be a base",
467                    self.endpoint
468                ),
469                cause: None,
470            })?;
471            segments.push(&self.bucket);
472            for part in id.as_storage_path().to_string().split('/') {
473                segments.push(part);
474            }
475        }
476        Ok(url)
477    }
478
479    /// Creates a request builder with the appropriate authentication.
480    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
481        let mut builder = self.client.request(method, url);
482        if let Some(provider) = &self.token_provider {
483            let token = provider.token(TOKEN_SCOPES).await?;
484            builder = builder.bearer_auth(token.as_str());
485        }
486        Ok(builder)
487    }
488
489    /// Retries a GCS request on transient errors.
490    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
491    where
492        F: Future<Output = Result<T>> + Send,
493    {
494        let mut retry_count = 0usize;
495        loop {
496            match f().await {
497                Ok(res) => return Ok(res),
498                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
499                    retry_count += 1;
500                    objectstore_metrics::count!("gcs.retries", action = action);
501                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
502                }
503                Err(e) => {
504                    objectstore_metrics::count!("gcs.failures", action = action);
505                    return Err(e);
506                }
507            }
508        }
509    }
510
511    /// Fetches the GCS object metadata (without the payload), bumps TTI if
512    /// needed, and returns the parsed [`Metadata`].
513    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
514        let metadata_opt = self
515            .with_retry("get_metadata", || async {
516                let resp = self
517                    .request(Method::GET, object_url.clone())
518                    .await?
519                    .send()
520                    .await
521                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;
522
523                if resp.status() == StatusCode::NOT_FOUND {
524                    return Ok(None);
525                }
526
527                let metadata: GcsObject = resp
528                    .error_for_status()
529                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
530                    .json()
531                    .await
532                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
533
534                Ok(Some(metadata))
535            })
536            .await?;
537
538        let Some(gcs_metadata) = metadata_opt else {
539            objectstore_log::debug!("Object not found");
540            return Ok(None);
541        };
542
543        let expire_at = gcs_metadata.custom_time;
544        let metadata = gcs_metadata.into_metadata()?;
545
546        // TODO: Inject the access time from the request.
547        let access_time = SystemTime::now();
548
549        // Filter already expired objects but leave them to garbage collection
550        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
551            objectstore_log::debug!("Object found but past expiry");
552            return Ok(None);
553        }
554
555        // TODO: Schedule into background persistently so this doesn't get lost on restarts
556        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
557            let new_expire_at = access_time + tti;
558            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
559                self.update_custom_time(object_url.clone(), new_expire_at)
560                    .await?;
561            }
562        }
563
564        Ok(Some(metadata))
565    }
566
567    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
568        #[derive(Debug, Serialize)]
569        #[serde(rename_all = "camelCase")]
570        struct CustomTimeRequest {
571            #[serde(with = "humantime_serde")]
572            custom_time: SystemTime,
573        }
574
575        self.with_retry("update_custom_time", || async {
576            self.request(Method::PATCH, object_url.clone())
577                .await?
578                .json(&CustomTimeRequest { custom_time })
579                .send()
580                .await
581                .and_then(Response::error_for_status)
582                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
583            Ok(())
584        })
585        .await
586    }
587}
588
589impl fmt::Debug for GcsBackend {
590    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591        f.debug_struct("GcsJsonApi")
592            .field("endpoint", &self.endpoint)
593            .field("bucket", &self.bucket)
594            .finish_non_exhaustive()
595    }
596}
597
598#[async_trait::async_trait]
599impl Backend for GcsBackend {
600    fn name(&self) -> &'static str {
601        "gcs"
602    }
603
604    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
605        Ok(self)
606    }
607
608    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
609    async fn put_object(
610        &self,
611        id: &ObjectId,
612        metadata: &Metadata,
613        stream: ClientStream,
614    ) -> Result<PutResponse> {
615        objectstore_log::debug!("Writing to GCS backend");
616        let gcs_metadata = GcsObject::from_metadata(metadata);
617
618        // NB: Ensure the order of these fields and that a content-type is attached to them. Both
619        // are required by the GCS API.
620        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
621            context: "failed to serialize metadata for GCS upload".to_string(),
622            cause,
623        })?;
624
625        let multipart = multipart::Form::new()
626            .part(
627                "metadata",
628                multipart::Part::text(metadata_json)
629                    .mime_str("application/json")
630                    .expect("application/json is a valid mime type"),
631            )
632            .part(
633                "media",
634                multipart::Part::stream(Body::wrap_stream(stream))
635                    .mime_str(&metadata.content_type)
636                    .map_err(|e| Error::Generic {
637                        context: format!("invalid mime type: {}", &metadata.content_type),
638                        cause: Some(Box::new(e)),
639                    })?,
640            );
641
642        // GCS requires a multipart/related request. Its body looks identical to
643        // multipart/form-data, but the Content-Type header is different. Hence, we have to manually
644        // set the header *after* writing the multipart form into the request.
645        let content_type = format!("multipart/related; boundary={}", multipart.boundary());
646
647        self.request(Method::POST, self.upload_url(id, "multipart")?)
648            .await?
649            .multipart(multipart)
650            .header(header::CONTENT_TYPE, content_type)
651            .send()
652            .await
653            .and_then(Response::error_for_status)
654            .map_err(|e| match stream::unpack_client_error(&e) {
655                Some(ce) => Error::Client(ce),
656                _ => Error::reqwest("GCS: upload object", e),
657            })?;
658
659        Ok(())
660    }
661
662    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
663    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
664        objectstore_log::debug!("Reading from GCS backend");
665        let object_url = self.object_url(id)?;
666
667        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
668            return Ok(None);
669        };
670
671        let mut download_url = object_url;
672        download_url.query_pairs_mut().append_pair("alt", "media");
673
674        let payload_response = self
675            .with_retry("get_payload", || async {
676                let mut req = self.request(Method::GET, download_url.clone()).await?;
677                if let Some(r) = range {
678                    req = req.header(header::RANGE, r.to_header_value());
679                }
680                let resp = req
681                    .send()
682                    .await
683                    .map_err(|e| Error::reqwest("GCS: get payload", e))?;
684
685                if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE {
686                    let raw = resp
687                        .headers()
688                        .get(header::CONTENT_RANGE)
689                        .and_then(|v| v.to_str().ok());
690                    let total = raw.and_then(ContentRange::parse_unsatisfiable_total);
691                    match total {
692                        Some(total) => return Err(Error::RangeNotSatisfiable { total }),
693                        None => {
694                            return Err(Error::Generic {
695                                context: format!(
696                                    "GCS: 416 response with invalid Content-Range: {raw:?}"
697                                ),
698                                cause: None,
699                            });
700                        }
701                    }
702                }
703
704                resp.error_for_status()
705                    .map_err(|e| Error::reqwest("GCS: get payload", e))
706            })
707            .await?;
708
709        let content_range = if payload_response.status() == StatusCode::PARTIAL_CONTENT {
710            Some(
711                payload_response
712                    .headers()
713                    .get(header::CONTENT_RANGE)
714                    .and_then(|v| v.to_str().ok())
715                    .and_then(|s| s.parse::<ContentRange>().ok())
716                    .ok_or_else(|| Error::Generic {
717                        context: "GCS: 206 response missing valid Content-Range header".to_owned(),
718                        cause: None,
719                    })?,
720            )
721        } else {
722            None
723        };
724
725        let stream = payload_response
726            .bytes_stream()
727            .map_err(io::Error::other)
728            .boxed();
729
730        Ok(Some((metadata, content_range, stream)))
731    }
732
733    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
734    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
735        objectstore_log::debug!("Reading metadata from GCS backend");
736        let object_url = self.object_url(id)?;
737        self.fetch_gcs_metadata(&object_url).await
738    }
739
740    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
741    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
742        objectstore_log::debug!("Deleting from GCS backend");
743        let object_url = self.object_url(id)?;
744
745        self.with_retry("delete", || async {
746            let resp = self
747                .request(Method::DELETE, object_url.clone())
748                .await?
749                .send()
750                .await
751                .map_err(|e| Error::reqwest("GCS: delete object", e))?;
752
753            // Do not error for objects that do not exist
754            if resp.status() == StatusCode::NOT_FOUND {
755                return Ok(());
756            }
757
758            resp.error_for_status()
759                .map_err(|e| Error::reqwest("GCS: delete object", e))?;
760
761            Ok(())
762        })
763        .await
764    }
765}
766
767#[derive(Debug, Deserialize)]
768#[serde(rename_all = "PascalCase")]
769struct XmlInitiateMultipartUploadResponse {
770    upload_id: String,
771}
772
773impl TryFrom<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
774    type Error = Error;
775
776    fn try_from(r: XmlInitiateMultipartUploadResponse) -> Result<Self> {
777        Ok(UploadId::new(r.upload_id)?)
778    }
779}
780
781#[derive(Debug, Deserialize)]
782#[serde(rename_all = "PascalCase")]
783struct XmlListPartsResponse {
784    #[serde(default)]
785    is_truncated: bool,
786    next_part_number_marker: Option<PartNumber>,
787    #[serde(default, rename = "Part")]
788    parts: Vec<XmlPart>,
789}
790
791impl From<XmlListPartsResponse> for ListPartsResponse {
792    fn from(xml: XmlListPartsResponse) -> Self {
793        Self {
794            parts: xml.parts.into_iter().map(Into::into).collect(),
795            is_truncated: xml.is_truncated,
796            next_part_number_marker: xml.next_part_number_marker,
797        }
798    }
799}
800
801#[derive(Debug, Deserialize)]
802#[serde(rename_all = "PascalCase")]
803struct XmlPart {
804    part_number: PartNumber,
805    #[serde(rename = "ETag")]
806    e_tag: String,
807    #[serde(with = "humantime_serde")]
808    last_modified: SystemTime,
809    size: u64,
810}
811
812impl From<XmlPart> for crate::multipart::Part {
813    fn from(p: XmlPart) -> Self {
814        Self {
815            part_number: p.part_number,
816            etag: p.e_tag,
817            last_modified: p.last_modified,
818            size: p.size,
819        }
820    }
821}
822
823#[derive(Debug, Serialize)]
824#[serde(rename = "CompleteMultipartUpload")]
825struct XmlCompleteMultipartUpload {
826    #[serde(rename = "Part")]
827    parts: Vec<XmlCompletePart>,
828}
829
830impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
831    fn from(parts: Vec<CompletedPart>) -> Self {
832        Self {
833            parts: parts.into_iter().map(Into::into).collect(),
834        }
835    }
836}
837
838#[derive(Debug, Serialize)]
839#[serde(rename_all = "PascalCase")]
840struct XmlCompletePart {
841    part_number: PartNumber,
842    #[serde(rename = "ETag")]
843    e_tag: String,
844}
845
846impl From<CompletedPart> for XmlCompletePart {
847    fn from(p: CompletedPart) -> Self {
848        Self {
849            part_number: p.part_number,
850            e_tag: p.etag,
851        }
852    }
853}
854
855#[derive(Debug, Deserialize)]
856#[serde(rename = "Error", rename_all = "PascalCase")]
857struct XmlError {
858    code: String,
859    message: String,
860}
861
862impl From<XmlError> for crate::multipart::CompleteMultipartError {
863    fn from(e: XmlError) -> Self {
864        Self {
865            code: e.code,
866            message: e.message,
867        }
868    }
869}
870
871/// XXX: Any change that affects this implementation should be manually tested against real GCS.
872/// That's because the fork of [storage-testbench](https://github.com/googleapis/storage-testbench)
873/// that we test against has an incomplete implementation of the XML multipart API that likely doesn't match GCS's behavior in many cases.
874#[async_trait::async_trait]
875impl MultipartUploadBackend for GcsBackend {
876    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
877    async fn initiate_multipart(
878        &self,
879        id: &ObjectId,
880        metadata: &Metadata,
881    ) -> Result<InitiateMultipartResponse> {
882        objectstore_log::debug!("Initiating multipart upload on GCS backend");
883        let mut url = self.xml_object_url(id)?;
884        url.set_query(Some("uploads"));
885
886        let mut builder = self
887            .request(Method::POST, url)
888            .await?
889            .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
890            .header(header::CONTENT_LENGTH, "0");
891
892        let meta_headers = metadata_to_gcs_headers(metadata)?;
893        for (name, value) in &meta_headers {
894            builder = builder.header(name, value);
895        }
896
897        let resp = builder
898            .send()
899            .await
900            .and_then(Response::error_for_status)
901            .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;
902
903        let body = resp
904            .bytes()
905            .await
906            .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;
907
908        let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
909            .map_err(|e| Error::Generic {
910                context: "GCS: failed to parse initiate multipart response".to_owned(),
911                cause: Some(Box::new(e)),
912            })?;
913
914        Ok(xml.try_into()?)
915    }
916
917    #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
918    async fn upload_part(
919        &self,
920        id: &ObjectId,
921        upload_id: &UploadId,
922        part_number: PartNumber,
923        content_length: u64,
924        content_md5: Option<&str>,
925        body: ClientStream,
926    ) -> Result<UploadPartResponse> {
927        objectstore_log::debug!("Uploading part to GCS backend");
928        let mut url = self.xml_object_url(id)?;
929        url.query_pairs_mut()
930            .append_pair("partNumber", &part_number.to_string())
931            .append_pair("uploadId", upload_id);
932
933        let mut builder = self
934            .request(Method::PUT, url)
935            .await?
936            .header(header::CONTENT_LENGTH, content_length)
937            .body(Body::wrap_stream(body));
938
939        if let Some(md5) = content_md5 {
940            builder = builder.header("content-md5", md5);
941        }
942
943        let resp = builder
944            .send()
945            .await
946            .and_then(Response::error_for_status)
947            .map_err(|e| Error::reqwest("GCS: upload part", e))?;
948
949        let etag = resp
950            .headers()
951            .get(header::ETAG)
952            .and_then(|v| v.to_str().ok())
953            .map(|s| s.to_owned())
954            .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;
955
956        Ok(etag)
957    }
958
959    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
960    async fn list_parts(
961        &self,
962        id: &ObjectId,
963        upload_id: &UploadId,
964        max_parts: Option<u32>,
965        part_number_marker: Option<PartNumber>,
966    ) -> Result<ListPartsResponse> {
967        objectstore_log::debug!("Listing parts on GCS backend");
968        let mut url = self.xml_object_url(id)?;
969        {
970            let mut pairs = url.query_pairs_mut();
971            pairs.append_pair("uploadId", upload_id);
972            if let Some(max) = max_parts {
973                pairs.append_pair("max-parts", &max.to_string());
974            }
975            if let Some(marker) = part_number_marker {
976                pairs.append_pair("part-number-marker", &marker.to_string());
977            }
978        }
979
980        let resp = self
981            .request(Method::GET, url)
982            .await?
983            .send()
984            .await
985            .and_then(Response::error_for_status)
986            .map_err(|e| Error::reqwest("GCS: list parts", e))?;
987
988        let body = resp
989            .bytes()
990            .await
991            .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;
992
993        let xml: XmlListPartsResponse =
994            quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
995                context: "GCS: failed to parse list parts response".to_owned(),
996                cause: Some(Box::new(e)),
997            })?;
998
999        Ok(xml.into())
1000    }
1001
1002    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
1003    async fn abort_multipart(
1004        &self,
1005        id: &ObjectId,
1006        upload_id: &UploadId,
1007    ) -> Result<AbortMultipartResponse> {
1008        objectstore_log::debug!("Aborting multipart upload on GCS backend");
1009        let mut url = self.xml_object_url(id)?;
1010        url.query_pairs_mut().append_pair("uploadId", upload_id);
1011
1012        self.request(Method::DELETE, url)
1013            .await?
1014            .send()
1015            .await
1016            .and_then(Response::error_for_status)
1017            .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;
1018
1019        Ok(())
1020    }
1021
1022    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
1023    async fn complete_multipart(
1024        &self,
1025        id: &ObjectId,
1026        upload_id: &UploadId,
1027        parts: Vec<CompletedPart>,
1028    ) -> Result<CompleteMultipartResponse> {
1029        objectstore_log::debug!("Completing multipart upload on GCS backend");
1030        let mut url = self.xml_object_url(id)?;
1031        url.query_pairs_mut().append_pair("uploadId", upload_id);
1032
1033        let body = XmlCompleteMultipartUpload::from(parts);
1034        let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
1035            context: "GCS: failed to serialize complete multipart request".into(),
1036            cause: Some(Box::new(e)),
1037        })?;
1038
1039        let resp = self
1040            .request(Method::POST, url)
1041            .await?
1042            .header(header::CONTENT_TYPE, "application/xml")
1043            .body(xml)
1044            .send()
1045            .await
1046            .and_then(Response::error_for_status)
1047            .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;
1048
1049        let body = resp
1050            .bytes()
1051            .await
1052            .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;
1053
1054        let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
1055            .ok()
1056            .map(Into::into);
1057
1058        Ok(error)
1059    }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064    use std::collections::BTreeMap;
1065    use std::num::NonZeroU32;
1066
1067    use anyhow::Result;
1068    use objectstore_types::scope::{Scope, Scopes};
1069
1070    use super::*;
1071    use crate::id::ObjectContext;
1072    use crate::multipart::CompletedPart;
1073    use crate::stream;
1074
    // NB: To run any of these tests, you need to have a GCS emulator running. This is done
    // automatically in CI.
    //
    // Refer to the readme for how to set up the emulator.
1079
1080    async fn create_test_backend() -> Result<GcsBackend> {
1081        GcsBackend::new(GcsConfig {
1082            endpoint: Some("http://localhost:8087".into()),
1083            bucket: "test-bucket".into(),
1084        })
1085        .await
1086    }
1087
1088    fn make_id() -> ObjectId {
1089        ObjectId::random(ObjectContext {
1090            usecase: "testing".into(),
1091            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1092        })
1093    }
1094
1095    #[tokio::test]
1096    async fn test_roundtrip() -> Result<()> {
1097        let backend = create_test_backend().await?;
1098
1099        let id = make_id();
1100        let metadata = Metadata {
1101            content_type: "text/plain".into(),
1102            expiration_policy: ExpirationPolicy::Manual,
1103            compression: None,
1104            origin: Some("203.0.113.42".into()),
1105            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1106            time_created: Some(SystemTime::now()),
1107            time_expires: None,
1108            size: None,
1109        };
1110
1111        backend
1112            .put_object(&id, &metadata, stream::single("hello, world"))
1113            .await?;
1114
1115        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1116
1117        let payload = stream::read_to_vec(stream).await?;
1118        let str_payload = str::from_utf8(&payload).unwrap();
1119        assert_eq!(str_payload, "hello, world");
1120        assert_eq!(meta.content_type, metadata.content_type);
1121        assert_eq!(meta.origin, metadata.origin);
1122        assert_eq!(meta.custom, metadata.custom);
1123        assert!(metadata.time_created.is_some());
1124
1125        Ok(())
1126    }
1127
1128    #[tokio::test]
1129    async fn test_get_nonexistent() -> Result<()> {
1130        let backend = create_test_backend().await?;
1131
1132        let id = make_id();
1133        let result = backend.get_object(&id, None).await?;
1134        assert!(result.is_none());
1135
1136        Ok(())
1137    }
1138
1139    #[tokio::test]
1140    async fn test_delete_nonexistent() -> Result<()> {
1141        let backend = create_test_backend().await?;
1142
1143        let id = make_id();
1144        backend.delete_object(&id).await?;
1145
1146        Ok(())
1147    }
1148
1149    #[tokio::test]
1150    async fn test_overwrite() -> Result<()> {
1151        let backend = create_test_backend().await?;
1152
1153        let id = make_id();
1154        let metadata = Metadata {
1155            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1156            ..Default::default()
1157        };
1158
1159        backend
1160            .put_object(&id, &metadata, stream::single("hello"))
1161            .await?;
1162
1163        let metadata = Metadata {
1164            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1165            ..Default::default()
1166        };
1167
1168        backend
1169            .put_object(&id, &metadata, stream::single("world"))
1170            .await?;
1171
1172        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1173
1174        let payload = stream::read_to_vec(stream).await?;
1175        let str_payload = str::from_utf8(&payload).unwrap();
1176        assert_eq!(str_payload, "world");
1177        assert_eq!(meta.custom, metadata.custom);
1178
1179        Ok(())
1180    }
1181
1182    #[tokio::test]
1183    async fn test_read_after_delete() -> Result<()> {
1184        let backend = create_test_backend().await?;
1185
1186        let id = make_id();
1187        let metadata = Metadata::default();
1188
1189        backend
1190            .put_object(&id, &metadata, stream::single("hello, world"))
1191            .await?;
1192
1193        backend.delete_object(&id).await?;
1194
1195        let result = backend.get_object(&id, None).await?;
1196        assert!(result.is_none());
1197
1198        Ok(())
1199    }
1200
1201    #[tokio::test]
1202    async fn test_ttl_immediate() -> Result<()> {
1203        // NB: We create a TTL that immediately expires in this tests. This might be optimized away
1204        // in a future implementation, so we will have to update this test accordingly.
1205
1206        let backend = create_test_backend().await?;
1207
1208        let id = make_id();
1209        let metadata = Metadata {
1210            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1211            ..Default::default()
1212        };
1213
1214        backend
1215            .put_object(&id, &metadata, stream::single("hello, world"))
1216            .await?;
1217
1218        let result = backend.get_object(&id, None).await?;
1219        assert!(result.is_none());
1220
1221        Ok(())
1222    }
1223
1224    #[tokio::test]
1225    async fn test_tti_immediate() -> Result<()> {
1226        // NB: We create a TTI that immediately expires in this tests. This might be optimized away
1227        // in a future implementation, so we will have to update this test accordingly.
1228
1229        let backend = create_test_backend().await?;
1230
1231        let id = make_id();
1232        let metadata = Metadata {
1233            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1234            ..Default::default()
1235        };
1236
1237        backend
1238            .put_object(&id, &metadata, stream::single("hello, world"))
1239            .await?;
1240
1241        let result = backend.get_object(&id, None).await?;
1242        assert!(result.is_none());
1243
1244        Ok(())
1245    }
1246
1247    #[tokio::test]
1248    async fn test_get_metadata_returns_metadata() -> Result<()> {
1249        let backend = create_test_backend().await?;
1250
1251        let id = make_id();
1252        let metadata = Metadata {
1253            content_type: "text/plain".into(),
1254            origin: Some("203.0.113.42".into()),
1255            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1256            ..Default::default()
1257        };
1258
1259        backend
1260            .put_object(&id, &metadata, stream::single("hello, world"))
1261            .await?;
1262
1263        let meta = backend.get_metadata(&id).await?.unwrap();
1264        assert_eq!(meta.content_type, metadata.content_type);
1265        assert_eq!(meta.origin, metadata.origin);
1266        assert_eq!(meta.custom, metadata.custom);
1267
1268        Ok(())
1269    }
1270
1271    #[tokio::test]
1272    async fn test_get_metadata_nonexistent() -> Result<()> {
1273        let backend = create_test_backend().await?;
1274
1275        let id = make_id();
1276        let result = backend.get_metadata(&id).await?;
1277        assert!(result.is_none());
1278
1279        Ok(())
1280    }
1281
1282    #[tokio::test]
1283    async fn test_get_metadata_bumps_tti() -> Result<()> {
1284        let backend = create_test_backend().await?;
1285
1286        let id = make_id();
1287        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1288        let tti = Duration::from_hours(2 * 24);
1289        let metadata = Metadata {
1290            content_type: "text/plain".into(),
1291            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1292            ..Default::default()
1293        };
1294
1295        backend
1296            .put_object(&id, &metadata, stream::single("hello, world"))
1297            .await?;
1298
1299        // Manually set custom_time to just inside the bump window.
1300        // The bump condition is: expire_at < now + tti - TTI_DEBOUNCE.
1301        let object_url = backend.object_url(&id)?;
1302        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_mins(1);
1303        backend.update_custom_time(object_url, old_deadline).await?;
1304
1305        // First get_metadata sees the old timestamp and triggers a TTI bump.
1306        let pre_meta = backend.get_metadata(&id).await?.unwrap();
1307        let pre_expiry = pre_meta.time_expires.unwrap();
1308
1309        // Second get_metadata sees the bumped timestamp.
1310        let post_meta = backend.get_metadata(&id).await?.unwrap();
1311        let post_expiry = post_meta.time_expires.unwrap();
1312        assert!(
1313            post_expiry > pre_expiry,
1314            "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
1315        );
1316
1317        // Verify the payload is still intact after the bump.
1318        let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1319        let payload = stream::read_to_vec(stream).await?;
1320        assert_eq!(&payload, b"hello, world");
1321
1322        Ok(())
1323    }
1324
1325    #[tokio::test]
1326    async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
1327        let backend = create_test_backend().await?;
1328
1329        let id = make_id();
1330        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1331        let tti = Duration::from_hours(2 * 24);
1332        let metadata = Metadata {
1333            content_type: "text/plain".into(),
1334            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1335            ..Default::default()
1336        };
1337
1338        backend
1339            .put_object(&id, &metadata, stream::single("hello, world"))
1340            .await?;
1341
1342        // A freshly written object has time_expires ≈ now + 2d, which is well outside
1343        // the bump window (now + 2d - 1d = now + 1d). No bump should occur.
1344        let first = backend.get_metadata(&id).await?.unwrap();
1345        let first_expiry = first.time_expires.unwrap();
1346
1347        let second = backend.get_metadata(&id).await?.unwrap();
1348        let second_expiry = second.time_expires.unwrap();
1349
1350        assert_eq!(
1351            first_expiry, second_expiry,
1352            "Fresh TTI object should not have its expiry bumped"
1353        );
1354
1355        Ok(())
1356    }
1357
1358    #[tokio::test]
1359    async fn test_compressed_payload_roundtrip() -> Result<()> {
1360        use objectstore_types::metadata::Compression;
1361
1362        let backend = create_test_backend().await?;
1363
1364        let plaintext = b"hello, world (but compressed with zstd)";
1365        let compressed = zstd::encode_all(&plaintext[..], 3)?;
1366
1367        let id = make_id();
1368        let metadata = Metadata {
1369            content_type: "text/plain".into(),
1370            compression: Some(Compression::Zstd),
1371            ..Default::default()
1372        };
1373
1374        backend
1375            .put_object(&id, &metadata, stream::single(compressed.clone()))
1376            .await?;
1377
1378        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1379        let payload = stream::read_to_vec(stream).await?;
1380
1381        assert_eq!(meta.compression, Some(Compression::Zstd));
1382        assert_eq!(
1383            payload, compressed,
1384            "Payload should be returned still compressed, not auto-decompressed"
1385        );
1386
1387        Ok(())
1388    }
1389
1390    #[tokio::test]
1391    async fn test_multipart_single_part() -> Result<()> {
1392        let backend = create_test_backend().await?;
1393        let id = make_id();
1394        let metadata = Metadata {
1395            content_type: "text/plain".into(),
1396            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
1397            origin: Some("203.0.113.42".into()),
1398            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1399            ..Default::default()
1400        };
1401
1402        let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1403
1404        let data = b"hello, multipart world!";
1405        let etag = backend
1406            .upload_part(
1407                &id,
1408                &upload_id,
1409                NonZeroU32::new(1).unwrap(),
1410                data.len() as u64,
1411                None,
1412                stream::single(data.to_vec()),
1413            )
1414            .await?;
1415
1416        let result = backend
1417            .complete_multipart(
1418                &id,
1419                &upload_id,
1420                vec![CompletedPart {
1421                    part_number: NonZeroU32::new(1).unwrap(),
1422                    etag,
1423                }],
1424            )
1425            .await?;
1426        assert!(result.is_none(), "expected no error on complete");
1427
1428        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1429        let payload = stream::read_to_vec(stream).await?;
1430        assert_eq!(payload, data);
1431        assert_eq!(meta.content_type, "text/plain".to_string());
1432        assert_eq!(
1433            meta.expiration_policy,
1434            ExpirationPolicy::TimeToLive(Duration::from_mins(33))
1435        );
1436        assert_eq!(meta.origin, Some("203.0.113.42".into()));
1437        assert_eq!(
1438            meta.custom,
1439            BTreeMap::from_iter([("hello".into(), "world".into())])
1440        );
1441
1442        Ok(())
1443    }
1444
1445    #[tokio::test]
1446    async fn test_multipart_multiple_parts() -> Result<()> {
1447        let backend = create_test_backend().await?;
1448        let id = make_id();
1449        let metadata = Metadata::default();
1450
1451        let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1452
1453        // Non-final parts must be >= 5 MiB.
1454        const MIN_PART: usize = 5 * 1024 * 1024;
1455        let part1 = vec![b'a'; MIN_PART];
1456        let part2 = vec![b'b'; MIN_PART];
1457        let part3 = b"cccc".to_vec();
1458
1459        let etag1 = backend
1460            .upload_part(
1461                &id,
1462                &upload_id,
1463                NonZeroU32::new(1).unwrap(),
1464                part1.len() as u64,
1465                None,
1466                stream::single(part1.clone()),
1467            )
1468            .await?;
1469        let etag2 = backend
1470            .upload_part(
1471                &id,
1472                &upload_id,
1473                NonZeroU32::new(2).unwrap(),
1474                part2.len() as u64,
1475                None,
1476                stream::single(part2.clone()),
1477            )
1478            .await?;
1479        let etag3 = backend
1480            .upload_part(
1481                &id,
1482                &upload_id,
1483                NonZeroU32::new(3).unwrap(),
1484                part3.len() as u64,
1485                None,
1486                stream::single(part3.clone()),
1487            )
1488            .await?;
1489
1490        let result = backend
1491            .complete_multipart(
1492                &id,
1493                &upload_id,
1494                vec![
1495                    CompletedPart {
1496                        part_number: NonZeroU32::new(1).unwrap(),
1497                        etag: etag1,
1498                    },
1499                    CompletedPart {
1500                        part_number: NonZeroU32::new(2).unwrap(),
1501                        etag: etag2,
1502                    },
1503                    CompletedPart {
1504                        part_number: NonZeroU32::new(3).unwrap(),
1505                        etag: etag3,
1506                    },
1507                ],
1508            )
1509            .await?;
1510        assert!(result.is_none(), "expected no error on complete");
1511
1512        // Object exists after complete
1513        let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1514        let payload = stream::read_to_vec(stream).await?;
1515        let mut expected = Vec::new();
1516        expected.extend_from_slice(&part1);
1517        expected.extend_from_slice(&part2);
1518        expected.extend_from_slice(&part3);
1519        assert_eq!(payload, expected);
1520
1521        Ok(())
1522    }
1523
1524    #[tokio::test]
1525    async fn test_multipart_out_of_order_upload() -> Result<()> {
1526        let backend = create_test_backend().await?;
1527        let id = make_id();
1528        let metadata = Metadata::default();
1529
1530        let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1531
1532        // Non-final parts must be >= 5 MiB.
1533        const MIN_PART: usize = 5 * 1024 * 1024;
1534        let part1 = vec![b'a'; MIN_PART];
1535        let part2 = vec![b'b'; MIN_PART];
1536        let part3 = b"cccc".to_vec();
1537
1538        // Upload parts out of order: 2, 3, 1.
1539        let etag2 = backend
1540            .upload_part(
1541                &id,
1542                &upload_id,
1543                NonZeroU32::new(2).unwrap(),
1544                part2.len() as u64,
1545                None,
1546                stream::single(part2.clone()),
1547            )
1548            .await?;
1549        let etag3 = backend
1550            .upload_part(
1551                &id,
1552                &upload_id,
1553                NonZeroU32::new(3).unwrap(),
1554                part3.len() as u64,
1555                None,
1556                stream::single(part3.clone()),
1557            )
1558            .await?;
1559        let etag1 = backend
1560            .upload_part(
1561                &id,
1562                &upload_id,
1563                NonZeroU32::new(1).unwrap(),
1564                part1.len() as u64,
1565                None,
1566                stream::single(part1.clone()),
1567            )
1568            .await?;
1569
1570        // Complete with parts listed in order.
1571        let result = backend
1572            .complete_multipart(
1573                &id,
1574                &upload_id,
1575                vec![
1576                    CompletedPart {
1577                        part_number: NonZeroU32::new(1).unwrap(),
1578                        etag: etag1,
1579                    },
1580                    CompletedPart {
1581                        part_number: NonZeroU32::new(2).unwrap(),
1582                        etag: etag2,
1583                    },
1584                    CompletedPart {
1585                        part_number: NonZeroU32::new(3).unwrap(),
1586                        etag: etag3,
1587                    },
1588                ],
1589            )
1590            .await?;
1591        assert!(result.is_none(), "expected no error on complete");
1592
1593        // Verify reassembly order matches part numbers, not upload order.
1594        let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1595        let payload = stream::read_to_vec(stream).await?;
1596        let mut expected = Vec::new();
1597        expected.extend_from_slice(&part1);
1598        expected.extend_from_slice(&part2);
1599        expected.extend_from_slice(&part3);
1600        assert_eq!(payload, expected);
1601
1602        Ok(())
1603    }
1604
1605    #[tokio::test]
1606    async fn test_multipart_list_parts() -> Result<()> {
1607        let backend = create_test_backend().await?;
1608        let id = make_id();
1609        let metadata = Metadata::default();
1610
1611        let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1612
1613        let etag1 = backend
1614            .upload_part(
1615                &id,
1616                &upload_id,
1617                NonZeroU32::new(1).unwrap(),
1618                3,
1619                None,
1620                stream::single(b"aaa".to_vec()),
1621            )
1622            .await?;
1623        let etag2 = backend
1624            .upload_part(
1625                &id,
1626                &upload_id,
1627                NonZeroU32::new(2).unwrap(),
1628                3,
1629                None,
1630                stream::single(b"bbb".to_vec()),
1631            )
1632            .await?;
1633
1634        // List all parts.
1635        let list = backend.list_parts(&id, &upload_id, None, None).await?;
1636        assert_eq!(list.parts.len(), 2);
1637        assert_eq!(list.parts[0].part_number.get(), 1);
1638        assert_eq!(list.parts[0].etag, etag1);
1639        assert_eq!(list.parts[0].size, 3);
1640        assert_eq!(list.parts[1].part_number.get(), 2);
1641        assert_eq!(list.parts[1].etag, etag2);
1642        assert_eq!(list.parts[1].size, 3);
1643
1644        // List with max_parts=1 to test pagination.
1645        let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
1646        assert_eq!(page1.parts.len(), 1);
1647        assert_eq!(page1.parts[0].part_number.get(), 1);
1648        assert!(page1.is_truncated);
1649        assert!(page1.next_part_number_marker.is_some());
1650
1651        let page2 = backend
1652            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
1653            .await?;
1654        assert_eq!(page2.parts.len(), 1);
1655        assert_eq!(page2.parts[0].part_number.get(), 2);
1656
1657        // Clean up.
1658        backend.abort_multipart(&id, &upload_id).await?;
1659
1660        Ok(())
1661    }
1662
1663    #[tokio::test]
1664    async fn test_multipart_abort() -> Result<()> {
1665        let backend = create_test_backend().await?;
1666        let id = make_id();
1667        let metadata = Metadata::default();
1668
1669        let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1670
1671        backend
1672            .upload_part(
1673                &id,
1674                &upload_id,
1675                NonZeroU32::new(1).unwrap(),
1676                5,
1677                None,
1678                stream::single(b"hello".to_vec()),
1679            )
1680            .await?;
1681
1682        backend.abort_multipart(&id, &upload_id).await?;
1683
1684        // Object should not exist after abort.
1685        let result = backend.get_object(&id, None).await?;
1686        assert!(result.is_none(), "object should not exist after abort");
1687
1688        Ok(())
1689    }
1690
1691    async fn multipart_put(
1692        backend: &GcsBackend,
1693        id: &ObjectId,
1694        metadata: &Metadata,
1695        payload: impl Into<bytes::Bytes>,
1696    ) -> Result<()> {
1697        let payload: bytes::Bytes = payload.into();
1698        let upload_id = backend.initiate_multipart(id, metadata).await?;
1699        let etag = backend
1700            .upload_part(
1701                id,
1702                &upload_id,
1703                NonZeroU32::new(1).unwrap(),
1704                payload.len() as u64,
1705                None,
1706                stream::single(payload),
1707            )
1708            .await?;
1709        let error = backend
1710            .complete_multipart(
1711                id,
1712                &upload_id,
1713                vec![CompletedPart {
1714                    part_number: NonZeroU32::new(1).unwrap(),
1715                    etag,
1716                }],
1717            )
1718            .await?;
1719        assert!(
1720            error.is_none(),
1721            "complete_multipart returned error: {error:?}"
1722        );
1723        Ok(())
1724    }
1725
1726    #[tokio::test]
1727    async fn test_multipart_ttl_immediate() -> Result<()> {
1728        let backend = create_test_backend().await?;
1729        let id = make_id();
1730        let metadata = Metadata {
1731            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1732            ..Default::default()
1733        };
1734
1735        multipart_put(&backend, &id, &metadata, "hello, world").await?;
1736
1737        let result = backend.get_object(&id, None).await?;
1738        assert!(result.is_none());
1739
1740        Ok(())
1741    }
1742
1743    #[tokio::test]
1744    async fn test_multipart_tti_immediate() -> Result<()> {
1745        let backend = create_test_backend().await?;
1746        let id = make_id();
1747        let metadata = Metadata {
1748            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1749            ..Default::default()
1750        };
1751
1752        multipart_put(&backend, &id, &metadata, "hello, world").await?;
1753
1754        let result = backend.get_object(&id, None).await?;
1755        assert!(result.is_none());
1756
1757        Ok(())
1758    }
1759
1760    #[tokio::test]
1761    async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
1762        use objectstore_types::metadata::Compression;
1763
1764        let backend = create_test_backend().await?;
1765
1766        let plaintext = b"hello, world (but compressed with zstd)";
1767        let compressed = zstd::encode_all(&plaintext[..], 3)?;
1768
1769        let id = make_id();
1770        let metadata = Metadata {
1771            content_type: "text/plain".into(),
1772            compression: Some(Compression::Zstd),
1773            ..Default::default()
1774        };
1775
1776        multipart_put(&backend, &id, &metadata, compressed.clone()).await?;
1777
1778        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1779        let payload = stream::read_to_vec(stream).await?;
1780
1781        assert_eq!(meta.compression, Some(Compression::Zstd));
1782        assert_eq!(
1783            payload, compressed,
1784            "Payload should be returned still compressed, not auto-decompressed"
1785        );
1786
1787        Ok(())
1788    }
1789}