1use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use objectstore_types::range::{ByteRange, ContentRange};
14use reqwest::header::HeaderName;
15use reqwest::{
16 Body, IntoUrl, Method, RequestBuilder, Response, StatusCode, Url, header, multipart,
17};
18use serde::{Deserialize, Serialize};
19
20use crate::backend::common::{
21 self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
22 PutResponse,
23};
24use crate::error::{Error, Result};
25use crate::gcp_auth::PrefetchingTokenProvider;
26use crate::id::ObjectId;
27use crate::multipart::{
28 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
29 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
30};
31use crate::stream::{self, ClientStream};
32
/// Configuration for [`GcsBackend`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional endpoint URL override.
    ///
    /// When `None`, [`GcsBackend::new`] falls back to the default endpoint and sets up a
    /// token provider. When set, requests go to this URL and no token provider is created.
    pub endpoint: Option<String>,

    /// Name of the bucket that all request URLs are built against.
    pub bucket: String,
}
77
/// Endpoint used when the configuration does not specify one.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
/// Scopes passed to the token provider when it is created and when a token is requested.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
/// A time-to-idle deadline is only rewritten on read when the stored deadline lags the
/// freshly computed one by more than this amount.
const TTI_DEBOUNCE: Duration = Duration::from_hours(24);
/// Maximum number of retries performed by `with_retry` after the initial attempt.
const REQUEST_RETRY_COUNT: usize = 2;

/// Prefix of the built-in metadata keys (`expiration`, `origin`).
const BUILTIN_META_PREFIX: &str = "x-sn-";
/// Prefix of caller-supplied custom metadata keys.
const CUSTOM_META_PREFIX: &str = "x-snme-";
91
/// Object resource as exchanged with GCS in JSON form.
///
/// Field names are (de)serialized in camelCase. Use [`GcsObject::from_metadata`] and
/// [`GcsObject::into_metadata`] to convert from and to [`Metadata`].
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// Content type of the object.
    pub content_type: Cow<'static, str>,

    /// Content encoding; populated from the compression algorithm, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// Custom timestamp; this backend stores the expiry deadline of the object here.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// Object size, kept as a string here and parsed by `into_metadata`.
    pub size: Option<String>,

    /// Creation timestamp of the object.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// Key-value metadata holding the built-in and custom entries.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
134
135impl GcsObject {
136 pub fn from_metadata(metadata: &Metadata) -> Self {
138 let mut gcs_object = GcsObject {
139 content_type: metadata.content_type.clone(),
140 size: metadata.size.map(|size| size.to_string()),
141 content_encoding: None,
142 custom_time: None,
143 time_created: metadata.time_created,
144 metadata: BTreeMap::new(),
145 };
146
147 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
151 gcs_object.custom_time = Some(SystemTime::now() + expires_in);
152 }
153
154 if let Some(compression) = metadata.compression {
155 gcs_object.content_encoding = Some(compression.to_string());
156 }
157
158 if metadata.expiration_policy != ExpirationPolicy::default() {
159 gcs_object.metadata.insert(
160 GcsMetaKey::Expiration,
161 metadata.expiration_policy.to_string(),
162 );
163 }
164
165 if let Some(origin) = &metadata.origin {
166 gcs_object
167 .metadata
168 .insert(GcsMetaKey::Origin, origin.clone());
169 }
170
171 for (key, value) in &metadata.custom {
172 gcs_object
173 .metadata
174 .insert(GcsMetaKey::Custom(key.clone()), value.clone());
175 }
176
177 gcs_object
178 }
179
180 pub fn into_metadata(mut self) -> Result<Metadata> {
182 self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
184
185 let expiration_policy = self
186 .metadata
187 .remove(&GcsMetaKey::Expiration)
188 .map(|s| s.parse())
189 .transpose()?
190 .unwrap_or_default();
191
192 let origin = self.metadata.remove(&GcsMetaKey::Origin);
193
194 let content_type = self.content_type;
195 let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
196 let size = self
197 .size
198 .map(|size| size.parse())
199 .transpose()
200 .map_err(|e| Error::Generic {
201 context: "GCS: failed to parse size from object metadata".to_string(),
202 cause: Some(Box::new(e)),
203 })?;
204 let time_created = self.time_created;
205
206 let mut custom = BTreeMap::new();
208 for (key, value) in self.metadata {
209 if let GcsMetaKey::Custom(custom_key) = key {
210 custom.insert(custom_key, value);
211 } else {
212 return Err(Error::Generic {
213 context: format!(
214 "GCS: unexpected built-in metadata key in object metadata: {key}"
215 ),
216 cause: None,
217 });
218 }
219 }
220
221 Ok(Metadata {
222 content_type,
223 expiration_policy,
224 compression,
225 origin,
226 size,
227 custom,
228 time_created,
229 time_expires: self.custom_time,
230 })
231 }
232}
233
234#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
236enum GcsMetaKey {
237 Expiration,
239 Origin,
241 EmulatorIgnored,
243 Custom(String),
245}
246
247impl std::str::FromStr for GcsMetaKey {
248 type Err = anyhow::Error;
249
250 fn from_str(s: &str) -> Result<Self, Self::Err> {
251 if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
252 return Ok(GcsMetaKey::EmulatorIgnored);
253 }
254
255 Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
256 Some("expiration") => GcsMetaKey::Expiration,
257 Some("origin") => GcsMetaKey::Origin,
258 Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
259 None => match s.strip_prefix(CUSTOM_META_PREFIX) {
260 Some(key) => GcsMetaKey::Custom(key.to_string()),
261 None => anyhow::bail!("invalid GCS metadata key format: {s}"),
262 },
263 })
264 }
265}
266
267impl fmt::Display for GcsMetaKey {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 match self {
270 Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
271 Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
272 Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
273 Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
274 }
275 }
276}
277
278impl<'de> Deserialize<'de> for GcsMetaKey {
279 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
280 where
281 D: serde::Deserializer<'de>,
282 {
283 let s = Cow::<'de, str>::deserialize(deserializer)?;
284 s.parse().map_err(serde::de::Error::custom)
285 }
286}
287
288impl Serialize for GcsMetaKey {
289 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290 where
291 S: serde::Serializer,
292 {
293 serializer.collect_str(self)
294 }
295}
296
297fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
299 let mut headers = header::HeaderMap::new();
300
301 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
302 let custom_time = SystemTime::now() + expires_in;
303 let formatted = humantime::format_rfc3339_seconds(custom_time);
304 headers.insert(
305 HeaderName::from_static("x-goog-custom-time"),
306 formatted.to_string().parse().map_err(|e| Error::Generic {
307 context: "GCS: invalid custom-time header value".into(),
308 cause: Some(Box::new(e)),
309 })?,
310 );
311 }
312
313 if let Some(compression) = metadata.compression {
314 headers.insert(
315 header::CONTENT_ENCODING,
316 compression
317 .to_string()
318 .parse()
319 .map_err(|e| Error::Generic {
320 context: "GCS: invalid content-encoding header value".into(),
321 cause: Some(Box::new(e)),
322 })?,
323 );
324 }
325
326 if metadata.expiration_policy != ExpirationPolicy::default() {
327 insert_gcs_meta_header(
328 &mut headers,
329 &GcsMetaKey::Expiration,
330 &metadata.expiration_policy.to_string(),
331 )?;
332 }
333
334 if let Some(origin) = &metadata.origin {
335 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
336 }
337
338 for (key, value) in &metadata.custom {
339 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
340 }
341
342 Ok(headers)
343}
344
345fn insert_gcs_meta_header(
346 headers: &mut header::HeaderMap,
347 key: &GcsMetaKey,
348 value: &str,
349) -> Result<()> {
350 let header_name = format!("x-goog-meta-{key}");
351 headers.insert(
352 HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
353 context: format!("GCS: invalid header name: {header_name}"),
354 cause: Some(Box::new(e)),
355 })?,
356 value.parse().map_err(|e| Error::Generic {
357 context: format!("GCS: invalid header value for {header_name}"),
358 cause: Some(Box::new(e)),
359 })?,
360 );
361 Ok(())
362}
363
364fn is_retryable(error: &Error) -> bool {
366 let Error::Reqwest { cause, .. } = error else {
367 return false;
368 };
369 if cause.is_timeout() || cause.is_connect() || cause.is_request() {
370 return true;
371 }
372 let Some(status) = cause.status() else {
373 return false;
374 };
375 matches!(
377 status,
378 StatusCode::REQUEST_TIMEOUT
379 | StatusCode::TOO_MANY_REQUESTS
380 | StatusCode::INTERNAL_SERVER_ERROR
381 | StatusCode::BAD_GATEWAY
382 | StatusCode::SERVICE_UNAVAILABLE
383 | StatusCode::GATEWAY_TIMEOUT
384 )
385}
386
/// Storage backend that talks to Google Cloud Storage over HTTP.
pub struct GcsBackend {
    /// HTTP client used for all requests.
    client: reqwest::Client,
    /// Base URL that the object, upload and XML API paths are appended to.
    endpoint: Url,
    /// Bucket name inserted into every request URL.
    bucket: String,
    /// Source of bearer tokens; `None` means requests are sent without a token.
    token_provider: Option<PrefetchingTokenProvider>,
}
394
395impl GcsBackend {
396 pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
398 let GcsConfig { endpoint, bucket } = config;
399
400 let token_provider = if endpoint.is_none() {
401 Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
402 } else {
403 None
404 };
405
406 let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);
407
408 Ok(Self {
409 client: common::reqwest_client(),
410 endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
411 bucket,
412 token_provider,
413 })
414 }
415
416 fn object_url(&self, id: &ObjectId) -> Result<Url> {
418 let mut url = self.endpoint.clone();
419
420 let path = id.as_storage_path().to_string();
421 url.path_segments_mut()
422 .map_err(|()| Error::Generic {
423 context: format!(
424 "GCS: invalid endpoint URL, {} cannot be a base",
425 self.endpoint
426 ),
427 cause: None,
428 })?
429 .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
430
431 Ok(url)
432 }
433
434 fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
436 let mut url = self.endpoint.clone();
437
438 url.path_segments_mut()
439 .map_err(|()| Error::Generic {
440 context: format!(
441 "GCS: invalid endpoint URL, {} cannot be a base",
442 self.endpoint
443 ),
444 cause: None,
445 })?
446 .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
447
448 url.query_pairs_mut()
449 .append_pair("uploadType", upload_type)
450 .append_pair("name", &id.as_storage_path().to_string());
451
452 Ok(url)
453 }
454
455 fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
462 let mut url = self.endpoint.clone();
463 {
464 let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
465 context: format!(
466 "GCS: invalid endpoint URL, {} cannot be a base",
467 self.endpoint
468 ),
469 cause: None,
470 })?;
471 segments.push(&self.bucket);
472 for part in id.as_storage_path().to_string().split('/') {
473 segments.push(part);
474 }
475 }
476 Ok(url)
477 }
478
479 async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
481 let mut builder = self.client.request(method, url);
482 if let Some(provider) = &self.token_provider {
483 let token = provider.token(TOKEN_SCOPES).await?;
484 builder = builder.bearer_auth(token.as_str());
485 }
486 Ok(builder)
487 }
488
489 async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
491 where
492 F: Future<Output = Result<T>> + Send,
493 {
494 let mut retry_count = 0usize;
495 loop {
496 match f().await {
497 Ok(res) => return Ok(res),
498 Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
499 retry_count += 1;
500 objectstore_metrics::count!("gcs.retries", action = action);
501 objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
502 }
503 Err(e) => {
504 objectstore_metrics::count!("gcs.failures", action = action);
505 return Err(e);
506 }
507 }
508 }
509 }
510
511 async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
514 let metadata_opt = self
515 .with_retry("get_metadata", || async {
516 let resp = self
517 .request(Method::GET, object_url.clone())
518 .await?
519 .send()
520 .await
521 .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;
522
523 if resp.status() == StatusCode::NOT_FOUND {
524 return Ok(None);
525 }
526
527 let metadata: GcsObject = resp
528 .error_for_status()
529 .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
530 .json()
531 .await
532 .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
533
534 Ok(Some(metadata))
535 })
536 .await?;
537
538 let Some(gcs_metadata) = metadata_opt else {
539 objectstore_log::debug!("Object not found");
540 return Ok(None);
541 };
542
543 let expire_at = gcs_metadata.custom_time;
544 let metadata = gcs_metadata.into_metadata()?;
545
546 let access_time = SystemTime::now();
548
549 if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
551 objectstore_log::debug!("Object found but past expiry");
552 return Ok(None);
553 }
554
555 if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
557 let new_expire_at = access_time + tti;
558 if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
559 self.update_custom_time(object_url.clone(), new_expire_at)
560 .await?;
561 }
562 }
563
564 Ok(Some(metadata))
565 }
566
567 async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
568 #[derive(Debug, Serialize)]
569 #[serde(rename_all = "camelCase")]
570 struct CustomTimeRequest {
571 #[serde(with = "humantime_serde")]
572 custom_time: SystemTime,
573 }
574
575 self.with_retry("update_custom_time", || async {
576 self.request(Method::PATCH, object_url.clone())
577 .await?
578 .json(&CustomTimeRequest { custom_time })
579 .send()
580 .await
581 .and_then(Response::error_for_status)
582 .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
583 Ok(())
584 })
585 .await
586 }
587}
588
589impl fmt::Debug for GcsBackend {
590 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591 f.debug_struct("GcsJsonApi")
592 .field("endpoint", &self.endpoint)
593 .field("bucket", &self.bucket)
594 .finish_non_exhaustive()
595 }
596}
597
598#[async_trait::async_trait]
599impl Backend for GcsBackend {
    /// Returns the static identifier of this backend, `"gcs"`.
    fn name(&self) -> &'static str {
        "gcs"
    }
603
    /// Exposes this backend through its [`MultipartUploadBackend`] implementation.
    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
        Ok(self)
    }
607
608 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
609 async fn put_object(
610 &self,
611 id: &ObjectId,
612 metadata: &Metadata,
613 stream: ClientStream,
614 ) -> Result<PutResponse> {
615 objectstore_log::debug!("Writing to GCS backend");
616 let gcs_metadata = GcsObject::from_metadata(metadata);
617
618 let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
621 context: "failed to serialize metadata for GCS upload".to_string(),
622 cause,
623 })?;
624
625 let multipart = multipart::Form::new()
626 .part(
627 "metadata",
628 multipart::Part::text(metadata_json)
629 .mime_str("application/json")
630 .expect("application/json is a valid mime type"),
631 )
632 .part(
633 "media",
634 multipart::Part::stream(Body::wrap_stream(stream))
635 .mime_str(&metadata.content_type)
636 .map_err(|e| Error::Generic {
637 context: format!("invalid mime type: {}", &metadata.content_type),
638 cause: Some(Box::new(e)),
639 })?,
640 );
641
642 let content_type = format!("multipart/related; boundary={}", multipart.boundary());
646
647 self.request(Method::POST, self.upload_url(id, "multipart")?)
648 .await?
649 .multipart(multipart)
650 .header(header::CONTENT_TYPE, content_type)
651 .send()
652 .await
653 .and_then(Response::error_for_status)
654 .map_err(|e| match stream::unpack_client_error(&e) {
655 Some(ce) => Error::Client(ce),
656 _ => Error::reqwest("GCS: upload object", e),
657 })?;
658
659 Ok(())
660 }
661
662 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
663 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
664 objectstore_log::debug!("Reading from GCS backend");
665 let object_url = self.object_url(id)?;
666
667 let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
668 return Ok(None);
669 };
670
671 let mut download_url = object_url;
672 download_url.query_pairs_mut().append_pair("alt", "media");
673
674 let payload_response = self
675 .with_retry("get_payload", || async {
676 let mut req = self.request(Method::GET, download_url.clone()).await?;
677 if let Some(r) = range {
678 req = req.header(header::RANGE, r.to_header_value());
679 }
680 let resp = req
681 .send()
682 .await
683 .map_err(|e| Error::reqwest("GCS: get payload", e))?;
684
685 if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE {
686 let raw = resp
687 .headers()
688 .get(header::CONTENT_RANGE)
689 .and_then(|v| v.to_str().ok());
690 let total = raw.and_then(ContentRange::parse_unsatisfiable_total);
691 match total {
692 Some(total) => return Err(Error::RangeNotSatisfiable { total }),
693 None => {
694 return Err(Error::Generic {
695 context: format!(
696 "GCS: 416 response with invalid Content-Range: {raw:?}"
697 ),
698 cause: None,
699 });
700 }
701 }
702 }
703
704 resp.error_for_status()
705 .map_err(|e| Error::reqwest("GCS: get payload", e))
706 })
707 .await?;
708
709 let content_range = if payload_response.status() == StatusCode::PARTIAL_CONTENT {
710 Some(
711 payload_response
712 .headers()
713 .get(header::CONTENT_RANGE)
714 .and_then(|v| v.to_str().ok())
715 .and_then(|s| s.parse::<ContentRange>().ok())
716 .ok_or_else(|| Error::Generic {
717 context: "GCS: 206 response missing valid Content-Range header".to_owned(),
718 cause: None,
719 })?,
720 )
721 } else {
722 None
723 };
724
725 let stream = payload_response
726 .bytes_stream()
727 .map_err(io::Error::other)
728 .boxed();
729
730 Ok(Some((metadata, content_range, stream)))
731 }
732
733 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
734 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
735 objectstore_log::debug!("Reading metadata from GCS backend");
736 let object_url = self.object_url(id)?;
737 self.fetch_gcs_metadata(&object_url).await
738 }
739
740 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
741 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
742 objectstore_log::debug!("Deleting from GCS backend");
743 let object_url = self.object_url(id)?;
744
745 self.with_retry("delete", || async {
746 let resp = self
747 .request(Method::DELETE, object_url.clone())
748 .await?
749 .send()
750 .await
751 .map_err(|e| Error::reqwest("GCS: delete object", e))?;
752
753 if resp.status() == StatusCode::NOT_FOUND {
755 return Ok(());
756 }
757
758 resp.error_for_status()
759 .map_err(|e| Error::reqwest("GCS: delete object", e))?;
760
761 Ok(())
762 })
763 .await
764 }
765}
766
767#[derive(Debug, Deserialize)]
768#[serde(rename_all = "PascalCase")]
769struct XmlInitiateMultipartUploadResponse {
770 upload_id: String,
771}
772
773impl TryFrom<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
774 type Error = Error;
775
776 fn try_from(r: XmlInitiateMultipartUploadResponse) -> Result<Self> {
777 Ok(UploadId::new(r.upload_id)?)
778 }
779}
780
781#[derive(Debug, Deserialize)]
782#[serde(rename_all = "PascalCase")]
783struct XmlListPartsResponse {
784 #[serde(default)]
785 is_truncated: bool,
786 next_part_number_marker: Option<PartNumber>,
787 #[serde(default, rename = "Part")]
788 parts: Vec<XmlPart>,
789}
790
791impl From<XmlListPartsResponse> for ListPartsResponse {
792 fn from(xml: XmlListPartsResponse) -> Self {
793 Self {
794 parts: xml.parts.into_iter().map(Into::into).collect(),
795 is_truncated: xml.is_truncated,
796 next_part_number_marker: xml.next_part_number_marker,
797 }
798 }
799}
800
801#[derive(Debug, Deserialize)]
802#[serde(rename_all = "PascalCase")]
803struct XmlPart {
804 part_number: PartNumber,
805 #[serde(rename = "ETag")]
806 e_tag: String,
807 #[serde(with = "humantime_serde")]
808 last_modified: SystemTime,
809 size: u64,
810}
811
812impl From<XmlPart> for crate::multipart::Part {
813 fn from(p: XmlPart) -> Self {
814 Self {
815 part_number: p.part_number,
816 etag: p.e_tag,
817 last_modified: p.last_modified,
818 size: p.size,
819 }
820 }
821}
822
823#[derive(Debug, Serialize)]
824#[serde(rename = "CompleteMultipartUpload")]
825struct XmlCompleteMultipartUpload {
826 #[serde(rename = "Part")]
827 parts: Vec<XmlCompletePart>,
828}
829
830impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
831 fn from(parts: Vec<CompletedPart>) -> Self {
832 Self {
833 parts: parts.into_iter().map(Into::into).collect(),
834 }
835 }
836}
837
838#[derive(Debug, Serialize)]
839#[serde(rename_all = "PascalCase")]
840struct XmlCompletePart {
841 part_number: PartNumber,
842 #[serde(rename = "ETag")]
843 e_tag: String,
844}
845
846impl From<CompletedPart> for XmlCompletePart {
847 fn from(p: CompletedPart) -> Self {
848 Self {
849 part_number: p.part_number,
850 e_tag: p.etag,
851 }
852 }
853}
854
855#[derive(Debug, Deserialize)]
856#[serde(rename = "Error", rename_all = "PascalCase")]
857struct XmlError {
858 code: String,
859 message: String,
860}
861
862impl From<XmlError> for crate::multipart::CompleteMultipartError {
863 fn from(e: XmlError) -> Self {
864 Self {
865 code: e.code,
866 message: e.message,
867 }
868 }
869}
870
871#[async_trait::async_trait]
875impl MultipartUploadBackend for GcsBackend {
876 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
877 async fn initiate_multipart(
878 &self,
879 id: &ObjectId,
880 metadata: &Metadata,
881 ) -> Result<InitiateMultipartResponse> {
882 objectstore_log::debug!("Initiating multipart upload on GCS backend");
883 let mut url = self.xml_object_url(id)?;
884 url.set_query(Some("uploads"));
885
886 let mut builder = self
887 .request(Method::POST, url)
888 .await?
889 .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
890 .header(header::CONTENT_LENGTH, "0");
891
892 let meta_headers = metadata_to_gcs_headers(metadata)?;
893 for (name, value) in &meta_headers {
894 builder = builder.header(name, value);
895 }
896
897 let resp = builder
898 .send()
899 .await
900 .and_then(Response::error_for_status)
901 .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;
902
903 let body = resp
904 .bytes()
905 .await
906 .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;
907
908 let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
909 .map_err(|e| Error::Generic {
910 context: "GCS: failed to parse initiate multipart response".to_owned(),
911 cause: Some(Box::new(e)),
912 })?;
913
914 Ok(xml.try_into()?)
915 }
916
917 #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
918 async fn upload_part(
919 &self,
920 id: &ObjectId,
921 upload_id: &UploadId,
922 part_number: PartNumber,
923 content_length: u64,
924 content_md5: Option<&str>,
925 body: ClientStream,
926 ) -> Result<UploadPartResponse> {
927 objectstore_log::debug!("Uploading part to GCS backend");
928 let mut url = self.xml_object_url(id)?;
929 url.query_pairs_mut()
930 .append_pair("partNumber", &part_number.to_string())
931 .append_pair("uploadId", upload_id);
932
933 let mut builder = self
934 .request(Method::PUT, url)
935 .await?
936 .header(header::CONTENT_LENGTH, content_length)
937 .body(Body::wrap_stream(body));
938
939 if let Some(md5) = content_md5 {
940 builder = builder.header("content-md5", md5);
941 }
942
943 let resp = builder
944 .send()
945 .await
946 .and_then(Response::error_for_status)
947 .map_err(|e| Error::reqwest("GCS: upload part", e))?;
948
949 let etag = resp
950 .headers()
951 .get(header::ETAG)
952 .and_then(|v| v.to_str().ok())
953 .map(|s| s.to_owned())
954 .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;
955
956 Ok(etag)
957 }
958
959 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
960 async fn list_parts(
961 &self,
962 id: &ObjectId,
963 upload_id: &UploadId,
964 max_parts: Option<u32>,
965 part_number_marker: Option<PartNumber>,
966 ) -> Result<ListPartsResponse> {
967 objectstore_log::debug!("Listing parts on GCS backend");
968 let mut url = self.xml_object_url(id)?;
969 {
970 let mut pairs = url.query_pairs_mut();
971 pairs.append_pair("uploadId", upload_id);
972 if let Some(max) = max_parts {
973 pairs.append_pair("max-parts", &max.to_string());
974 }
975 if let Some(marker) = part_number_marker {
976 pairs.append_pair("part-number-marker", &marker.to_string());
977 }
978 }
979
980 let resp = self
981 .request(Method::GET, url)
982 .await?
983 .send()
984 .await
985 .and_then(Response::error_for_status)
986 .map_err(|e| Error::reqwest("GCS: list parts", e))?;
987
988 let body = resp
989 .bytes()
990 .await
991 .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;
992
993 let xml: XmlListPartsResponse =
994 quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
995 context: "GCS: failed to parse list parts response".to_owned(),
996 cause: Some(Box::new(e)),
997 })?;
998
999 Ok(xml.into())
1000 }
1001
1002 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
1003 async fn abort_multipart(
1004 &self,
1005 id: &ObjectId,
1006 upload_id: &UploadId,
1007 ) -> Result<AbortMultipartResponse> {
1008 objectstore_log::debug!("Aborting multipart upload on GCS backend");
1009 let mut url = self.xml_object_url(id)?;
1010 url.query_pairs_mut().append_pair("uploadId", upload_id);
1011
1012 self.request(Method::DELETE, url)
1013 .await?
1014 .send()
1015 .await
1016 .and_then(Response::error_for_status)
1017 .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;
1018
1019 Ok(())
1020 }
1021
1022 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
1023 async fn complete_multipart(
1024 &self,
1025 id: &ObjectId,
1026 upload_id: &UploadId,
1027 parts: Vec<CompletedPart>,
1028 ) -> Result<CompleteMultipartResponse> {
1029 objectstore_log::debug!("Completing multipart upload on GCS backend");
1030 let mut url = self.xml_object_url(id)?;
1031 url.query_pairs_mut().append_pair("uploadId", upload_id);
1032
1033 let body = XmlCompleteMultipartUpload::from(parts);
1034 let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
1035 context: "GCS: failed to serialize complete multipart request".into(),
1036 cause: Some(Box::new(e)),
1037 })?;
1038
1039 let resp = self
1040 .request(Method::POST, url)
1041 .await?
1042 .header(header::CONTENT_TYPE, "application/xml")
1043 .body(xml)
1044 .send()
1045 .await
1046 .and_then(Response::error_for_status)
1047 .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;
1048
1049 let body = resp
1050 .bytes()
1051 .await
1052 .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;
1053
1054 let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
1055 .ok()
1056 .map(Into::into);
1057
1058 Ok(error)
1059 }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064 use std::collections::BTreeMap;
1065 use std::num::NonZeroU32;
1066
1067 use anyhow::Result;
1068 use objectstore_types::scope::{Scope, Scopes};
1069
1070 use super::*;
1071 use crate::id::ObjectContext;
1072 use crate::multipart::CompletedPart;
1073 use crate::stream;
1074
1075 async fn create_test_backend() -> Result<GcsBackend> {
1081 GcsBackend::new(GcsConfig {
1082 endpoint: Some("http://localhost:8087".into()),
1083 bucket: "test-bucket".into(),
1084 })
1085 .await
1086 }
1087
1088 fn make_id() -> ObjectId {
1089 ObjectId::random(ObjectContext {
1090 usecase: "testing".into(),
1091 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1092 })
1093 }
1094
1095 #[tokio::test]
1096 async fn test_roundtrip() -> Result<()> {
1097 let backend = create_test_backend().await?;
1098
1099 let id = make_id();
1100 let metadata = Metadata {
1101 content_type: "text/plain".into(),
1102 expiration_policy: ExpirationPolicy::Manual,
1103 compression: None,
1104 origin: Some("203.0.113.42".into()),
1105 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1106 time_created: Some(SystemTime::now()),
1107 time_expires: None,
1108 size: None,
1109 };
1110
1111 backend
1112 .put_object(&id, &metadata, stream::single("hello, world"))
1113 .await?;
1114
1115 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1116
1117 let payload = stream::read_to_vec(stream).await?;
1118 let str_payload = str::from_utf8(&payload).unwrap();
1119 assert_eq!(str_payload, "hello, world");
1120 assert_eq!(meta.content_type, metadata.content_type);
1121 assert_eq!(meta.origin, metadata.origin);
1122 assert_eq!(meta.custom, metadata.custom);
1123 assert!(metadata.time_created.is_some());
1124
1125 Ok(())
1126 }
1127
1128 #[tokio::test]
1129 async fn test_get_nonexistent() -> Result<()> {
1130 let backend = create_test_backend().await?;
1131
1132 let id = make_id();
1133 let result = backend.get_object(&id, None).await?;
1134 assert!(result.is_none());
1135
1136 Ok(())
1137 }
1138
1139 #[tokio::test]
1140 async fn test_delete_nonexistent() -> Result<()> {
1141 let backend = create_test_backend().await?;
1142
1143 let id = make_id();
1144 backend.delete_object(&id).await?;
1145
1146 Ok(())
1147 }
1148
1149 #[tokio::test]
1150 async fn test_overwrite() -> Result<()> {
1151 let backend = create_test_backend().await?;
1152
1153 let id = make_id();
1154 let metadata = Metadata {
1155 custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1156 ..Default::default()
1157 };
1158
1159 backend
1160 .put_object(&id, &metadata, stream::single("hello"))
1161 .await?;
1162
1163 let metadata = Metadata {
1164 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1165 ..Default::default()
1166 };
1167
1168 backend
1169 .put_object(&id, &metadata, stream::single("world"))
1170 .await?;
1171
1172 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1173
1174 let payload = stream::read_to_vec(stream).await?;
1175 let str_payload = str::from_utf8(&payload).unwrap();
1176 assert_eq!(str_payload, "world");
1177 assert_eq!(meta.custom, metadata.custom);
1178
1179 Ok(())
1180 }
1181
1182 #[tokio::test]
1183 async fn test_read_after_delete() -> Result<()> {
1184 let backend = create_test_backend().await?;
1185
1186 let id = make_id();
1187 let metadata = Metadata::default();
1188
1189 backend
1190 .put_object(&id, &metadata, stream::single("hello, world"))
1191 .await?;
1192
1193 backend.delete_object(&id).await?;
1194
1195 let result = backend.get_object(&id, None).await?;
1196 assert!(result.is_none());
1197
1198 Ok(())
1199 }
1200
1201 #[tokio::test]
1202 async fn test_ttl_immediate() -> Result<()> {
1203 let backend = create_test_backend().await?;
1207
1208 let id = make_id();
1209 let metadata = Metadata {
1210 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1211 ..Default::default()
1212 };
1213
1214 backend
1215 .put_object(&id, &metadata, stream::single("hello, world"))
1216 .await?;
1217
1218 let result = backend.get_object(&id, None).await?;
1219 assert!(result.is_none());
1220
1221 Ok(())
1222 }
1223
1224 #[tokio::test]
1225 async fn test_tti_immediate() -> Result<()> {
1226 let backend = create_test_backend().await?;
1230
1231 let id = make_id();
1232 let metadata = Metadata {
1233 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1234 ..Default::default()
1235 };
1236
1237 backend
1238 .put_object(&id, &metadata, stream::single("hello, world"))
1239 .await?;
1240
1241 let result = backend.get_object(&id, None).await?;
1242 assert!(result.is_none());
1243
1244 Ok(())
1245 }
1246
1247 #[tokio::test]
1248 async fn test_get_metadata_returns_metadata() -> Result<()> {
1249 let backend = create_test_backend().await?;
1250
1251 let id = make_id();
1252 let metadata = Metadata {
1253 content_type: "text/plain".into(),
1254 origin: Some("203.0.113.42".into()),
1255 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1256 ..Default::default()
1257 };
1258
1259 backend
1260 .put_object(&id, &metadata, stream::single("hello, world"))
1261 .await?;
1262
1263 let meta = backend.get_metadata(&id).await?.unwrap();
1264 assert_eq!(meta.content_type, metadata.content_type);
1265 assert_eq!(meta.origin, metadata.origin);
1266 assert_eq!(meta.custom, metadata.custom);
1267
1268 Ok(())
1269 }
1270
1271 #[tokio::test]
1272 async fn test_get_metadata_nonexistent() -> Result<()> {
1273 let backend = create_test_backend().await?;
1274
1275 let id = make_id();
1276 let result = backend.get_metadata(&id).await?;
1277 assert!(result.is_none());
1278
1279 Ok(())
1280 }
1281
1282 #[tokio::test]
1283 async fn test_get_metadata_bumps_tti() -> Result<()> {
1284 let backend = create_test_backend().await?;
1285
1286 let id = make_id();
1287 let tti = Duration::from_hours(2 * 24);
1289 let metadata = Metadata {
1290 content_type: "text/plain".into(),
1291 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1292 ..Default::default()
1293 };
1294
1295 backend
1296 .put_object(&id, &metadata, stream::single("hello, world"))
1297 .await?;
1298
1299 let object_url = backend.object_url(&id)?;
1302 let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_mins(1);
1303 backend.update_custom_time(object_url, old_deadline).await?;
1304
1305 let pre_meta = backend.get_metadata(&id).await?.unwrap();
1307 let pre_expiry = pre_meta.time_expires.unwrap();
1308
1309 let post_meta = backend.get_metadata(&id).await?.unwrap();
1311 let post_expiry = post_meta.time_expires.unwrap();
1312 assert!(
1313 post_expiry > pre_expiry,
1314 "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
1315 );
1316
1317 let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1319 let payload = stream::read_to_vec(stream).await?;
1320 assert_eq!(&payload, b"hello, world");
1321
1322 Ok(())
1323 }
1324
1325 #[tokio::test]
1326 async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
1327 let backend = create_test_backend().await?;
1328
1329 let id = make_id();
1330 let tti = Duration::from_hours(2 * 24);
1332 let metadata = Metadata {
1333 content_type: "text/plain".into(),
1334 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1335 ..Default::default()
1336 };
1337
1338 backend
1339 .put_object(&id, &metadata, stream::single("hello, world"))
1340 .await?;
1341
1342 let first = backend.get_metadata(&id).await?.unwrap();
1345 let first_expiry = first.time_expires.unwrap();
1346
1347 let second = backend.get_metadata(&id).await?.unwrap();
1348 let second_expiry = second.time_expires.unwrap();
1349
1350 assert_eq!(
1351 first_expiry, second_expiry,
1352 "Fresh TTI object should not have its expiry bumped"
1353 );
1354
1355 Ok(())
1356 }
1357
1358 #[tokio::test]
1359 async fn test_compressed_payload_roundtrip() -> Result<()> {
1360 use objectstore_types::metadata::Compression;
1361
1362 let backend = create_test_backend().await?;
1363
1364 let plaintext = b"hello, world (but compressed with zstd)";
1365 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1366
1367 let id = make_id();
1368 let metadata = Metadata {
1369 content_type: "text/plain".into(),
1370 compression: Some(Compression::Zstd),
1371 ..Default::default()
1372 };
1373
1374 backend
1375 .put_object(&id, &metadata, stream::single(compressed.clone()))
1376 .await?;
1377
1378 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1379 let payload = stream::read_to_vec(stream).await?;
1380
1381 assert_eq!(meta.compression, Some(Compression::Zstd));
1382 assert_eq!(
1383 payload, compressed,
1384 "Payload should be returned still compressed, not auto-decompressed"
1385 );
1386
1387 Ok(())
1388 }
1389
1390 #[tokio::test]
1391 async fn test_multipart_single_part() -> Result<()> {
1392 let backend = create_test_backend().await?;
1393 let id = make_id();
1394 let metadata = Metadata {
1395 content_type: "text/plain".into(),
1396 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
1397 origin: Some("203.0.113.42".into()),
1398 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1399 ..Default::default()
1400 };
1401
1402 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1403
1404 let data = b"hello, multipart world!";
1405 let etag = backend
1406 .upload_part(
1407 &id,
1408 &upload_id,
1409 NonZeroU32::new(1).unwrap(),
1410 data.len() as u64,
1411 None,
1412 stream::single(data.to_vec()),
1413 )
1414 .await?;
1415
1416 let result = backend
1417 .complete_multipart(
1418 &id,
1419 &upload_id,
1420 vec![CompletedPart {
1421 part_number: NonZeroU32::new(1).unwrap(),
1422 etag,
1423 }],
1424 )
1425 .await?;
1426 assert!(result.is_none(), "expected no error on complete");
1427
1428 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1429 let payload = stream::read_to_vec(stream).await?;
1430 assert_eq!(payload, data);
1431 assert_eq!(meta.content_type, "text/plain".to_string());
1432 assert_eq!(
1433 meta.expiration_policy,
1434 ExpirationPolicy::TimeToLive(Duration::from_mins(33))
1435 );
1436 assert_eq!(meta.origin, Some("203.0.113.42".into()));
1437 assert_eq!(
1438 meta.custom,
1439 BTreeMap::from_iter([("hello".into(), "world".into())])
1440 );
1441
1442 Ok(())
1443 }
1444
1445 #[tokio::test]
1446 async fn test_multipart_multiple_parts() -> Result<()> {
1447 let backend = create_test_backend().await?;
1448 let id = make_id();
1449 let metadata = Metadata::default();
1450
1451 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1452
1453 const MIN_PART: usize = 5 * 1024 * 1024;
1455 let part1 = vec![b'a'; MIN_PART];
1456 let part2 = vec![b'b'; MIN_PART];
1457 let part3 = b"cccc".to_vec();
1458
1459 let etag1 = backend
1460 .upload_part(
1461 &id,
1462 &upload_id,
1463 NonZeroU32::new(1).unwrap(),
1464 part1.len() as u64,
1465 None,
1466 stream::single(part1.clone()),
1467 )
1468 .await?;
1469 let etag2 = backend
1470 .upload_part(
1471 &id,
1472 &upload_id,
1473 NonZeroU32::new(2).unwrap(),
1474 part2.len() as u64,
1475 None,
1476 stream::single(part2.clone()),
1477 )
1478 .await?;
1479 let etag3 = backend
1480 .upload_part(
1481 &id,
1482 &upload_id,
1483 NonZeroU32::new(3).unwrap(),
1484 part3.len() as u64,
1485 None,
1486 stream::single(part3.clone()),
1487 )
1488 .await?;
1489
1490 let result = backend
1491 .complete_multipart(
1492 &id,
1493 &upload_id,
1494 vec![
1495 CompletedPart {
1496 part_number: NonZeroU32::new(1).unwrap(),
1497 etag: etag1,
1498 },
1499 CompletedPart {
1500 part_number: NonZeroU32::new(2).unwrap(),
1501 etag: etag2,
1502 },
1503 CompletedPart {
1504 part_number: NonZeroU32::new(3).unwrap(),
1505 etag: etag3,
1506 },
1507 ],
1508 )
1509 .await?;
1510 assert!(result.is_none(), "expected no error on complete");
1511
1512 let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1514 let payload = stream::read_to_vec(stream).await?;
1515 let mut expected = Vec::new();
1516 expected.extend_from_slice(&part1);
1517 expected.extend_from_slice(&part2);
1518 expected.extend_from_slice(&part3);
1519 assert_eq!(payload, expected);
1520
1521 Ok(())
1522 }
1523
1524 #[tokio::test]
1525 async fn test_multipart_out_of_order_upload() -> Result<()> {
1526 let backend = create_test_backend().await?;
1527 let id = make_id();
1528 let metadata = Metadata::default();
1529
1530 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1531
1532 const MIN_PART: usize = 5 * 1024 * 1024;
1534 let part1 = vec![b'a'; MIN_PART];
1535 let part2 = vec![b'b'; MIN_PART];
1536 let part3 = b"cccc".to_vec();
1537
1538 let etag2 = backend
1540 .upload_part(
1541 &id,
1542 &upload_id,
1543 NonZeroU32::new(2).unwrap(),
1544 part2.len() as u64,
1545 None,
1546 stream::single(part2.clone()),
1547 )
1548 .await?;
1549 let etag3 = backend
1550 .upload_part(
1551 &id,
1552 &upload_id,
1553 NonZeroU32::new(3).unwrap(),
1554 part3.len() as u64,
1555 None,
1556 stream::single(part3.clone()),
1557 )
1558 .await?;
1559 let etag1 = backend
1560 .upload_part(
1561 &id,
1562 &upload_id,
1563 NonZeroU32::new(1).unwrap(),
1564 part1.len() as u64,
1565 None,
1566 stream::single(part1.clone()),
1567 )
1568 .await?;
1569
1570 let result = backend
1572 .complete_multipart(
1573 &id,
1574 &upload_id,
1575 vec![
1576 CompletedPart {
1577 part_number: NonZeroU32::new(1).unwrap(),
1578 etag: etag1,
1579 },
1580 CompletedPart {
1581 part_number: NonZeroU32::new(2).unwrap(),
1582 etag: etag2,
1583 },
1584 CompletedPart {
1585 part_number: NonZeroU32::new(3).unwrap(),
1586 etag: etag3,
1587 },
1588 ],
1589 )
1590 .await?;
1591 assert!(result.is_none(), "expected no error on complete");
1592
1593 let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1595 let payload = stream::read_to_vec(stream).await?;
1596 let mut expected = Vec::new();
1597 expected.extend_from_slice(&part1);
1598 expected.extend_from_slice(&part2);
1599 expected.extend_from_slice(&part3);
1600 assert_eq!(payload, expected);
1601
1602 Ok(())
1603 }
1604
1605 #[tokio::test]
1606 async fn test_multipart_list_parts() -> Result<()> {
1607 let backend = create_test_backend().await?;
1608 let id = make_id();
1609 let metadata = Metadata::default();
1610
1611 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1612
1613 let etag1 = backend
1614 .upload_part(
1615 &id,
1616 &upload_id,
1617 NonZeroU32::new(1).unwrap(),
1618 3,
1619 None,
1620 stream::single(b"aaa".to_vec()),
1621 )
1622 .await?;
1623 let etag2 = backend
1624 .upload_part(
1625 &id,
1626 &upload_id,
1627 NonZeroU32::new(2).unwrap(),
1628 3,
1629 None,
1630 stream::single(b"bbb".to_vec()),
1631 )
1632 .await?;
1633
1634 let list = backend.list_parts(&id, &upload_id, None, None).await?;
1636 assert_eq!(list.parts.len(), 2);
1637 assert_eq!(list.parts[0].part_number.get(), 1);
1638 assert_eq!(list.parts[0].etag, etag1);
1639 assert_eq!(list.parts[0].size, 3);
1640 assert_eq!(list.parts[1].part_number.get(), 2);
1641 assert_eq!(list.parts[1].etag, etag2);
1642 assert_eq!(list.parts[1].size, 3);
1643
1644 let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
1646 assert_eq!(page1.parts.len(), 1);
1647 assert_eq!(page1.parts[0].part_number.get(), 1);
1648 assert!(page1.is_truncated);
1649 assert!(page1.next_part_number_marker.is_some());
1650
1651 let page2 = backend
1652 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
1653 .await?;
1654 assert_eq!(page2.parts.len(), 1);
1655 assert_eq!(page2.parts[0].part_number.get(), 2);
1656
1657 backend.abort_multipart(&id, &upload_id).await?;
1659
1660 Ok(())
1661 }
1662
1663 #[tokio::test]
1664 async fn test_multipart_abort() -> Result<()> {
1665 let backend = create_test_backend().await?;
1666 let id = make_id();
1667 let metadata = Metadata::default();
1668
1669 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1670
1671 backend
1672 .upload_part(
1673 &id,
1674 &upload_id,
1675 NonZeroU32::new(1).unwrap(),
1676 5,
1677 None,
1678 stream::single(b"hello".to_vec()),
1679 )
1680 .await?;
1681
1682 backend.abort_multipart(&id, &upload_id).await?;
1683
1684 let result = backend.get_object(&id, None).await?;
1686 assert!(result.is_none(), "object should not exist after abort");
1687
1688 Ok(())
1689 }
1690
1691 async fn multipart_put(
1692 backend: &GcsBackend,
1693 id: &ObjectId,
1694 metadata: &Metadata,
1695 payload: impl Into<bytes::Bytes>,
1696 ) -> Result<()> {
1697 let payload: bytes::Bytes = payload.into();
1698 let upload_id = backend.initiate_multipart(id, metadata).await?;
1699 let etag = backend
1700 .upload_part(
1701 id,
1702 &upload_id,
1703 NonZeroU32::new(1).unwrap(),
1704 payload.len() as u64,
1705 None,
1706 stream::single(payload),
1707 )
1708 .await?;
1709 let error = backend
1710 .complete_multipart(
1711 id,
1712 &upload_id,
1713 vec![CompletedPart {
1714 part_number: NonZeroU32::new(1).unwrap(),
1715 etag,
1716 }],
1717 )
1718 .await?;
1719 assert!(
1720 error.is_none(),
1721 "complete_multipart returned error: {error:?}"
1722 );
1723 Ok(())
1724 }
1725
1726 #[tokio::test]
1727 async fn test_multipart_ttl_immediate() -> Result<()> {
1728 let backend = create_test_backend().await?;
1729 let id = make_id();
1730 let metadata = Metadata {
1731 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1732 ..Default::default()
1733 };
1734
1735 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1736
1737 let result = backend.get_object(&id, None).await?;
1738 assert!(result.is_none());
1739
1740 Ok(())
1741 }
1742
1743 #[tokio::test]
1744 async fn test_multipart_tti_immediate() -> Result<()> {
1745 let backend = create_test_backend().await?;
1746 let id = make_id();
1747 let metadata = Metadata {
1748 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1749 ..Default::default()
1750 };
1751
1752 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1753
1754 let result = backend.get_object(&id, None).await?;
1755 assert!(result.is_none());
1756
1757 Ok(())
1758 }
1759
1760 #[tokio::test]
1761 async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
1762 use objectstore_types::metadata::Compression;
1763
1764 let backend = create_test_backend().await?;
1765
1766 let plaintext = b"hello, world (but compressed with zstd)";
1767 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1768
1769 let id = make_id();
1770 let metadata = Metadata {
1771 content_type: "text/plain".into(),
1772 compression: Some(Compression::Zstd),
1773 ..Default::default()
1774 };
1775
1776 multipart_put(&backend, &id, &metadata, compressed.clone()).await?;
1777
1778 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1779 let payload = stream::read_to_vec(stream).await?;
1780
1781 assert_eq!(meta.compression, Some(Compression::Zstd));
1782 assert_eq!(
1783 payload, compressed,
1784 "Payload should be returned still compressed, not auto-decompressed"
1785 );
1786
1787 Ok(())
1788 }
1789}