1use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use reqwest::header::HeaderName;
14use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
15use serde::{Deserialize, Serialize};
16
17use crate::backend::common::{
18 self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
19 PutResponse,
20};
21use crate::error::{Error, Result};
22use crate::gcp_auth::PrefetchingTokenProvider;
23use crate::id::ObjectId;
24use crate::multipart::{
25 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
26 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
27};
28use crate::stream::{self, ClientStream};
29
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional override for the GCS endpoint URL.
    ///
    /// When `None`, the public Google endpoint is used and real GCP
    /// authentication is performed; when set (e.g. pointing at an emulator),
    /// authentication is skipped entirely (see `GcsBackend::new`).
    pub endpoint: Option<String>,

    /// Name of the bucket all objects are stored in.
    pub bucket: String,
}
74
/// Default (production) GCS endpoint, used when no override is configured.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";

/// OAuth scopes requested for GCS access tokens.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];

/// Minimum slack (one day) by which a time-to-idle deadline must lag before
/// it is bumped again; avoids issuing a `customTime` PATCH on every read.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600);

/// Number of additional attempts made for retryable request failures.
const REQUEST_RETRY_COUNT: usize = 2;

/// Key prefix for built-in (library-defined) entries in GCS object metadata.
const BUILTIN_META_PREFIX: &str = "x-sn-";

/// Key prefix for user-supplied custom entries in GCS object metadata.
const CUSTOM_META_PREFIX: &str = "x-snme-";
88
/// Subset of the GCS JSON-API object resource that this backend reads and
/// writes. Only the fields needed to round-trip [`Metadata`] are modeled.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// MIME content type of the stored payload.
    pub content_type: Cow<'static, str>,

    /// Content encoding; used to persist the compression algorithm, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// GCS `customTime`; this backend uses it as the expiration deadline
    /// for TTL/TTI objects (RFC 3339 via `humantime_serde`).
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// Object size in bytes; GCS transmits this as a decimal string.
    pub size: Option<String>,

    /// Creation timestamp reported by GCS.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// Object metadata map; keys carry the `x-sn-`/`x-snme-` prefixes.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
131
132impl GcsObject {
133 pub fn from_metadata(metadata: &Metadata) -> Self {
135 let mut gcs_object = GcsObject {
136 content_type: metadata.content_type.clone(),
137 size: metadata.size.map(|size| size.to_string()),
138 content_encoding: None,
139 custom_time: None,
140 time_created: metadata.time_created,
141 metadata: BTreeMap::new(),
142 };
143
144 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
148 gcs_object.custom_time = Some(SystemTime::now() + expires_in);
149 }
150
151 if let Some(compression) = metadata.compression {
152 gcs_object.content_encoding = Some(compression.to_string());
153 }
154
155 if metadata.expiration_policy != ExpirationPolicy::default() {
156 gcs_object.metadata.insert(
157 GcsMetaKey::Expiration,
158 metadata.expiration_policy.to_string(),
159 );
160 }
161
162 if let Some(origin) = &metadata.origin {
163 gcs_object
164 .metadata
165 .insert(GcsMetaKey::Origin, origin.clone());
166 }
167
168 for (key, value) in &metadata.custom {
169 gcs_object
170 .metadata
171 .insert(GcsMetaKey::Custom(key.clone()), value.clone());
172 }
173
174 gcs_object
175 }
176
177 pub fn into_metadata(mut self) -> Result<Metadata> {
179 self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
181
182 let expiration_policy = self
183 .metadata
184 .remove(&GcsMetaKey::Expiration)
185 .map(|s| s.parse())
186 .transpose()?
187 .unwrap_or_default();
188
189 let origin = self.metadata.remove(&GcsMetaKey::Origin);
190
191 let content_type = self.content_type;
192 let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
193 let size = self
194 .size
195 .map(|size| size.parse())
196 .transpose()
197 .map_err(|e| Error::Generic {
198 context: "GCS: failed to parse size from object metadata".to_string(),
199 cause: Some(Box::new(e)),
200 })?;
201 let time_created = self.time_created;
202
203 let mut custom = BTreeMap::new();
205 for (key, value) in self.metadata {
206 if let GcsMetaKey::Custom(custom_key) = key {
207 custom.insert(custom_key, value);
208 } else {
209 return Err(Error::Generic {
210 context: format!(
211 "GCS: unexpected built-in metadata key in object metadata: {}",
212 key
213 ),
214 cause: None,
215 });
216 }
217 }
218
219 Ok(Metadata {
220 content_type,
221 expiration_policy,
222 compression,
223 origin,
224 size,
225 custom,
226 time_created,
227 time_expires: self.custom_time,
228 })
229 }
230}
231
/// Typed key for entries in the GCS object metadata map.
///
/// NOTE: the derived `Ord` depends on variant order (it drives the
/// `BTreeMap` layout) — do not reorder variants.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Built-in key (`x-sn-expiration`) holding the serialized expiration policy.
    Expiration,
    /// Built-in key (`x-sn-origin`) holding the object's origin.
    Origin,
    /// Bookkeeping keys injected by GCS emulators; parsed but never serialized.
    EmulatorIgnored,
    /// User-supplied custom key, stored with the `x-snme-` prefix.
    Custom(String),
}
244
245impl std::str::FromStr for GcsMetaKey {
246 type Err = anyhow::Error;
247
248 fn from_str(s: &str) -> Result<Self, Self::Err> {
249 if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
250 return Ok(GcsMetaKey::EmulatorIgnored);
251 }
252
253 Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
254 Some("expiration") => GcsMetaKey::Expiration,
255 Some("origin") => GcsMetaKey::Origin,
256 Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
257 None => match s.strip_prefix(CUSTOM_META_PREFIX) {
258 Some(key) => GcsMetaKey::Custom(key.to_string()),
259 None => anyhow::bail!("invalid GCS metadata key format: {s}"),
260 },
261 })
262 }
263}
264
265impl fmt::Display for GcsMetaKey {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 match self {
268 Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
269 Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
270 Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
271 Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
272 }
273 }
274}
275
276impl<'de> serde::Deserialize<'de> for GcsMetaKey {
277 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
278 where
279 D: serde::Deserializer<'de>,
280 {
281 let s = Cow::<'de, str>::deserialize(deserializer)?;
282 s.parse().map_err(serde::de::Error::custom)
283 }
284}
285
286impl serde::Serialize for GcsMetaKey {
287 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
288 where
289 S: serde::Serializer,
290 {
291 serializer.collect_str(self)
292 }
293}
294
295fn metadata_to_gcs_headers(metadata: &Metadata) -> Result<header::HeaderMap> {
297 let mut headers = header::HeaderMap::new();
298
299 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
300 let custom_time = SystemTime::now() + expires_in;
301 let formatted = humantime::format_rfc3339_seconds(custom_time);
302 headers.insert(
303 HeaderName::from_static("x-goog-custom-time"),
304 formatted.to_string().parse().map_err(|e| Error::Generic {
305 context: "GCS: invalid custom-time header value".into(),
306 cause: Some(Box::new(e)),
307 })?,
308 );
309 }
310
311 if let Some(compression) = metadata.compression {
312 headers.insert(
313 header::CONTENT_ENCODING,
314 compression
315 .to_string()
316 .parse()
317 .map_err(|e| Error::Generic {
318 context: "GCS: invalid content-encoding header value".into(),
319 cause: Some(Box::new(e)),
320 })?,
321 );
322 }
323
324 if metadata.expiration_policy != ExpirationPolicy::default() {
325 insert_gcs_meta_header(
326 &mut headers,
327 &GcsMetaKey::Expiration,
328 &metadata.expiration_policy.to_string(),
329 )?;
330 }
331
332 if let Some(origin) = &metadata.origin {
333 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Origin, origin)?;
334 }
335
336 for (key, value) in &metadata.custom {
337 insert_gcs_meta_header(&mut headers, &GcsMetaKey::Custom(key.clone()), value)?;
338 }
339
340 Ok(headers)
341}
342
343fn insert_gcs_meta_header(
344 headers: &mut header::HeaderMap,
345 key: &GcsMetaKey,
346 value: &str,
347) -> Result<()> {
348 let header_name = format!("x-goog-meta-{key}");
349 headers.insert(
350 HeaderName::try_from(&header_name).map_err(|e| Error::Generic {
351 context: format!("GCS: invalid header name: {header_name}"),
352 cause: Some(Box::new(e)),
353 })?,
354 value.parse().map_err(|e| Error::Generic {
355 context: format!("GCS: invalid header value for {header_name}"),
356 cause: Some(Box::new(e)),
357 })?,
358 );
359 Ok(())
360}
361
362fn is_retryable(error: &Error) -> bool {
364 let Error::Reqwest { cause, .. } = error else {
365 return false;
366 };
367 if cause.is_timeout() || cause.is_connect() || cause.is_request() {
368 return true;
369 }
370 let Some(status) = cause.status() else {
371 return false;
372 };
373 matches!(
375 status,
376 StatusCode::REQUEST_TIMEOUT
377 | StatusCode::TOO_MANY_REQUESTS
378 | StatusCode::INTERNAL_SERVER_ERROR
379 | StatusCode::BAD_GATEWAY
380 | StatusCode::SERVICE_UNAVAILABLE
381 | StatusCode::GATEWAY_TIMEOUT
382 )
383}
384
/// Storage backend talking to Google Cloud Storage via its JSON API,
/// plus the XML API for multipart uploads.
pub struct GcsBackend {
    // HTTP client shared by all requests.
    client: reqwest::Client,
    // Base URL of the GCS service (default endpoint or emulator override).
    endpoint: Url,
    // Bucket all objects live in.
    bucket: String,
    // Token source for `Authorization` headers; `None` when targeting an
    // emulator endpoint, in which case requests go out unauthenticated.
    token_provider: Option<PrefetchingTokenProvider>,
}
392
impl GcsBackend {
    /// Creates a backend from `config`.
    ///
    /// With no endpoint override, GCP credentials are resolved and tokens
    /// prefetched; with an override (e.g. an emulator), requests are sent
    /// unauthenticated.
    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
        let GcsConfig { endpoint, bucket } = config;

        // Only authenticate against the real GCS endpoint.
        let token_provider = if endpoint.is_none() {
            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
        } else {
            None
        };

        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);

        Ok(Self {
            client: common::reqwest_client(),
            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
            bucket,
            token_provider,
        })
    }

    /// JSON-API URL for one object:
    /// `<endpoint>/storage/v1/b/<bucket>/o/<object>`.
    fn object_url(&self, id: &ObjectId) -> Result<Url> {
        let mut url = self.endpoint.clone();

        // The whole storage path is appended as a single segment, so any
        // `/` inside it gets percent-encoded, as the JSON API expects.
        let path = id.as_storage_path().to_string();
        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);

        Ok(url)
    }

    /// JSON-API upload URL:
    /// `<endpoint>/upload/storage/v1/b/<bucket>/o?uploadType=…&name=…`.
    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
        let mut url = self.endpoint.clone();

        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);

        url.query_pairs_mut()
            .append_pair("uploadType", upload_type)
            .append_pair("name", &id.as_storage_path().to_string());

        Ok(url)
    }

    /// XML-API URL for one object: `<endpoint>/<bucket>/<path…>`.
    ///
    /// Unlike [`Self::object_url`], the storage path is split on `/` so
    /// each component becomes its own URL segment.
    fn xml_object_url(&self, id: &ObjectId) -> Result<Url> {
        let mut url = self.endpoint.clone();
        {
            let mut segments = url.path_segments_mut().map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?;
            segments.push(&self.bucket);
            for part in id.as_storage_path().to_string().split('/') {
                segments.push(part);
            }
        }
        Ok(url)
    }

    /// Starts a request, attaching a bearer token when a token provider is
    /// configured (i.e. when talking to real GCS rather than an emulator).
    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
        let mut builder = self.client.request(method, url);
        if let Some(provider) = &self.token_provider {
            let token = provider.token(TOKEN_SCOPES).await?;
            builder = builder.bearer_auth(token.as_str());
        }
        Ok(builder)
    }

    /// Runs `f`, retrying up to [`REQUEST_RETRY_COUNT`] extra times while
    /// [`is_retryable`] holds; emits retry/failure metrics tagged with
    /// `action`.
    ///
    /// NOTE(review): retries happen immediately, with no backoff between
    /// attempts.
    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
    where
        F: Future<Output = Result<T>> + Send,
    {
        let mut retry_count = 0usize;
        loop {
            match f().await {
                Ok(res) => return Ok(res),
                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
                    retry_count += 1;
                    objectstore_metrics::count!("gcs.retries", action = action);
                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
                }
                Err(e) => {
                    objectstore_metrics::count!("gcs.failures", action = action);
                    return Err(e);
                }
            }
        }
    }

    /// Fetches object metadata and applies expiration semantics:
    ///
    /// - returns `None` for 404s and for timeout-policy objects whose
    ///   deadline has passed;
    /// - for time-to-idle objects, bumps the stored deadline when the fresh
    ///   deadline is more than [`TTI_DEBOUNCE`] ahead of the stored one.
    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
        let metadata_opt = self
            .with_retry("get_metadata", || async {
                let resp = self
                    .request(Method::GET, object_url.clone())
                    .await?
                    .send()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;

                // A missing object is not an error at this layer.
                if resp.status() == StatusCode::NOT_FOUND {
                    return Ok(None);
                }

                let metadata: GcsObject = resp
                    .error_for_status()
                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
                    .json()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;

                Ok(Some(metadata))
            })
            .await?;

        let Some(gcs_metadata) = metadata_opt else {
            objectstore_log::debug!("Object not found");
            return Ok(None);
        };

        let expire_at = gcs_metadata.custom_time;
        let metadata = gcs_metadata.into_metadata()?;

        let access_time = SystemTime::now();

        // Expired objects are reported as absent even though GCS may still
        // hold them. NOTE(review): presumably actual deletion is handled by
        // bucket lifecycle rules — confirm.
        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
            objectstore_log::debug!("Object found but past expiry");
            return Ok(None);
        }

        // Reading a TTI object extends its deadline, debounced so we do not
        // PATCH the object on every single access.
        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
            let new_expire_at = access_time + tti;
            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
                self.update_custom_time(object_url.clone(), new_expire_at)
                    .await?;
            }
        }

        Ok(Some(metadata))
    }

    /// PATCHes the object's `customTime` field (our expiration deadline).
    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
        #[derive(Debug, Serialize)]
        #[serde(rename_all = "camelCase")]
        struct CustomTimeRequest {
            #[serde(with = "humantime_serde")]
            custom_time: SystemTime,
        }

        self.with_retry("update_custom_time", || async {
            self.request(Method::PATCH, object_url.clone())
                .await?
                .json(&CustomTimeRequest { custom_time })
                .send()
                .await
                .and_then(|r| r.error_for_status())
                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
            Ok(())
        })
        .await
    }
}
586
587impl fmt::Debug for GcsBackend {
588 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
589 f.debug_struct("GcsJsonApi")
590 .field("endpoint", &self.endpoint)
591 .field("bucket", &self.bucket)
592 .finish_non_exhaustive()
593 }
594}
595
#[async_trait::async_trait]
impl Backend for GcsBackend {
    fn name(&self) -> &'static str {
        "gcs"
    }

    /// Uploads payload and metadata in a single JSON-API
    /// `uploadType=multipart` request (a metadata part plus a media part).
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        objectstore_log::debug!("Writing to GCS backend");
        let gcs_metadata = GcsObject::from_metadata(metadata);

        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata for GCS upload".to_string(),
            cause,
        })?;

        let multipart = multipart::Form::new()
            .part(
                "metadata",
                multipart::Part::text(metadata_json)
                    .mime_str("application/json")
                    .expect("application/json is a valid mime type"),
            )
            .part(
                "media",
                multipart::Part::stream(Body::wrap_stream(stream))
                    .mime_str(&metadata.content_type)
                    .map_err(|e| Error::Generic {
                        context: format!("invalid mime type: {}", &metadata.content_type),
                        cause: Some(Box::new(e)),
                    })?,
            );

        // GCS multipart uploads use `multipart/related`, so the content
        // type reqwest would set for the form is overridden explicitly.
        let content_type = format!("multipart/related; boundary={}", multipart.boundary());

        self.request(Method::POST, self.upload_url(id, "multipart")?)
            .await?
            .multipart(multipart)
            .header(header::CONTENT_TYPE, content_type)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| match stream::unpack_client_error(&e) {
                // Failures originating from the caller's input stream are
                // surfaced as client errors rather than backend failures.
                Some(ce) => Error::Client(ce),
                _ => Error::reqwest("GCS: upload object", e),
            })?;

        Ok(())
    }

    /// Reads metadata first (honoring expiration, see `fetch_gcs_metadata`),
    /// then streams the payload via `alt=media`.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        objectstore_log::debug!("Reading from GCS backend");
        let object_url = self.object_url(id)?;

        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
            return Ok(None);
        };

        let mut download_url = object_url;
        download_url.query_pairs_mut().append_pair("alt", "media");

        let payload_response = self
            .with_retry("get_payload", || async {
                self.request(Method::GET, download_url.clone())
                    .await?
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .map_err(|e| Error::reqwest("GCS: get payload", e))
            })
            .await?;

        let stream = payload_response
            .bytes_stream()
            .map_err(io::Error::other)
            .boxed();

        Ok(Some((metadata, stream)))
    }

    /// Metadata-only read; applies the same expiration / TTI-bump semantics
    /// as `get_object`.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        objectstore_log::debug!("Reading metadata from GCS backend");
        let object_url = self.object_url(id)?;
        self.fetch_gcs_metadata(&object_url).await
    }

    /// Deletes the object; a 404 counts as success (idempotent delete).
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        objectstore_log::debug!("Deleting from GCS backend");
        let object_url = self.object_url(id)?;

        self.with_retry("delete", || async {
            let resp = self
                .request(Method::DELETE, object_url.clone())
                .await?
                .send()
                .await
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            // Deleting a missing object counts as success.
            if resp.status() == StatusCode::NOT_FOUND {
                return Ok(());
            }

            resp.error_for_status()
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            Ok(())
        })
        .await
    }
}
720
/// XML response body of the initiate-multipart call.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlInitiateMultipartUploadResponse {
    // ID identifying this upload in subsequent part/complete/abort calls.
    upload_id: String,
}
726
727impl From<XmlInitiateMultipartUploadResponse> for InitiateMultipartResponse {
728 fn from(r: XmlInitiateMultipartUploadResponse) -> Self {
729 r.upload_id
730 }
731}
732
/// XML response body of the list-parts call.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlListPartsResponse {
    // Whether more parts exist beyond this page.
    #[serde(default)]
    is_truncated: bool,
    // Marker to pass as `part-number-marker` for the next page, if any.
    next_part_number_marker: Option<u32>,
    // Parts in this page; each is a repeated `Part` XML element.
    #[serde(default, rename = "Part")]
    parts: Vec<XmlPart>,
}
742
743impl From<XmlListPartsResponse> for ListPartsResponse {
744 fn from(xml: XmlListPartsResponse) -> Self {
745 Self {
746 parts: xml.parts.into_iter().map(Into::into).collect(),
747 is_truncated: xml.is_truncated,
748 next_part_number_marker: xml.next_part_number_marker,
749 }
750 }
751}
752
/// One `Part` element from the list-parts XML response.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlPart {
    part_number: u32,
    // Explicit rename: "ETag" does not follow the PascalCase convention.
    #[serde(rename = "ETag")]
    e_tag: String,
    #[serde(with = "humantime_serde")]
    last_modified: SystemTime,
    size: u64,
}
763
764impl From<XmlPart> for crate::multipart::Part {
765 fn from(p: XmlPart) -> Self {
766 Self {
767 part_number: p.part_number,
768 etag: p.e_tag,
769 last_modified: p.last_modified,
770 size: p.size,
771 }
772 }
773}
774
/// Request body for completing a multipart upload
/// (a `CompleteMultipartUpload` element wrapping repeated `Part` elements).
#[derive(Debug, Serialize)]
#[serde(rename = "CompleteMultipartUpload")]
struct XmlCompleteMultipartUpload {
    #[serde(rename = "Part")]
    parts: Vec<XmlCompletePart>,
}
781
782impl From<Vec<CompletedPart>> for XmlCompleteMultipartUpload {
783 fn from(parts: Vec<CompletedPart>) -> Self {
784 Self {
785 parts: parts.into_iter().map(Into::into).collect(),
786 }
787 }
788}
789
/// One `Part` entry in the complete-multipart request body.
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct XmlCompletePart {
    part_number: PartNumber,
    // Explicit rename: "ETag" does not follow the PascalCase convention.
    #[serde(rename = "ETag")]
    e_tag: String,
}
797
798impl From<CompletedPart> for XmlCompletePart {
799 fn from(p: CompletedPart) -> Self {
800 Self {
801 part_number: p.part_number,
802 e_tag: p.etag,
803 }
804 }
805}
806
/// XML `Error` document returned by GCS on multipart-completion failure.
#[derive(Debug, Deserialize)]
#[serde(rename = "Error", rename_all = "PascalCase")]
struct XmlError {
    code: String,
    message: String,
}
813
814impl From<XmlError> for crate::multipart::CompleteMultipartError {
815 fn from(e: XmlError) -> Self {
816 Self {
817 code: e.code,
818 message: e.message,
819 }
820 }
821}
822
// Multipart uploads go through the XML-API endpoints (see `xml_object_url`)
// rather than the JSON API used for regular object operations.
#[async_trait::async_trait]
impl MultipartUploadBackend for GcsBackend {
    /// `POST <object>?uploads` — starts an upload and returns its upload ID
    /// parsed from the XML response.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        objectstore_log::debug!("Initiating multipart upload on GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.set_query(Some("uploads"));

        let mut builder = self
            .request(Method::POST, url)
            .await?
            .header(header::CONTENT_TYPE, metadata.content_type.as_ref())
            .header(header::CONTENT_LENGTH, "0");

        // Object metadata is attached at initiation time as `x-goog-*`
        // headers.
        let meta_headers = metadata_to_gcs_headers(metadata)?;
        for (name, value) in &meta_headers {
            builder = builder.header(name, value);
        }

        let resp = builder
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?;

        let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref())
            .map_err(|e| Error::Generic {
                context: "GCS: failed to parse initiate multipart response".to_owned(),
                cause: Some(Box::new(e)),
            })?;

        Ok(xml.into())
    }

    /// `PUT <object>?partNumber=…&uploadId=…` — uploads one part and returns
    /// the ETag GCS assigned to it (needed later to complete the upload).
    #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)]
    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        content_length: u64,
        content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        objectstore_log::debug!("Uploading part to GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut()
            .append_pair("partNumber", &part_number.to_string())
            .append_pair("uploadId", upload_id);

        let mut builder = self
            .request(Method::PUT, url)
            .await?
            .header(header::CONTENT_LENGTH, content_length)
            .body(Body::wrap_stream(body));

        // Optional integrity check forwarded to the server.
        if let Some(md5) = content_md5 {
            builder = builder.header("content-md5", md5);
        }

        let resp = builder
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: upload part", e))?;

        let etag = resp
            .headers()
            .get(header::ETAG)
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_owned())
            .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?;

        Ok(etag)
    }

    /// `GET <object>?uploadId=…` — lists already-uploaded parts, with
    /// optional pagination via `max-parts` / `part-number-marker`.
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        objectstore_log::debug!("Listing parts on GCS backend");
        let mut url = self.xml_object_url(id)?;
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("uploadId", upload_id);
            if let Some(max) = max_parts {
                pairs.append_pair("max-parts", &max.to_string());
            }
            if let Some(marker) = part_number_marker {
                pairs.append_pair("part-number-marker", &marker.to_string());
            }
        }

        let resp = self
            .request(Method::GET, url)
            .await?
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: list parts", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read list parts body", e))?;

        let xml: XmlListPartsResponse =
            quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic {
                context: "GCS: failed to parse list parts response".to_owned(),
                cause: Some(Box::new(e)),
            })?;

        Ok(xml.into())
    }

    /// `DELETE <object>?uploadId=…` — aborts the upload.
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        objectstore_log::debug!("Aborting multipart upload on GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut().append_pair("uploadId", upload_id);

        self.request(Method::DELETE, url)
            .await?
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;

        Ok(())
    }

    /// `POST <object>?uploadId=…` with a `CompleteMultipartUpload` body —
    /// stitches the uploaded parts into the final object.
    ///
    /// Returns `Some(error)` when the response body parses as an XML
    /// `Error` document, `None` otherwise. NOTE(review): this relies on the
    /// success response NOT deserializing as `XmlError` — confirm.
    #[tracing::instrument(level = "trace", fields(?id, upload_id), skip_all)]
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        objectstore_log::debug!("Completing multipart upload on GCS backend");
        let mut url = self.xml_object_url(id)?;
        url.query_pairs_mut().append_pair("uploadId", upload_id);

        let body = XmlCompleteMultipartUpload::from(parts);
        let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic {
            context: "GCS: failed to serialize complete multipart request".into(),
            cause: Some(Box::new(e)),
        })?;

        let resp = self
            .request(Method::POST, url)
            .await?
            .header(header::CONTENT_TYPE, "application/xml")
            .body(xml)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;

        let body = resp
            .bytes()
            .await
            .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?;

        let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref())
            .ok()
            .map(Into::into);

        Ok(error)
    }
}
1013
1014#[cfg(test)]
1015mod tests {
1016 use std::collections::BTreeMap;
1017
1018 use anyhow::Result;
1019 use objectstore_types::scope::{Scope, Scopes};
1020
1021 use super::*;
1022 use crate::id::ObjectContext;
1023 use crate::multipart::CompletedPart;
1024 use crate::stream;
1025
    /// Backend pointed at a local GCS emulator; tests require one listening
    /// on port 8087.
    async fn create_test_backend() -> Result<GcsBackend> {
        GcsBackend::new(GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        })
        .await
    }
1038
    /// Fresh random object id in a fixed test usecase/scope.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }
1045
1046 #[tokio::test]
1047 async fn test_roundtrip() -> Result<()> {
1048 let backend = create_test_backend().await?;
1049
1050 let id = make_id();
1051 let metadata = Metadata {
1052 content_type: "text/plain".into(),
1053 expiration_policy: ExpirationPolicy::Manual,
1054 compression: None,
1055 origin: Some("203.0.113.42".into()),
1056 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1057 time_created: Some(SystemTime::now()),
1058 time_expires: None,
1059 size: None,
1060 };
1061
1062 backend
1063 .put_object(&id, &metadata, stream::single("hello, world"))
1064 .await?;
1065
1066 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1067
1068 let payload = stream::read_to_vec(stream).await?;
1069 let str_payload = str::from_utf8(&payload).unwrap();
1070 assert_eq!(str_payload, "hello, world");
1071 assert_eq!(meta.content_type, metadata.content_type);
1072 assert_eq!(meta.origin, metadata.origin);
1073 assert_eq!(meta.custom, metadata.custom);
1074 assert!(metadata.time_created.is_some());
1075
1076 Ok(())
1077 }
1078
    /// Reading an object that was never written yields `None`.
    #[tokio::test]
    async fn test_get_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }
1089
    /// Deleting a missing object succeeds (delete is idempotent).
    #[tokio::test]
    async fn test_delete_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        backend.delete_object(&id).await?;

        Ok(())
    }
1099
    /// Writing the same id twice replaces both payload and metadata.
    #[tokio::test]
    async fn test_overwrite() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello"))
            .await?;

        let metadata = Metadata {
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("world"))
            .await?;

        // Only the second write should be visible.
        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "world");
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }
1132
    /// A deleted object is gone on subsequent reads.
    #[tokio::test]
    async fn test_read_after_delete() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        backend.delete_object(&id).await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }
1151
    /// A time-to-live of zero makes the object immediately unreadable.
    #[tokio::test]
    async fn test_ttl_immediate() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }
1174
    /// A time-to-idle of zero makes the object immediately unreadable.
    #[tokio::test]
    async fn test_tti_immediate() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }
1197
    /// `get_metadata` returns the metadata stored at put time.
    #[tokio::test]
    async fn test_get_metadata_returns_metadata() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let meta = backend.get_metadata(&id).await?.unwrap();
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }
1221
    /// `get_metadata` on a missing object yields `None`.
    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_metadata(&id).await?;
        assert!(result.is_none());

        Ok(())
    }
1232
1233 #[tokio::test]
1234 async fn test_get_metadata_bumps_tti() -> Result<()> {
1235 let backend = create_test_backend().await?;
1236
1237 let id = make_id();
1238 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1241 content_type: "text/plain".into(),
1242 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1243 ..Default::default()
1244 };
1245
1246 backend
1247 .put_object(&id, &metadata, stream::single("hello, world"))
1248 .await?;
1249
1250 let object_url = backend.object_url(&id)?;
1253 let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1254 backend.update_custom_time(object_url, old_deadline).await?;
1255
1256 let pre_meta = backend.get_metadata(&id).await?.unwrap();
1258 let pre_expiry = pre_meta.time_expires.unwrap();
1259
1260 let post_meta = backend.get_metadata(&id).await?.unwrap();
1262 let post_expiry = post_meta.time_expires.unwrap();
1263 assert!(
1264 post_expiry > pre_expiry,
1265 "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
1266 );
1267
1268 let (_, stream) = backend.get_object(&id).await?.unwrap();
1270 let payload = stream::read_to_vec(stream).await?;
1271 assert_eq!(&payload, b"hello, world");
1272
1273 Ok(())
1274 }
1275
1276 #[tokio::test]
1277 async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
1278 let backend = create_test_backend().await?;
1279
1280 let id = make_id();
1281 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1284 content_type: "text/plain".into(),
1285 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1286 ..Default::default()
1287 };
1288
1289 backend
1290 .put_object(&id, &metadata, stream::single("hello, world"))
1291 .await?;
1292
1293 let first = backend.get_metadata(&id).await?.unwrap();
1296 let first_expiry = first.time_expires.unwrap();
1297
1298 let second = backend.get_metadata(&id).await?.unwrap();
1299 let second_expiry = second.time_expires.unwrap();
1300
1301 assert_eq!(
1302 first_expiry, second_expiry,
1303 "Fresh TTI object should not have its expiry bumped"
1304 );
1305
1306 Ok(())
1307 }
1308
1309 #[tokio::test]
1310 async fn test_compressed_payload_roundtrip() -> Result<()> {
1311 use objectstore_types::metadata::Compression;
1312
1313 let backend = create_test_backend().await?;
1314
1315 let plaintext = b"hello, world (but compressed with zstd)";
1316 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1317
1318 let id = make_id();
1319 let metadata = Metadata {
1320 content_type: "text/plain".into(),
1321 compression: Some(Compression::Zstd),
1322 ..Default::default()
1323 };
1324
1325 backend
1326 .put_object(&id, &metadata, stream::single(compressed.clone()))
1327 .await?;
1328
1329 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1330 let payload = stream::read_to_vec(stream).await?;
1331
1332 assert_eq!(meta.compression, Some(Compression::Zstd));
1333 assert_eq!(
1334 payload, compressed,
1335 "Payload should be returned still compressed, not auto-decompressed"
1336 );
1337
1338 Ok(())
1339 }
1340
1341 #[tokio::test]
1342 async fn test_multipart_single_part() -> Result<()> {
1343 let backend = create_test_backend().await?;
1344 let id = make_id();
1345 let metadata = Metadata {
1346 content_type: "text/plain".into(),
1347 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)),
1348 origin: Some("203.0.113.42".into()),
1349 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1350 ..Default::default()
1351 };
1352
1353 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1354
1355 let data = b"hello, multipart world!";
1356 let etag = backend
1357 .upload_part(
1358 &id,
1359 &upload_id,
1360 1,
1361 data.len() as u64,
1362 None,
1363 stream::single(data.to_vec()),
1364 )
1365 .await?;
1366
1367 let result = backend
1368 .complete_multipart(
1369 &id,
1370 &upload_id,
1371 vec![CompletedPart {
1372 part_number: 1,
1373 etag,
1374 }],
1375 )
1376 .await?;
1377 assert!(result.is_none(), "expected no error on complete");
1378
1379 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1380 let payload = stream::read_to_vec(stream).await?;
1381 assert_eq!(payload, data);
1382 assert_eq!(meta.content_type, "text/plain".to_string());
1383 assert_eq!(
1384 meta.expiration_policy,
1385 ExpirationPolicy::TimeToLive(Duration::from_mins(33))
1386 );
1387 assert_eq!(meta.origin, Some("203.0.113.42".into()));
1388 assert_eq!(
1389 meta.custom,
1390 BTreeMap::from_iter([("hello".into(), "world".into())])
1391 );
1392
1393 Ok(())
1394 }
1395
1396 #[tokio::test]
1397 async fn test_multipart_multiple_parts() -> Result<()> {
1398 let backend = create_test_backend().await?;
1399 let id = make_id();
1400 let metadata = Metadata::default();
1401
1402 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1403
1404 const MIN_PART: usize = 5 * 1024 * 1024;
1406 let part1 = vec![b'a'; MIN_PART];
1407 let part2 = vec![b'b'; MIN_PART];
1408 let part3 = b"cccc".to_vec();
1409
1410 let etag1 = backend
1411 .upload_part(
1412 &id,
1413 &upload_id,
1414 1,
1415 part1.len() as u64,
1416 None,
1417 stream::single(part1.clone()),
1418 )
1419 .await?;
1420 let etag2 = backend
1421 .upload_part(
1422 &id,
1423 &upload_id,
1424 2,
1425 part2.len() as u64,
1426 None,
1427 stream::single(part2.clone()),
1428 )
1429 .await?;
1430 let etag3 = backend
1431 .upload_part(
1432 &id,
1433 &upload_id,
1434 3,
1435 part3.len() as u64,
1436 None,
1437 stream::single(part3.clone()),
1438 )
1439 .await?;
1440
1441 let result = backend
1442 .complete_multipart(
1443 &id,
1444 &upload_id,
1445 vec![
1446 CompletedPart {
1447 part_number: 1,
1448 etag: etag1,
1449 },
1450 CompletedPart {
1451 part_number: 2,
1452 etag: etag2,
1453 },
1454 CompletedPart {
1455 part_number: 3,
1456 etag: etag3,
1457 },
1458 ],
1459 )
1460 .await?;
1461 assert!(result.is_none(), "expected no error on complete");
1462
1463 let (_meta, stream) = backend.get_object(&id).await?.unwrap();
1465 let payload = stream::read_to_vec(stream).await?;
1466 let mut expected = Vec::new();
1467 expected.extend_from_slice(&part1);
1468 expected.extend_from_slice(&part2);
1469 expected.extend_from_slice(&part3);
1470 assert_eq!(payload, expected);
1471
1472 Ok(())
1473 }
1474
1475 #[tokio::test]
1476 async fn test_multipart_out_of_order_upload() -> Result<()> {
1477 let backend = create_test_backend().await?;
1478 let id = make_id();
1479 let metadata = Metadata::default();
1480
1481 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1482
1483 const MIN_PART: usize = 5 * 1024 * 1024;
1485 let part1 = vec![b'a'; MIN_PART];
1486 let part2 = vec![b'b'; MIN_PART];
1487 let part3 = b"cccc".to_vec();
1488
1489 let etag2 = backend
1491 .upload_part(
1492 &id,
1493 &upload_id,
1494 2,
1495 part2.len() as u64,
1496 None,
1497 stream::single(part2.clone()),
1498 )
1499 .await?;
1500 let etag3 = backend
1501 .upload_part(
1502 &id,
1503 &upload_id,
1504 3,
1505 part3.len() as u64,
1506 None,
1507 stream::single(part3.clone()),
1508 )
1509 .await?;
1510 let etag1 = backend
1511 .upload_part(
1512 &id,
1513 &upload_id,
1514 1,
1515 part1.len() as u64,
1516 None,
1517 stream::single(part1.clone()),
1518 )
1519 .await?;
1520
1521 let result = backend
1523 .complete_multipart(
1524 &id,
1525 &upload_id,
1526 vec![
1527 CompletedPart {
1528 part_number: 1,
1529 etag: etag1,
1530 },
1531 CompletedPart {
1532 part_number: 2,
1533 etag: etag2,
1534 },
1535 CompletedPart {
1536 part_number: 3,
1537 etag: etag3,
1538 },
1539 ],
1540 )
1541 .await?;
1542 assert!(result.is_none(), "expected no error on complete");
1543
1544 let (_meta, stream) = backend.get_object(&id).await?.unwrap();
1546 let payload = stream::read_to_vec(stream).await?;
1547 let mut expected = Vec::new();
1548 expected.extend_from_slice(&part1);
1549 expected.extend_from_slice(&part2);
1550 expected.extend_from_slice(&part3);
1551 assert_eq!(payload, expected);
1552
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 async fn test_multipart_list_parts() -> Result<()> {
1558 let backend = create_test_backend().await?;
1559 let id = make_id();
1560 let metadata = Metadata::default();
1561
1562 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1563
1564 let etag1 = backend
1565 .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec()))
1566 .await?;
1567 let etag2 = backend
1568 .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec()))
1569 .await?;
1570
1571 let list = backend.list_parts(&id, &upload_id, None, None).await?;
1573 assert_eq!(list.parts.len(), 2);
1574 assert_eq!(list.parts[0].part_number, 1);
1575 assert_eq!(list.parts[0].etag, etag1);
1576 assert_eq!(list.parts[0].size, 3);
1577 assert_eq!(list.parts[1].part_number, 2);
1578 assert_eq!(list.parts[1].etag, etag2);
1579 assert_eq!(list.parts[1].size, 3);
1580
1581 let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?;
1583 assert_eq!(page1.parts.len(), 1);
1584 assert_eq!(page1.parts[0].part_number, 1);
1585 assert!(page1.is_truncated);
1586 assert!(page1.next_part_number_marker.is_some());
1587
1588 let page2 = backend
1589 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
1590 .await?;
1591 assert_eq!(page2.parts.len(), 1);
1592 assert_eq!(page2.parts[0].part_number, 2);
1593
1594 backend.abort_multipart(&id, &upload_id).await?;
1596
1597 Ok(())
1598 }
1599
1600 #[tokio::test]
1601 async fn test_multipart_abort() -> Result<()> {
1602 let backend = create_test_backend().await?;
1603 let id = make_id();
1604 let metadata = Metadata::default();
1605
1606 let upload_id = backend.initiate_multipart(&id, &metadata).await?;
1607
1608 backend
1609 .upload_part(
1610 &id,
1611 &upload_id,
1612 1,
1613 5,
1614 None,
1615 stream::single(b"hello".to_vec()),
1616 )
1617 .await?;
1618
1619 backend.abort_multipart(&id, &upload_id).await?;
1620
1621 let result = backend.get_object(&id).await?;
1623 assert!(result.is_none(), "object should not exist after abort");
1624
1625 Ok(())
1626 }
1627
1628 async fn multipart_put(
1629 backend: &GcsBackend,
1630 id: &ObjectId,
1631 metadata: &Metadata,
1632 payload: impl Into<bytes::Bytes>,
1633 ) -> Result<()> {
1634 let payload: bytes::Bytes = payload.into();
1635 let upload_id = backend.initiate_multipart(id, metadata).await?;
1636 let etag = backend
1637 .upload_part(
1638 id,
1639 &upload_id,
1640 1,
1641 payload.len() as u64,
1642 None,
1643 stream::single(payload),
1644 )
1645 .await?;
1646 let error = backend
1647 .complete_multipart(
1648 id,
1649 &upload_id,
1650 vec![CompletedPart {
1651 part_number: 1,
1652 etag,
1653 }],
1654 )
1655 .await?;
1656 assert!(
1657 error.is_none(),
1658 "complete_multipart returned error: {error:?}"
1659 );
1660 Ok(())
1661 }
1662
1663 #[tokio::test]
1664 async fn test_multipart_ttl_immediate() -> Result<()> {
1665 let backend = create_test_backend().await?;
1666 let id = make_id();
1667 let metadata = Metadata {
1668 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1669 ..Default::default()
1670 };
1671
1672 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1673
1674 let result = backend.get_object(&id).await?;
1675 assert!(result.is_none());
1676
1677 Ok(())
1678 }
1679
1680 #[tokio::test]
1681 async fn test_multipart_tti_immediate() -> Result<()> {
1682 let backend = create_test_backend().await?;
1683 let id = make_id();
1684 let metadata = Metadata {
1685 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1686 ..Default::default()
1687 };
1688
1689 multipart_put(&backend, &id, &metadata, "hello, world").await?;
1690
1691 let result = backend.get_object(&id).await?;
1692 assert!(result.is_none());
1693
1694 Ok(())
1695 }
1696
1697 #[tokio::test]
1698 async fn test_multipart_compressed_payload_roundtrip() -> Result<()> {
1699 use objectstore_types::metadata::Compression;
1700
1701 let backend = create_test_backend().await?;
1702
1703 let plaintext = b"hello, world (but compressed with zstd)";
1704 let compressed = zstd::encode_all(&plaintext[..], 3)?;
1705
1706 let id = make_id();
1707 let metadata = Metadata {
1708 content_type: "text/plain".into(),
1709 compression: Some(Compression::Zstd),
1710 ..Default::default()
1711 };
1712
1713 multipart_put(&backend, &id, &metadata, compressed.clone()).await?;
1714
1715 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1716 let payload = stream::read_to_vec(stream).await?;
1717
1718 assert_eq!(meta.compression, Some(Compression::Zstd));
1719 assert_eq!(
1720 payload, compressed,
1721 "Payload should be returned still compressed, not auto-decompressed"
1722 );
1723
1724 Ok(())
1725 }
1726}