1use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8use std::{fmt, io};
9
10use anyhow::Context;
11use futures_util::{StreamExt, TryStreamExt};
12use gcp_auth::TokenProvider;
13use objectstore_types::metadata::{ExpirationPolicy, Metadata};
14use reqwest::header::HeaderName;
15use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
16use serde::{Deserialize, Serialize};
17
18use crate::backend::common::{
19 self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
20 PutResponse,
21};
22use crate::error::{Error, Result};
23use crate::gcp_auth::PrefetchingTokenProvider;
24use crate::id::ObjectId;
25use crate::multipart::{
26 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
27 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
28};
29use crate::stream::{self, ClientStream};
30
31#[derive(Debug, Clone, Deserialize, Serialize)]
51pub struct GcsConfig {
52 pub endpoint: Option<String>,
65
66 pub bucket: String,
74}
75
76const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
78const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];
80const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); const REQUEST_RETRY_COUNT: usize = 2;
84
85const BUILTIN_META_PREFIX: &str = "x-sn-";
87const CUSTOM_META_PREFIX: &str = "x-snme-";
89
90#[derive(Debug, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97struct GcsObject {
98 pub content_type: Cow<'static, str>,
101
102 #[serde(default, skip_serializing_if = "Option::is_none")]
104 pub content_encoding: Option<String>,
105
106 #[serde(
108 default,
109 skip_serializing_if = "Option::is_none",
110 with = "humantime_serde"
111 )]
112 pub custom_time: Option<SystemTime>,
113
114 pub size: Option<String>,
119
120 #[serde(
122 default,
123 skip_serializing_if = "Option::is_none",
124 with = "humantime_serde"
125 )]
126 pub time_created: Option<SystemTime>,
127
128 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
130 pub metadata: BTreeMap<GcsMetaKey, String>,
131}
132
133impl GcsObject {
134 pub fn from_metadata(metadata: &Metadata) -> Self {
136 let mut gcs_object = GcsObject {
137 content_type: metadata.content_type.clone(),
138 size: metadata.size.map(|size| size.to_string()),
139 content_encoding: None,
140 custom_time: None,
141 time_created: metadata.time_created,
142 metadata: BTreeMap::new(),
143 };
144
145 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
149 gcs_object.custom_time = Some(SystemTime::now() + expires_in);
150 }
151
152 if let Some(compression) = metadata.compression {
153 gcs_object.content_encoding = Some(compression.to_string());
154 }
155
156 if metadata.expiration_policy != ExpirationPolicy::default() {
157 gcs_object.metadata.insert(
158 GcsMetaKey::Expiration,
159 metadata.expiration_policy.to_string(),
160 );
161 }
162
163 if let Some(origin) = &metadata.origin {
164 gcs_object
165 .metadata
166 .insert(GcsMetaKey::Origin, origin.clone());
167 }
168
169 for (key, value) in &metadata.custom {
170 gcs_object
171 .metadata
172 .insert(GcsMetaKey::Custom(key.clone()), value.clone());
173 }
174
175 gcs_object
176 }
177
178 pub fn into_metadata(mut self) -> Result<Metadata> {
180 self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
182
183 let expiration_policy = self
184 .metadata
185 .remove(&GcsMetaKey::Expiration)
186 .map(|s| s.parse())
187 .transpose()?
188 .unwrap_or_default();
189
190 let origin = self.metadata.remove(&GcsMetaKey::Origin);
191
192 let content_type = self.content_type;
193 let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
194 let size = self
195 .size
196 .map(|size| size.parse())
197 .transpose()
198 .map_err(|e| Error::Generic {
199 context: "GCS: failed to parse size from object metadata".to_string(),
200 cause: Some(Box::new(e)),
201 })?;
202 let time_created = self.time_created;
203
204 let mut custom = BTreeMap::new();
206 for (key, value) in self.metadata {
207 if let GcsMetaKey::Custom(custom_key) = key {
208 custom.insert(custom_key, value);
209 } else {
210 return Err(Error::Generic {
211 context: format!(
212 "GCS: unexpected built-in metadata key in object metadata: {}",
213 key
214 ),
215 cause: None,
216 });
217 }
218 }
219
220 Ok(Metadata {
221 content_type,
222 expiration_policy,
223 compression,
224 origin,
225 size,
226 custom,
227 time_created,
228 time_expires: self.custom_time,
229 })
230 }
231}
232
233#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
235enum GcsMetaKey {
236 Expiration,
238 Origin,
240 EmulatorIgnored,
242 Custom(String),
244}
245
246impl std::str::FromStr for GcsMetaKey {
247 type Err = anyhow::Error;
248
249 fn from_str(s: &str) -> Result<Self, Self::Err> {
250 if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
251 return Ok(GcsMetaKey::EmulatorIgnored);
252 }
253
254 Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
255 Some("expiration") => GcsMetaKey::Expiration,
256 Some("origin") => GcsMetaKey::Origin,
257 Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
258 None => match s.strip_prefix(CUSTOM_META_PREFIX) {
259 Some(key) => GcsMetaKey::Custom(key.to_string()),
260 None => anyhow::bail!("invalid GCS metadata key format: {s}"),
261 },
262 })
263 }
264}
265
266impl fmt::Display for GcsMetaKey {
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 match self {
269 Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
270 Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
271 Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
272 Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
273 }
274 }
275}
276
277impl<'de> serde::Deserialize<'de> for GcsMetaKey {
278 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
279 where
280 D: serde::Deserializer<'de>,
281 {
282 let s = Cow::<'de, str>::deserialize(deserializer)?;
283 s.parse().map_err(serde::de::Error::custom)
284 }
285}
286
287impl serde::Serialize for GcsMetaKey {
288 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
289 where
290 S: serde::Serializer,
291 {
292 serializer.collect_str(self)
293 }
294}
295
296fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
298 let mut headers = header::HeaderMap::new();
299
300 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
301 let custom_time = SystemTime::now() + expires_in;
302 let formatted = humantime::format_rfc3339_seconds(custom_time);
303 headers.insert(
304 HeaderName::from_static("x-goog-custom-time"),
305 formatted.to_string().parse().map_err(|e| Error::Generic {
306 context: "GCS: invalid custom-time header value".into(),
307 cause: Some(Box::new(e)),
308 })?,
309 );
310 }
311
312 if let Some(compression) = metadata.compression {
313 headers.insert(
314 header::CONTENT_ENCODING,
315 compression
316 .to_string()
317 .parse()
318 .map_err(|e| Error::Generic {
319 context: "GCS: invalid content-encoding header value".into(),
320 cause: Some(Box::new(e)),
321 })?,
322 );
323 }
324
325 if metadata.expiration_policy != ExpirationPolicy::default() {
326 insert_gcs_meta_header(
327 &mut headers,
328 &GcsMetaKey::Expiration,
329 &metadata.expiration_policy.to_string(),
330 )?;
331 }
332
333 if let Some(origin) = &metadata.origin {
334 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
335 }
336
337 for (key, value) in &metadata.custom {
338 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
339 }
340
341 Ok(headers)
342}
343
344fn insert_gcs_meta_header(
345 headers: &mut header::HeaderMap,
346 key: &GcsMetaKey,
347 value: &str,
348) -> Result<()> {
349 let header_name = format!("x-goog-meta-{key}");
350 headers.insert(
351 HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
352 context: format!("GCS: invalid header name: {header_name}"),
353 cause: Some(Box::new(e)),
354 })?,
355 value.parse().map_err(|e| Error::Generic {
356 context: format!("GCS: invalid header value for {header_name}"),
357 cause: Some(Box::new(e)),
358 })?,
359 );
360 Ok(())
361}
362
363fn is_retryable(error: &Error) -> bool {
365 let Error::Reqwest { cause, .. } = error else {
366 return false;
367 };
368 if cause.is_timeout() || cause.is_connect() || cause.is_request() {
369 return true;
370 }
371 let Some(status) = cause.status() else {
372 return false;
373 };
374 matches!(
376 status,
377 StatusCode::REQUEST_TIMEOUT
378 | StatusCode::TOO_MANY_REQUESTS
379 | StatusCode::INTERNAL_SERVER_ERROR
380 | StatusCode::BAD_GATEWAY
381 | StatusCode::SERVICE_UNAVAILABLE
382 | StatusCode::GATEWAY_TIMEOUT
383 )
384}
385
386pub struct GcsBackend {
388 client: reqwest::Client,
389 endpoint: Url,
390 bucket: String,
391 token_provider: Option<PrefetchingTokenProvider>,
392}
393
394impl GcsBackend {
395 pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
397 let GcsConfig { endpoint, bucket } = config;
398
399 let token_provider = if endpoint.is_none() {
400 Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
401 } else {
402 None
403 };
404
405 let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);
406
407 Ok(Self {
408 client: common::reqwest_client(),
409 endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
410 bucket,
411 token_provider,
412 })
413 }
414
415 fn object_url(&self, id: &ObjectId) -> Result<Url> {
417 let mut url = self.endpoint.clone();
418
419 let path = id.as_storage_path().to_string();
420 url.path_segments_mut()
421 .map_err(|()| Error::Generic {
422 context: format!(
423 "GCS: invalid endpoint URL, {} cannot be a base",
424 self.endpoint
425 ),
426 cause: None,
427 })?
428 .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
429
430 Ok(url)
431 }
432
433 fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
435 let mut url = self.endpoint.clone();
436
437 url.path_segments_mut()
438 .map_err(|()| Error::Generic {
439 context: format!(
440 "GCS: invalid endpoint URL, {} cannot be a base",
441 self.endpoint
442 ),
443 cause: None,
444 })?
445 .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
446
447 url.query_pairs_mut()
448 .append_pair("uploadType", upload_type)
449 .append_pair("name", &id.as_storage_path().to_string());
450
451 Ok(url)
452 }
453
454 fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
461 let mut url = self.endpoint.clone();
462 {
463 let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
464 context: format!(
465 "GCS: invalid endpoint URL, {} cannot be a base",
466 self.endpoint
467 ),
468 cause: None,
469 })?;
470 segments.push(&self.bucket);
471 for part in id.as_storage_path().to_string().split('/') {
472 segments.push(part);
473 }
474 }
475 Ok(url)
476 }
477
478 async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
480 let mut builder = self.client.request(method, url);
481 if let Some(provider) = &self.token_provider {
482 let token = provider.token(TOKEN_SCOPES).await?;
483 builder = builder.bearer_auth(token.as_str());
484 }
485 Ok(builder)
486 }
487
488 async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
490 where
491 F: Future<Output = Result<T>> + Send,
492 {
493 let mut retry_count = 0usize;
494 loop {
495 match f().await {
496 Ok(res) => return Ok(res),
497 Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
498 retry_count += 1;
499 objectstore_metrics::count!("gcs.retries", action = action);
500 objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
501 }
502 Err(e) => {
503 objectstore_metrics::count!("gcs.failures", action = action);
504 return Err(e);
505 }
506 }
507 }
508 }
509
510 async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
513 let metadata_opt = self
514 .with_retry("get_metadata", || async {
515 let resp = self
516 .request(Method::GET, object_url.clone())
517 .await?
518 .send()
519 .await
520 .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;
521
522 if resp.status() == StatusCode::NOT_FOUND {
523 return Ok(None);
524 }
525
526 let metadata: GcsObject = resp
527 .error_for_status()
528 .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
529 .json()
530 .await
531 .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
532
533 Ok(Some(metadata))
534 })
535 .await?;
536
537 let Some(gcs_metadata) = metadata_opt else {
538 objectstore_log::debug!("Object not found");
539 return Ok(None);
540 };
541
542 let expire_at = gcs_metadata.custom_time;
543 let metadata = gcs_metadata.into_metadata()?;
544
545 let access_time = SystemTime::now();
547
548 if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
550 objectstore_log::debug!("Object found but past expiry");
551 return Ok(None);
552 }
553
554 if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
556 let new_expire_at = access_time + tti;
557 if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
558 self.update_custom_time(object_url.clone(), new_expire_at)
559 .await?;
560 }
561 }
562
563 Ok(Some(metadata))
564 }
565
566 async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
567 #[derive(Debug, Serialize)]
568 #[serde(rename_all = "camelCase")]
569 struct CustomTimeRequest {
570 #[serde(with = "humantime_serde")]
571 custom_time: SystemTime,
572 }
573
574 self.with_retry("update_custom_time", || async {
575 self.request(Method::PATCH, object_url.clone())
576 .await?
577 .json(&CustomTimeRequest { custom_time })
578 .send()
579 .await
580 .and_then(|r| r.error_for_status())
581 .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
582 Ok(())
583 })
584 .await
585 }
586}
587
588impl fmt::Debug for GcsBackend {
589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
590 f.debug_struct("GcsJsonApi")
591 .field("endpoint", &self.endpoint)
592 .field("bucket", &self.bucket)
593 .finish_non_exhaustive()
594 }
595}
596
597#[async_trait::async_trait]
598impl Backend for GcsBackend {
599 fn name(&self) -> &'static str {
600 "gcs"
601 }
602
603 fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
604 Ok(self)
605 }
606
607 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
608 async fn put_object(
609 &self,
610 id: &ObjectId,
611 metadata: &Metadata,
612 stream: ClientStream,
613 ) -> Result<PutResponse> {
614 objectstore_log::debug!("Writing to GCS backend");
615 let gcs_metadata = GcsObject::from_metadata(metadata);
616
617 let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
620 context: "failed to serialize metadata for GCS upload".to_string(),
621 cause,
622 })?;
623
624 let multipart = multipart::Form::new()
625 .part(
626 "metadata",
627 multipart::Part::text(metadata_json)
628 .mime_str("application/json")
629 .expect("application/json is a valid mime type"),
630 )
631 .part(
632 "media",
633 multipart::Part::stream(Body::wrap_stream(stream))
634 .mime_str(&metadata.content_type)
635 .map_err(|e| Error::Generic {
636 context: format!("invalid mime type: {}", &metadata.content_type),
637 cause: Some(Box::new(e)),
638 })?,
639 );
640
641 let content_type = format!("multipart/related; boundary={}", multipart.boundary());
645
646 self.request(Method::POST, self.upload_url(id, "multipart")?)
647 .await?
648 .multipart(multipart)
649 .header(header::CONTENT_TYPE, content_type)
650 .send()
651 .await
652 .and_then(|r| r.error_for_status())
653 .map_err(|e| match stream::unpack_client_error(&e) {
654 Some(ce) => Error::Client(ce),
655 _ => Error::reqwest("GCS: upload object", e),
656 })?;
657
658 Ok(())
659 }
660
661 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
662 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
663 objectstore_log::debug!("Reading from GCS backend");
664 let object_url = self.object_url(id)?;
665
666 let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
667 return Ok(None);
668 };
669
670 let mut download_url = object_url;
671 download_url.query_pairs_mut().append_pair("alt", "media");
672
673 let payload_response = self
674 .with_retry("get_payload", || async {
675 self.request(Method::GET, download_url.clone())
676 .await?
677 .send()
678 .await
679 .and_then(|r| r.error_for_status())
680 .map_err(|e| Error::reqwest("GCS: get payload", e))
681 })
682 .await?;
683
684 let stream = payload_response
685 .bytes_stream()
686 .map_err(io::Error::other)
687 .boxed();
688
689 Ok(Some((metadata, stream)))
690 }
691
692 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
693 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
694 objectstore_log::debug!("Reading metadata from GCS backend");
695 let object_url = self.object_url(id)?;
696 self.fetch_gcs_metadata(&object_url).await
697 }
698
699 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
700 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
701 objectstore_log::debug!("Deleting from GCS backend");
702 let object_url = self.object_url(id)?;
703
704 self.with_retry("delete", || async {
705 let resp = self
706 .request(Method::DELETE, object_url.clone())
707 .await?
708 .send()
709 .await
710 .map_err(|e| Error::reqwest("GCS: delete object", e))?;
711
712 if resp.status() == StatusCode::NOT_FOUND {
714 return Ok(());
715 }
716
717 resp.error_for_status()
718 .map_err(|e| Error::reqwest("GCS: delete object", e))?;
719
720 Ok(())
721 })
722 .await
723 }
724}
725
726#[derive(Debug, Deserialize)]
727#[serde(rename_all = "PascalCase")]
728struct XmlInitiateMultipartUploadResponse {
729 upload_id: String,
730}
731
732impl TryFrom<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
733 type Error = crate::error::Error;
734
735 fn try_from(r: XmlInitiateMultipartUploadResponse) -> crate::error::Result<Self> {
736 Ok(UploadId::new(r.upload_id)?)
737 }
738}
739
740#[derive(Debug, Deserialize)]
741#[serde(rename_all = "PascalCase")]
742struct XmlListPartsResponse {
743 #[serde(default)]
744 is_truncated: bool,
745 next_part_number_marker: Option<PartNumber>,
746 #[serde(default, rename = "Part")]
747 parts: Vec<XmlPart>,
748}
749
750impl From<XmlListPartsResponse> for ListPartsResponse {
751 fn from(xml: XmlListPartsResponse) -> Self {
752 Self {
753 parts: xml.parts.into_iter().map(Into::into).collect(),
754 is_truncated: xml.is_truncated,
755 next_part_number_marker: xml.next_part_number_marker,
756 }
757 }
758}
759
760#[derive(Debug, Deserialize)]
761#[serde(rename_all = "PascalCase")]
762struct XmlPart {
763 part_number: PartNumber,
764 #[serde(rename = "ETag")]
765 e_tag: String,
766 #[serde(with = "humantime_serde")]
767 last_modified: SystemTime,
768 size: u64,
769}
770
771impl From<XmlPart> for crate::multipart::Part {
772 fn from(p: XmlPart) -> Self {
773 Self {
774 part_number: p.part_number,
775 etag: p.e_tag,
776 last_modified: p.last_modified,
777 size: p.size,
778 }
779 }
780}
781
782#[derive(Debug, Serialize)]
783#[serde(rename = "CompleteMultipartUpload")]
784struct XmlCompleteMultipartUpload {
785 #[serde(rename = "Part")]
786 parts: Vec<XmlCompletePart>,
787}
788
789impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
790 fn from(parts: Vec<CompletedPart>) -> Self {
791 Self {
792 parts: parts.into_iter().map(Into::into).collect(),
793 }
794 }
795}
796
797#[derive(Debug, Serialize)]
798#[serde(rename_all = "PascalCase")]
799struct XmlCompletePart {
800 part_number: PartNumber,
801 #[serde(rename = "ETag")]
802 e_tag: String,
803}
804
805impl From<CompletedPart> for XmlCompletePart {
806 fn from(p: CompletedPart) -> Self {
807 Self {
808 part_number: p.part_number,
809 e_tag: p.etag,
810 }
811 }
812}
813
814#[derive(Debug, Deserialize)]
815#[serde(rename = "Error", rename_all = "PascalCase")]
816struct XmlError {
817 code: String,
818 message: String,
819}
820
821impl From<XmlError> for crate::multipart::CompleteMultipartError {
822 fn from(e: XmlError) -> Self {
823 Self {
824 code: e.code,
825 message: e.message,
826 }
827 }
828}
829
830#[async_trait::async_trait]
834impl MultipartUploadBackend for GcsBackend {
835 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
836 async fn initiate_multipart(
837 &self,
838 id: &ObjectId,
839 metadata: &Metadata,
840 ) -> Result<InitiateMultipartResponse> {
841 objectstore_log::debug!("Initiating multipart upload on GCS backend");
842 let mut url = self.xml_object_url(id)?;
843 url.set_query(Some("uploads"));
844
845 let mut builder = self
846 .request(Method::POST, url)
847 .await?
848 .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
849 .header(header::CONTENT_LENGTH, "0");
850
851 let meta_headers = metadata_to_gcs_headers(metadata)?;
852 for (name, value) in &meta_headers {
853 builder = builder.header(name, value);
854 }
855
856 let resp = builder
857 .send()
858 .await
859 .and_then(|r| r.error_for_status())
860 .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;
861
862 let body = resp
863 .bytes()
864 .await
865 .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;
866
867 let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
868 .map_err(|e| Error::Generic {
869 context: "GCS: failed to parse initiate multipart response".to_owned(),
870 cause: Some(Box::new(e)),
871 })?;
872
873 Ok(xml.try_into()?)
874 }
875
876 #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
877 async fn upload_part(
878 &self,
879 id: &ObjectId,
880 upload_id: &UploadId,
881 part_number: PartNumber,
882 content_length: u64,
883 content_md5: Option<&str>,
884 body: ClientStream,
885 ) -> Result<UploadPartResponse> {
886 objectstore_log::debug!("Uploading part to GCS backend");
887 let mut url = self.xml_object_url(id)?;
888 url.query_pairs_mut()
889 .append_pair("partNumber", &part_number.to_string())
890 .append_pair("uploadId", upload_id);
891
892 let mut builder = self
893 .request(Method::PUT, url)
894 .await?
895 .header(header::CONTENT_LENGTH, content_length)
896 .body(Body::wrap_stream(body));
897
898 if let Some(md5) = content_md5 {
899 builder = builder.header("content-md5", md5);
900 }
901
902 let resp = builder
903 .send()
904 .await
905 .and_then(|r| r.error_for_status())
906 .map_err(|e| Error::reqwest("GCS: upload part", e))?;
907
908 let etag = resp
909 .headers()
910 .get(header::ETAG)
911 .and_then(|v| v.to_str().ok())
912 .map(|s| s.to_owned())
913 .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;
914
915 Ok(etag)
916 }
917
918 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
919 async fn list_parts(
920 &self,
921 id: &ObjectId,
922 upload_id: &UploadId,
923 max_parts: Option<u32>,
924 part_number_marker: Option<PartNumber>,
925 ) -> Result<ListPartsResponse> {
926 objectstore_log::debug!("Listing parts on GCS backend");
927 let mut url = self.xml_object_url(id)?;
928 {
929 let mut pairs = url.query_pairs_mut();
930 pairs.append_pair("uploadId", upload_id);
931 if let Some(max) = max_parts {
932 pairs.append_pair("max-parts", &max.to_string());
933 }
934 if let Some(marker) = part_number_marker {
935 pairs.append_pair("part-number-marker", &marker.to_string());
936 }
937 }
938
939 let resp = self
940 .request(Method::GET, url)
941 .await?
942 .send()
943 .await
944 .and_then(|r| r.error_for_status())
945 .map_err(|e| Error::reqwest("GCS: list parts", e))?;
946
947 let body = resp
948 .bytes()
949 .await
950 .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;
951
952 let xml: XmlListPartsResponse =
953 quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
954 context: "GCS: failed to parse list parts response".to_owned(),
955 cause: Some(Box::new(e)),
956 })?;
957
958 Ok(xml.into())
959 }
960
961 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
962 async fn abort_multipart(
963 &self,
964 id: &ObjectId,
965 upload_id: &UploadId,
966 ) -> Result<AbortMultipartResponse> {
967 objectstore_log::debug!("Aborting multipart upload on GCS backend");
968 let mut url = self.xml_object_url(id)?;
969 url.query_pairs_mut().append_pair("uploadId", upload_id);
970
971 self.request(Method::DELETE, url)
972 .await?
973 .send()
974 .await
975 .and_then(|r| r.error_for_status())
976 .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;
977
978 Ok(())
979 }
980
981 #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
982 async fn complete_multipart(
983 &self,
984 id: &ObjectId,
985 upload_id: &UploadId,
986 parts: Vec<CompletedPart>,
987 ) -> Result<CompleteMultipartResponse> {
988 objectstore_log::debug!("Completing multipart upload on GCS backend");
989 let mut url = self.xml_object_url(id)?;
990 url.query_pairs_mut().append_pair("uploadId", upload_id);
991
992 let body = XmlCompleteMultipartUpload::from(parts);
993 let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
994 context: "GCS: failed to serialize complete multipart request".into(),
995 cause: Some(Box::new(e)),
996 })?;
997
998 let resp = self
999 .request(Method::POST, url)
1000 .await?
1001 .header(header::CONTENT_TYPE, "application/xml")
1002 .body(xml)
1003 .send()
1004 .await
1005 .and_then(|r| r.error_for_status())
1006 .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;
1007
1008 let body = resp
1009 .bytes()
1010 .await
1011 .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;
1012
1013 let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
1014 .ok()
1015 .map(Into::into);
1016
1017 Ok(error)
1018 }
1019}
1020
1021#[cfg(test)]
1022mod tests {
1023 use std::collections::BTreeMap;
1024 use std::num::NonZeroU32;
1025
1026 use anyhow::Result;
1027 use objectstore_types::scope::{Scope, Scopes};
1028
1029 use super::*;
1030 use crate::id::ObjectContext;
1031 use crate::multipart::CompletedPart;
1032 use crate::stream;
1033
1034 async fn create_test_backend() -> Result<GcsBackend> {
1040 GcsBackend::new(GcsConfig {
1041 endpoint: Some("http://localhost:8087".into()),
1042 bucket: "test-bucket".into(),
1043 })
1044 .await
1045 }
1046
1047 fn make_id() -> ObjectId {
1048 ObjectId::random(ObjectContext {
1049 usecase: "testing".into(),
1050 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1051 })
1052 }
1053
1054 #[tokio::test]
1055 async fn test_roundtrip() -> Result<()> {
1056 let backend = create_test_backend().await?;
1057
1058 let id = make_id();
1059 let metadata = Metadata {
1060 content_type: "text/plain".into(),
1061 expiration_policy: ExpirationPolicy::Manual,
1062 compression: None,
1063 origin: Some("203.0.113.42".into()),
1064 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1065 time_created: Some(SystemTime::now()),
1066 time_expires: None,
1067 size: None,
1068 };
1069
1070 backend
1071 .put_object(&id, &metadata, stream::single("hello, world"))
1072 .await?;
1073
1074 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1075
1076 let payload = stream::read_to_vec(stream).await?;
1077 let str_payload = str::from_utf8(&payload).unwrap();
1078 assert_eq!(str_payload, "hello, world");
1079 assert_eq!(meta.content_type, metadata.content_type);
1080 assert_eq!(meta.origin, metadata.origin);
1081 assert_eq!(meta.custom, metadata.custom);
1082 assert!(metadata.time_created.is_some());
1083
1084 Ok(())
1085 }
1086
1087 #[tokio::test]
1088 async fn test_get_nonexistent() -> Result<()> {
1089 let backend = create_test_backend().await?;
1090
1091 let id = make_id();
1092 let result = backend.get_object(&id).await?;
1093 assert!(result.is_none());
1094
1095 Ok(())
1096 }
1097
1098 #[tokio::test]
1099 async fn test_delete_nonexistent() -> Result<()> {
1100 let backend = create_test_backend().await?;
1101
1102 let id = make_id();
1103 backend.delete_object(&id).await?;
1104
1105 Ok(())
1106 }
1107
1108 #[tokio::test]
1109 async fn test_overwrite() -> Result<()> {
1110 let backend = create_test_backend().await?;
1111
1112 let id = make_id();
1113 let metadata = Metadata {
1114 custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1115 ..Default::default()
1116 };
1117
1118 backend
1119 .put_object(&id, &metadata, stream::single("hello"))
1120 .await?;
1121
1122 let metadata = Metadata {
1123 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1124 ..Default::default()
1125 };
1126
1127 backend
1128 .put_object(&id, &metadata, stream::single("world"))
1129 .await?;
1130
1131 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1132
1133 let payload = stream::read_to_vec(stream).await?;
1134 let str_payload = str::from_utf8(&payload).unwrap();
1135 assert_eq!(str_payload, "world");
1136 assert_eq!(meta.custom, metadata.custom);
1137
1138 Ok(())
1139 }
1140
1141 #[tokio::test]
1142 async fn test_read_after_delete() -> Result<()> {
1143 let backend = create_test_backend().await?;
1144
1145 let id = make_id();
1146 let metadata = Metadata::default();
1147
1148 backend
1149 .put_object(&id, &metadata, stream::single("hello, world"))
1150 .await?;
1151
1152 backend.delete_object(&id).await?;
1153
1154 let result = backend.get_object(&id).await?;
1155 assert!(result.is_none());
1156
1157 Ok(())
1158 }
1159
1160 #[tokio::test]
1161 async fn test_ttl_immediate() -> Result<()> {
1162 let backend = create_test_backend().await?;
1166
1167 let id = make_id();
1168 let metadata = Metadata {
1169 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1170 ..Default::default()
1171 };
1172
1173 backend
1174 .put_object(&id, &metadata, stream::single("hello, world"))
1175 .await?;
1176
1177 let result = backend.get_object(&id).await?;
1178 assert!(result.is_none());
1179
1180 Ok(())
1181 }
1182
1183 #[tokio::test]
1184 async fn test_tti_immediate() -> Result<()> {
1185 let backend = create_test_backend().await?;
1189
1190 let id = make_id();
1191 let metadata = Metadata {
1192 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1193 ..Default::default()
1194 };
1195
1196 backend
1197 .put_object(&id, &metadata, stream::single("hello, world"))
1198 .await?;
1199
1200 let result = backend.get_object(&id).await?;
1201 assert!(result.is_none());
1202
1203 Ok(())
1204 }
1205
1206 #[tokio::test]
1207 async fn test_get_metadata_returns_metadata() -> Result<()> {
1208 let backend = create_test_backend().await?;
1209
1210 let id = make_id();
1211 let metadata = Metadata {
1212 content_type: "text/plain".into(),
1213 origin: Some("203.0.113.42".into()),
1214 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1215 ..Default::default()
1216 };
1217
1218 backend
1219 .put_object(&id, &metadata, stream::single("hello, world"))
1220 .await?;
1221
1222 let meta = backend.get_metadata(&id).await?.unwrap();
1223 assert_eq!(meta.content_type, metadata.content_type);
1224 assert_eq!(meta.origin, metadata.origin);
1225 assert_eq!(meta.custom, metadata.custom);
1226
1227 Ok(())
1228 }
1229
1230 #[tokio::test]
1231 async fn test_get_metadata_nonexistent() -> Result<()> {
1232 let backend = create_test_backend().await?;
1233
1234 let id = make_id();
1235 let result = backend.get_metadata(&id).await?;
1236 assert!(result.is_none());
1237
1238 Ok(())
1239 }
1240
1241 #[tokio::test]
1242 async fn test_get_metadata_bumps_tti() -> Result<()> {
1243 let backend = create_test_backend().await?;
1244
1245 let id = make_id();
1246 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1249 content_type: "text/plain".into(),
1250 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1251 ..Default::default()
1252 };
1253
1254 backend
1255 .put_object(&id, &metadata, stream::single("hello, world"))
1256 .await?;
1257
1258 let object_url = backend.object_url(&id)?;
1261 let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1262 backend.update_custom_time(object_url, old_deadline).await?;
1263
1264 let pre_meta = backend.get_metadata(&id).await?.unwrap();
1266 let pre_expiry = pre_meta.time_expires.unwrap();
1267
1268 let post_meta = backend.get_metadata(&id).await?.unwrap();
1270 let post_expiry = post_meta.time_expires.unwrap();
1271 assert!(
1272 post_expiry > pre_expiry,
1273 "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
1274 );
1275
1276 let (_, stream) = backend.get_object(&id).await?.unwrap();
1278 let payload = stream::read_to_vec(stream).await?;
1279 assert_eq!(&payload, b"hello, world");
1280
1281 Ok(())
1282 }
1283
1284 #[tokio::test]
1285 async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
1286 let backend = create_test_backend().await?;
1287
1288 let id = make_id();
1289 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1292 content_type: "text/plain".into(),
1293 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1294 ..Default::default()
1295 };
1296
1297 backend
1298 .put_object(&id, &metadata, stream::single("hello, world"))
1299 .await?;
1300
1301 let first = backend.get_metadata(&id).await?.unwrap();
1304 let first_expiry = first.time_expires.unwrap();
1305
1306 let second = backend.get_metadata(&id).await?.unwrap();
1307 let second_expiry = second.time_expires.unwrap();
1308
1309 assert_eq!(
1310 first_expiry, second_expiry,
1311 "Fresh TTI object should not have its expiry bumped"
1312 );
1313
1314 Ok(())
1315 }
1316
1317 #[tokio::test]
1318 async fn test_compressed_payload_roundtrip() -> Result<()> {
1319 use objectstore_types::metadata::Compression;
1320
1321 let backend = create_test_backend().await?;
1322
1323 let plaintext = b"hello, world (but compressed with zstd)";
1324 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1325
1326 let id = make_id();
1327 let metadata = Metadata {
1328 content_type: "text/plain".into(),
1329 compression: Some(Compression::Zstd),
1330 ..Default::default()
1331 };
1332
1333 backend
1334 .put_object(&id, &metadata, stream::single(compressed.clone()))
1335 .await?;
1336
1337 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1338 let payload = stream::read_to_vec(stream).await?;
1339
1340 assert_eq!(meta.compression, Some(Compression::Zstd));
1341 assert_eq!(
1342 payload, compressed,
1343 "Payload should be returned still compressed, not auto-decompressed"
1344 );
1345
1346 Ok(())
1347 }
1348
1349 #[tokio::test]
1350 async fn test_multipart_single_part() -> Result<()> {
1351 let backend = create_test_backend().await?;
1352 let id = make_id();
1353 let metadata = Metadata {
1354 content_type: "text/plain".into(),
1355 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
1356 origin: Some("203.0.113.42".into()),
1357 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1358 ..Default::default()
1359 };
1360
1361 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1362
1363 let data = b"hello, multipart world!";
1364 let etag = backend
1365 .upload_part(
1366 &id,
1367 &upload_id,
1368 NonZeroU32::new(1).unwrap(),
1369 data.len() as u64,
1370 None,
1371 stream::single(data.to_vec()),
1372 )
1373 .await?;
1374
1375 let result = backend
1376 .complete_multipart(
1377 &id,
1378 &upload_id,
1379 vec![CompletedPart {
1380 part_number: NonZeroU32::new(1).unwrap(),
1381 etag,
1382 }],
1383 )
1384 .await?;
1385 assert!(result.is_none(), "expected no error on complete");
1386
1387 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1388 let payload = stream::read_to_vec(stream).await?;
1389 assert_eq!(payload, data);
1390 assert_eq!(meta.content_type, "text/plain".to_string());
1391 assert_eq!(
1392 meta.expiration_policy,
1393 ExpirationPolicy::TimeToLive(Duration::from_mins(33))
1394 );
1395 assert_eq!(meta.origin, Some("203.0.113.42".into()));
1396 assert_eq!(
1397 meta.custom,
1398 BTreeMap::from_iter([("hello".into(), "world".into())])
1399 );
1400
1401 Ok(())
1402 }
1403
1404 #[tokio::test]
1405 async fn test_multipart_multiple_parts() -> Result<()> {
1406 let backend = create_test_backend().await?;
1407 let id = make_id();
1408 let metadata = Metadata::default();
1409
1410 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1411
1412 const MIN_PART: usize = 5 * 1024 * 1024;
1414 let part1 = vec![b'a'; MIN_PART];
1415 let part2 = vec![b'b'; MIN_PART];
1416 let part3 = b"cccc".to_vec();
1417
1418 let etag1 = backend
1419 .upload_part(
1420 &id,
1421 &upload_id,
1422 NonZeroU32::new(1).unwrap(),
1423 part1.len() as u64,
1424 None,
1425 stream::single(part1.clone()),
1426 )
1427 .await?;
1428 let etag2 = backend
1429 .upload_part(
1430 &id,
1431 &upload_id,
1432 NonZeroU32::new(2).unwrap(),
1433 part2.len() as u64,
1434 None,
1435 stream::single(part2.clone()),
1436 )
1437 .await?;
1438 let etag3 = backend
1439 .upload_part(
1440 &id,
1441 &upload_id,
1442 NonZeroU32::new(3).unwrap(),
1443 part3.len() as u64,
1444 None,
1445 stream::single(part3.clone()),
1446 )
1447 .await?;
1448
1449 let result = backend
1450 .complete_multipart(
1451 &id,
1452 &upload_id,
1453 vec![
1454 CompletedPart {
1455 part_number: NonZeroU32::new(1).unwrap(),
1456 etag: etag1,
1457 },
1458 CompletedPart {
1459 part_number: NonZeroU32::new(2).unwrap(),
1460 etag: etag2,
1461 },
1462 CompletedPart {
1463 part_number: NonZeroU32::new(3).unwrap(),
1464 etag: etag3,
1465 },
1466 ],
1467 )
1468 .await?;
1469 assert!(result.is_none(), "expected no error on complete");
1470
1471 let (_meta, stream) = backend.get_object(&id).await?.unwrap();
1473 let payload = stream::read_to_vec(stream).await?;
1474 let mut expected = Vec::new();
1475 expected.extend_from_slice(&part1);
1476 expected.extend_from_slice(&part2);
1477 expected.extend_from_slice(&part3);
1478 assert_eq!(payload, expected);
1479
1480 Ok(())
1481 }
1482
1483 #[tokio::test]
1484 async fn test_multipart_out_of_order_upload() -> Result<()> {
1485 let backend = create_test_backend().await?;
1486 let id = make_id();
1487 let metadata = Metadata::default();
1488
1489 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1490
1491 const MIN_PART: usize = 5 * 1024 * 1024;
1493 let part1 = vec![b'a'; MIN_PART];
1494 let part2 = vec![b'b'; MIN_PART];
1495 let part3 = b"cccc".to_vec();
1496
1497 let etag2 = backend
1499 .upload_part(
1500 &id,
1501 &upload_id,
1502 NonZeroU32::new(2).unwrap(),
1503 part2.len() as u64,
1504 None,
1505 stream::single(part2.clone()),
1506 )
1507 .await?;
1508 let etag3 = backend
1509 .upload_part(
1510 &id,
1511 &upload_id,
1512 NonZeroU32::new(3).unwrap(),
1513 part3.len() as u64,
1514 None,
1515 stream::single(part3.clone()),
1516 )
1517 .await?;
1518 let etag1 = backend
1519 .upload_part(
1520 &id,
1521 &upload_id,
1522 NonZeroU32::new(1).unwrap(),
1523 part1.len() as u64,
1524 None,
1525 stream::single(part1.clone()),
1526 )
1527 .await?;
1528
1529 let result = backend
1531 .complete_multipart(
1532 &id,
1533 &upload_id,
1534 vec![
1535 CompletedPart {
1536 part_number: NonZeroU32::new(1).unwrap(),
1537 etag: etag1,
1538 },
1539 CompletedPart {
1540 part_number: NonZeroU32::new(2).unwrap(),
1541 etag: etag2,
1542 },
1543 CompletedPart {
1544 part_number: NonZeroU32::new(3).unwrap(),
1545 etag: etag3,
1546 },
1547 ],
1548 )
1549 .await?;
1550 assert!(result.is_none(), "expected no error on complete");
1551
1552 let (_meta, stream) = backend.get_object(&id).await?.unwrap();
1554 let payload = stream::read_to_vec(stream).await?;
1555 let mut expected = Vec::new();
1556 expected.extend_from_slice(&part1);
1557 expected.extend_from_slice(&part2);
1558 expected.extend_from_slice(&part3);
1559 assert_eq!(payload, expected);
1560
1561 Ok(())
1562 }
1563
1564 #[tokio::test]
1565 async fn test_multipart_list_parts() -> Result<()> {
1566 let backend = create_test_backend().await?;
1567 let id = make_id();
1568 let metadata = Metadata::default();
1569
1570 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1571
1572 let etag1 = backend
1573 .upload_part(
1574 &id,
1575 &upload_id,
1576 NonZeroU32::new(1).unwrap(),
1577 3,
1578 None,
1579 stream::single(b"aaa".to_vec()),
1580 )
1581 .await?;
1582 let etag2 = backend
1583 .upload_part(
1584 &id,
1585 &upload_id,
1586 NonZeroU32::new(2).unwrap(),
1587 3,
1588 None,
1589 stream::single(b"bbb".to_vec()),
1590 )
1591 .await?;
1592
1593 let list = backend.list_parts(&id, &upload_id, None, None).await?;
1595 assert_eq!(list.parts.len(), 2);
1596 assert_eq!(list.parts[0].part_number.get(), 1);
1597 assert_eq!(list.parts[0].etag, etag1);
1598 assert_eq!(list.parts[0].size, 3);
1599 assert_eq!(list.parts[1].part_number.get(), 2);
1600 assert_eq!(list.parts[1].etag, etag2);
1601 assert_eq!(list.parts[1].size, 3);
1602
1603 let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
1605 assert_eq!(page1.parts.len(), 1);
1606 assert_eq!(page1.parts[0].part_number.get(), 1);
1607 assert!(page1.is_truncated);
1608 assert!(page1.next_part_number_marker.is_some());
1609
1610 let page2 = backend
1611 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
1612 .await?;
1613 assert_eq!(page2.parts.len(), 1);
1614 assert_eq!(page2.parts[0].part_number.get(), 2);
1615
1616 backend.abort_multipart(&id, &upload_id).await?;
1618
1619 Ok(())
1620 }
1621
1622 #[tokio::test]
1623 async fn test_multipart_abort() -> Result<()> {
1624 let backend = create_test_backend().await?;
1625 let id = make_id();
1626 let metadata = Metadata::default();
1627
1628 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1629
1630 backend
1631 .upload_part(
1632 &id,
1633 &upload_id,
1634 NonZeroU32::new(1).unwrap(),
1635 5,
1636 None,
1637 stream::single(b"hello".to_vec()),
1638 )
1639 .await?;
1640
1641 backend.abort_multipart(&id, &upload_id).await?;
1642
1643 let result = backend.get_object(&id).await?;
1645 assert!(result.is_none(), "object should not exist after abort");
1646
1647 Ok(())
1648 }
1649
1650 async fn multipart_put(
1651 backend: &GcsBackend,
1652 id: &ObjectId,
1653 metadata: &Metadata,
1654 payload: impl Into<bytes::Bytes>,
1655 ) -> Result<()> {
1656 let payload: bytes::Bytes = payload.into();
1657 let upload_id = backend.initiate_multipart(id, metadata).await?;
1658 let etag = backend
1659 .upload_part(
1660 id,
1661 &upload_id,
1662 NonZeroU32::new(1).unwrap(),
1663 payload.len() as u64,
1664 None,
1665 stream::single(payload),
1666 )
1667 .await?;
1668 let error = backend
1669 .complete_multipart(
1670 id,
1671 &upload_id,
1672 vec![CompletedPart {
1673 part_number: NonZeroU32::new(1).unwrap(),
1674 etag,
1675 }],
1676 )
1677 .await?;
1678 assert!(
1679 error.is_none(),
1680 "complete_multipart returned error: {error:?}"
1681 );
1682 Ok(())
1683 }
1684
1685 #[tokio::test]
1686 async fn test_multipart_ttl_immediate() -> Result<()> {
1687 let backend = create_test_backend().await?;
1688 let id = make_id();
1689 let metadata = Metadata {
1690 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1691 ..Default::default()
1692 };
1693
1694 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1695
1696 let result = backend.get_object(&id).await?;
1697 assert!(result.is_none());
1698
1699 Ok(())
1700 }
1701
1702 #[tokio::test]
1703 async fn test_multipart_tti_immediate() -> Result<()> {
1704 let backend = create_test_backend().await?;
1705 let id = make_id();
1706 let metadata = Metadata {
1707 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1708 ..Default::default()
1709 };
1710
1711 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1712
1713 let result = backend.get_object(&id).await?;
1714 assert!(result.is_none());
1715
1716 Ok(())
1717 }
1718
1719 #[tokio::test]
1720 async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
1721 use objectstore_types::metadata::Compression;
1722
1723 let backend = create_test_backend().await?;
1724
1725 let plaintext = b"hello, world (but compressed with zstd)";
1726 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1727
1728 let id = make_id();
1729 let metadata = Metadata {
1730 content_type: "text/plain".into(),
1731 compression: Some(Compression::Zstd),
1732 ..Default::default()
1733 };
1734
1735 multipart_put(&backend, &id, &metadata, compressed.clone()).await?;
1736
1737 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1738 let payload = stream::read_to_vec(stream).await?;
1739
1740 assert_eq!(meta.compression, Some(Compression::Zstd));
1741 assert_eq!(
1742 payload, compressed,
1743 "Payload should be returned still compressed, not auto-decompressed"
1744 );
1745
1746 Ok(())
1747 }
1748}