1use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::time::{Duration, SystemTime};
7use std::{fmt, io};
8
9use anyhow::Context;
10use futures_util::{StreamExt, TryStreamExt};
11use gcp_auth::TokenProvider;
12use objectstore_types::metadata::{ExpirationPolicy, Metadata};
13use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
14use serde::{Deserialize, Serialize};
15
16use crate::backend::common::{
17 self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
18};
19use crate::error::{Error, Result};
20use crate::gcp_auth::PrefetchingTokenProvider;
21use crate::id::ObjectId;
22use crate::stream::{self, ClientStream};
23
/// Configuration for the GCS storage backend.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GcsConfig {
    /// Optional custom endpoint URL (e.g. a local emulator).
    ///
    /// When `None`, the public GCS endpoint is used and real GCP token
    /// authentication is performed; when set, requests go to this endpoint
    /// without authentication.
    pub endpoint: Option<String>,

    /// Name of the GCS bucket objects are stored in.
    pub bucket: String,
}
68
/// Public Google Cloud Storage endpoint, used when no custom endpoint is configured.
const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";

/// OAuth scopes requested for GCS access tokens.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_write"];

/// Debounce window for time-to-idle bumps: the stored deadline is only pushed
/// out again when it lags the fresh deadline by more than this much.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600);

/// How many times a retryable request is re-attempted after the first failure.
const REQUEST_RETRY_COUNT: usize = 2;

/// Key prefix for built-in metadata entries stored in GCS object metadata.
const BUILTIN_META_PREFIX: &str = "x-sn-";

/// Key prefix for user-provided (custom) metadata entries.
const CUSTOM_META_PREFIX: &str = "x-snme-";
82
/// JSON representation of a GCS object resource, as exchanged with the GCS
/// JSON API.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
    /// MIME content type of the stored payload.
    pub content_type: Cow<'static, str>,

    /// Content encoding; used here to record the payload's compression algorithm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,

    /// GCS `customTime`; used here to carry the object's expiration deadline.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub custom_time: Option<SystemTime>,

    /// Object size in bytes, transmitted by GCS as a decimal string.
    pub size: Option<String>,

    /// Creation timestamp of the object.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde"
    )]
    pub time_created: Option<SystemTime>,

    /// Per-object key/value metadata (built-in `x-sn-*` and custom `x-snme-*` keys).
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<GcsMetaKey, String>,
}
125
126impl GcsObject {
127 pub fn from_metadata(metadata: &Metadata) -> Self {
129 let mut gcs_object = GcsObject {
130 content_type: metadata.content_type.clone(),
131 size: metadata.size.map(|size| size.to_string()),
132 content_encoding: None,
133 custom_time: None,
134 time_created: metadata.time_created,
135 metadata: BTreeMap::new(),
136 };
137
138 if let Some(expires_in) = metadata.expiration_policy.expires_in() {
142 gcs_object.custom_time = Some(SystemTime::now() + expires_in);
143 }
144
145 if let Some(compression) = metadata.compression {
146 gcs_object.content_encoding = Some(compression.to_string());
147 }
148
149 if metadata.expiration_policy != ExpirationPolicy::default() {
150 gcs_object.metadata.insert(
151 GcsMetaKey::Expiration,
152 metadata.expiration_policy.to_string(),
153 );
154 }
155
156 if let Some(origin) = &metadata.origin {
157 gcs_object
158 .metadata
159 .insert(GcsMetaKey::Origin, origin.clone());
160 }
161
162 for (key, value) in &metadata.custom {
163 gcs_object
164 .metadata
165 .insert(GcsMetaKey::Custom(key.clone()), value.clone());
166 }
167
168 gcs_object
169 }
170
171 pub fn into_metadata(mut self) -> Result<Metadata> {
173 self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
175
176 let expiration_policy = self
177 .metadata
178 .remove(&GcsMetaKey::Expiration)
179 .map(|s| s.parse())
180 .transpose()?
181 .unwrap_or_default();
182
183 let origin = self.metadata.remove(&GcsMetaKey::Origin);
184
185 let content_type = self.content_type;
186 let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
187 let size = self
188 .size
189 .map(|size| size.parse())
190 .transpose()
191 .map_err(|e| Error::Generic {
192 context: "GCS: failed to parse size from object metadata".to_string(),
193 cause: Some(Box::new(e)),
194 })?;
195 let time_created = self.time_created;
196
197 let mut custom = BTreeMap::new();
199 for (key, value) in self.metadata {
200 if let GcsMetaKey::Custom(custom_key) = key {
201 custom.insert(custom_key, value);
202 } else {
203 return Err(Error::Generic {
204 context: format!(
205 "GCS: unexpected built-in metadata key in object metadata: {}",
206 key
207 ),
208 cause: None,
209 });
210 }
211 }
212
213 Ok(Metadata {
214 content_type,
215 expiration_policy,
216 compression,
217 origin,
218 size,
219 custom,
220 time_created,
221 time_expires: self.custom_time,
222 })
223 }
224}
225
/// Keys of the GCS object metadata map, distinguishing our built-in keys from
/// user-provided custom keys.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum GcsMetaKey {
    /// Serialized expiration policy (`x-sn-expiration`).
    Expiration,
    /// Origin of the object (`x-sn-origin`).
    Origin,
    /// Marker key injected by GCS emulators; parsed but dropped, never serialized.
    EmulatorIgnored,
    /// User-provided metadata key, stored with the `x-snme-` prefix.
    Custom(String),
}
238
239impl std::str::FromStr for GcsMetaKey {
240 type Err = anyhow::Error;
241
242 fn from_str(s: &str) -> Result<Self, Self::Err> {
243 if matches!(s, "x_emulator_upload" | "x_testbench_upload") {
244 return Ok(GcsMetaKey::EmulatorIgnored);
245 }
246
247 Ok(match s.strip_prefix(BUILTIN_META_PREFIX) {
248 Some("expiration") => GcsMetaKey::Expiration,
249 Some("origin") => GcsMetaKey::Origin,
250 Some(unknown) => anyhow::bail!("unknown builtin metadata key: {unknown}"),
251 None => match s.strip_prefix(CUSTOM_META_PREFIX) {
252 Some(key) => GcsMetaKey::Custom(key.to_string()),
253 None => anyhow::bail!("invalid GCS metadata key format: {s}"),
254 },
255 })
256 }
257}
258
259impl fmt::Display for GcsMetaKey {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 match self {
262 Self::Expiration => write!(f, "{BUILTIN_META_PREFIX}expiration"),
263 Self::Origin => write!(f, "{BUILTIN_META_PREFIX}origin"),
264 Self::EmulatorIgnored => unreachable!("do not serialize emulator metadata"),
265 Self::Custom(key) => write!(f, "{CUSTOM_META_PREFIX}{key}"),
266 }
267 }
268}
269
270impl<'de> serde::Deserialize<'de> for GcsMetaKey {
271 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
272 where
273 D: serde::Deserializer<'de>,
274 {
275 let s = Cow::<'de, str>::deserialize(deserializer)?;
276 s.parse().map_err(serde::de::Error::custom)
277 }
278}
279
280impl serde::Serialize for GcsMetaKey {
281 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
282 where
283 S: serde::Serializer,
284 {
285 serializer.collect_str(self)
286 }
287}
288
289fn is_retryable(error: &Error) -> bool {
291 let Error::Reqwest { cause, .. } = error else {
292 return false;
293 };
294 if cause.is_timeout() || cause.is_connect() || cause.is_request() {
295 return true;
296 }
297 let Some(status) = cause.status() else {
298 return false;
299 };
300 matches!(
302 status,
303 StatusCode::REQUEST_TIMEOUT
304 | StatusCode::TOO_MANY_REQUESTS
305 | StatusCode::INTERNAL_SERVER_ERROR
306 | StatusCode::BAD_GATEWAY
307 | StatusCode::SERVICE_UNAVAILABLE
308 | StatusCode::GATEWAY_TIMEOUT
309 )
310}
311
/// Storage backend speaking the GCS JSON API over HTTP.
pub struct GcsBackend {
    // Shared HTTP client used for all requests.
    client: reqwest::Client,
    // Base URL of the GCS API (real GCS or a configured emulator endpoint).
    endpoint: Url,
    // Bucket all objects are read from / written to.
    bucket: String,
    // Present only when talking to real GCS; `None` for custom endpoints,
    // in which case requests are sent without a bearer token.
    token_provider: Option<PrefetchingTokenProvider>,
}
319
impl GcsBackend {
    /// Creates a new GCS backend from `config`.
    ///
    /// When no custom endpoint is configured, a prefetching GCP token
    /// provider is set up; custom endpoints (e.g. emulators) are used
    /// without authentication.
    pub async fn new(config: GcsConfig) -> anyhow::Result<Self> {
        let GcsConfig { endpoint, bucket } = config;

        let token_provider = if endpoint.is_none() {
            Some(PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?)
        } else {
            None
        };

        let endpoint_str = endpoint.as_deref().unwrap_or(DEFAULT_ENDPOINT);

        Ok(Self {
            client: common::reqwest_client(),
            endpoint: endpoint_str.parse().context("invalid GCS endpoint URL")?,
            bucket,
            token_provider,
        })
    }

    /// Builds the JSON API URL addressing object `id` in the bucket
    /// (`…/storage/v1/b/{bucket}/o/{object}`).
    ///
    /// Fails if the configured endpoint cannot serve as a base URL.
    fn object_url(&self, id: &ObjectId) -> Result<Url> {
        let mut url = self.endpoint.clone();

        // `extend` percent-encodes each segment, so the object path is safe
        // to embed as a single segment.
        let path = id.as_storage_path().to_string();
        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);

        Ok(url)
    }

    /// Builds the JSON API upload URL for object `id`
    /// (`…/upload/storage/v1/b/{bucket}/o?uploadType=…&name=…`).
    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
        let mut url = self.endpoint.clone();

        url.path_segments_mut()
            .map_err(|()| Error::Generic {
                context: format!(
                    "GCS: invalid endpoint URL, {} cannot be a base",
                    self.endpoint
                ),
                cause: None,
            })?
            .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);

        url.query_pairs_mut()
            .append_pair("uploadType", upload_type)
            .append_pair("name", &id.as_storage_path().to_string());

        Ok(url)
    }

    /// Starts a request to `url`, attaching a bearer token when a token
    /// provider is configured (i.e. when talking to real GCS).
    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
        let mut builder = self.client.request(method, url);
        if let Some(provider) = &self.token_provider {
            let token = provider.token(TOKEN_SCOPES).await?;
            builder = builder.bearer_auth(token.as_str());
        }
        Ok(builder)
    }

    /// Runs `f`, retrying up to [`REQUEST_RETRY_COUNT`] additional times on
    /// retryable errors (see [`is_retryable`]); emits retry/failure metrics
    /// tagged with `action`.
    ///
    /// NOTE(review): retries are immediate, with no backoff between attempts.
    async fn with_retry<T, F>(&self, action: &'static str, f: impl Fn() -> F) -> Result<T>
    where
        F: Future<Output = Result<T>> + Send,
    {
        let mut retry_count = 0usize;
        loop {
            match f().await {
                Ok(res) => return Ok(res),
                Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
                    retry_count += 1;
                    objectstore_metrics::count!("gcs.retries", action = action);
                    objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
                }
                Err(e) => {
                    objectstore_metrics::count!("gcs.failures", action = action);
                    return Err(e);
                }
            }
        }
    }

    /// Fetches the object's GCS metadata and converts it to [`Metadata`].
    ///
    /// Returns `Ok(None)` when the object does not exist or its stored
    /// deadline (GCS `customTime`) has passed. For time-to-idle objects, a
    /// successful fetch pushes the deadline out again, debounced by
    /// [`TTI_DEBOUNCE`] to avoid a PATCH on every read.
    async fn fetch_gcs_metadata(&self, object_url: &Url) -> Result<Option<Metadata>> {
        let metadata_opt = self
            .with_retry("get_metadata", || async {
                let resp = self
                    .request(Method::GET, object_url.clone())
                    .await?
                    .send()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata request", e))?;

                // A missing object is not an error at this layer.
                if resp.status() == StatusCode::NOT_FOUND {
                    return Ok(None);
                }

                let metadata: GcsObject = resp
                    .error_for_status()
                    .map_err(|e| Error::reqwest("GCS: get metadata status", e))?
                    .json()
                    .await
                    .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;

                Ok(Some(metadata))
            })
            .await?;

        let Some(gcs_metadata) = metadata_opt else {
            objectstore_log::debug!("Object not found");
            return Ok(None);
        };

        let expire_at = gcs_metadata.custom_time;
        let metadata = gcs_metadata.into_metadata()?;

        let access_time = SystemTime::now();

        // NOTE(review): `is_timeout` presumably covers both TTL and TTI
        // policies — confirm against `ExpirationPolicy`.
        if metadata.expiration_policy.is_timeout() && expire_at.is_some_and(|ts| ts < access_time) {
            objectstore_log::debug!("Object found but past expiry");
            return Ok(None);
        }

        if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy {
            // Only bump when the stored deadline lags the fresh one by more
            // than the debounce window.
            let new_expire_at = access_time + tti;
            if expire_at.is_some_and(|ts| ts < new_expire_at - TTI_DEBOUNCE) {
                self.update_custom_time(object_url.clone(), new_expire_at)
                    .await?;
            }
        }

        Ok(Some(metadata))
    }

    /// PATCHes the object resource to set a new `customTime` (the stored
    /// expiration deadline).
    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
        // Minimal PATCH body: only the `customTime` field is updated.
        #[derive(Debug, Serialize)]
        #[serde(rename_all = "camelCase")]
        struct CustomTimeRequest {
            #[serde(with = "humantime_serde")]
            custom_time: SystemTime,
        }

        self.with_retry("update_custom_time", || async {
            self.request(Method::PATCH, object_url.clone())
                .await?
                .json(&CustomTimeRequest { custom_time })
                .send()
                .await
                .and_then(|r| r.error_for_status())
                .map_err(|e| Error::reqwest("GCS: update custom time", e))?;
            Ok(())
        })
        .await
    }
}
489
490impl fmt::Debug for GcsBackend {
491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492 f.debug_struct("GcsJsonApi")
493 .field("endpoint", &self.endpoint)
494 .field("bucket", &self.bucket)
495 .finish_non_exhaustive()
496 }
497}
498
#[async_trait::async_trait]
impl Backend for GcsBackend {
    fn name(&self) -> &'static str {
        "gcs"
    }

    /// Uploads metadata and payload together in a single GCS multipart
    /// request; an existing object with the same id is overwritten.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        objectstore_log::debug!("Writing to GCS backend");
        let gcs_metadata = GcsObject::from_metadata(metadata);

        let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata for GCS upload".to_string(),
            cause,
        })?;

        // GCS multipart uploads carry two parts: the object resource (JSON
        // metadata) followed by the raw payload, streamed from the client.
        let multipart = multipart::Form::new()
            .part(
                "metadata",
                multipart::Part::text(metadata_json)
                    .mime_str("application/json")
                    .expect("application/json is a valid mime type"),
            )
            .part(
                "media",
                multipart::Part::stream(Body::wrap_stream(stream))
                    .mime_str(&metadata.content_type)
                    .map_err(|e| Error::Generic {
                        context: format!("invalid mime type: {}", &metadata.content_type),
                        cause: Some(Box::new(e)),
                    })?,
            );

        // GCS expects `multipart/related` rather than reqwest's default
        // `multipart/form-data`, so the content type is set explicitly.
        let content_type = format!("multipart/related; boundary={}", multipart.boundary());

        self.request(Method::POST, self.upload_url(id, "multipart")?)
            .await?
            .multipart(multipart)
            .header(header::CONTENT_TYPE, content_type)
            .send()
            .await
            .and_then(|r| r.error_for_status())
            .map_err(|e| match stream::unpack_client_error(&e) {
                // Failures originating in the client-supplied stream are
                // surfaced as client errors, not backend errors.
                Some(ce) => Error::Client(ce),
                _ => Error::reqwest("GCS: upload object", e),
            })?;

        Ok(())
    }

    /// Fetches metadata (including expiry handling / TTI bump) and then
    /// streams the payload via `alt=media`.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        objectstore_log::debug!("Reading from GCS backend");
        let object_url = self.object_url(id)?;

        let Some(metadata) = self.fetch_gcs_metadata(&object_url).await? else {
            return Ok(None);
        };

        let mut download_url = object_url;
        download_url.query_pairs_mut().append_pair("alt", "media");

        let payload_response = self
            .with_retry("get_payload", || async {
                self.request(Method::GET, download_url.clone())
                    .await?
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .map_err(|e| Error::reqwest("GCS: get payload", e))
            })
            .await?;

        let stream = payload_response
            .bytes_stream()
            .map_err(io::Error::other)
            .boxed();

        Ok(Some((metadata, stream)))
    }

    /// Fetches only the object's metadata; shares the expiry / TTI-bump
    /// behavior of `fetch_gcs_metadata`.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        objectstore_log::debug!("Reading metadata from GCS backend");
        let object_url = self.object_url(id)?;
        self.fetch_gcs_metadata(&object_url).await
    }

    /// Deletes the object; deleting a missing object succeeds (idempotent).
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        objectstore_log::debug!("Deleting from GCS backend");
        let object_url = self.object_url(id)?;

        self.with_retry("delete", || async {
            let resp = self
                .request(Method::DELETE, object_url.clone())
                .await?
                .send()
                .await
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            // Treat NOT_FOUND as success so deletes are idempotent.
            if resp.status() == StatusCode::NOT_FOUND {
                return Ok(());
            }

            resp.error_for_status()
                .map_err(|e| Error::reqwest("GCS: delete object", e))?;

            Ok(())
        })
        .await
    }
}
623
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use anyhow::Result;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Connects to a local GCS emulator; these tests require it to be running.
    async fn create_test_backend() -> Result<GcsBackend> {
        GcsBackend::new(GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        })
        .await
    }

    /// Generates a fresh object id so tests do not interfere with each other.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    #[tokio::test]
    async fn test_roundtrip() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::Manual,
            compression: None,
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            size: None,
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "hello, world");
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);
        // Fix: assert on the *returned* metadata; the previous check asserted
        // on the locally constructed input, which was trivially true.
        assert!(meta.time_created.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        // Deletes are idempotent: deleting a missing object must succeed.
        let id = make_id();
        backend.delete_object(&id).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_overwrite() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello"))
            .await?;

        let metadata = Metadata {
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("world"))
            .await?;

        // The second put must fully replace payload and metadata.
        let (meta, stream) = backend.get_object(&id).await?.unwrap();

        let payload = stream::read_to_vec(stream).await?;
        let str_payload = str::from_utf8(&payload).unwrap();
        assert_eq!(str_payload, "world");
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_read_after_delete() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        backend.delete_object(&id).await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_ttl_immediate() -> Result<()> {
        let backend = create_test_backend().await?;

        // A zero TTL means the object is expired as soon as it is written.
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_tti_immediate() -> Result<()> {
        let backend = create_test_backend().await?;

        // A zero TTI means the object is expired as soon as it is written.
        let id = make_id();
        let metadata = Metadata {
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let result = backend.get_object(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_returns_metadata() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            origin: Some("203.0.113.42".into()),
            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let meta = backend.get_metadata(&id).await?.unwrap();
        assert_eq!(meta.content_type, metadata.content_type);
        assert_eq!(meta.origin, metadata.origin);
        assert_eq!(meta.custom, metadata.custom);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_nonexistent() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        let result = backend.get_metadata(&id).await?;
        assert!(result.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_bumps_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // Two days: comfortably larger than TTI_DEBOUNCE (one day).
        let tti = Duration::from_secs(2 * 24 * 3600);
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        // Backdate the stored deadline far enough past the debounce window
        // that the next metadata fetch is forced to bump it.
        let object_url = backend.object_url(&id)?;
        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
        backend.update_custom_time(object_url, old_deadline).await?;

        let pre_meta = backend.get_metadata(&id).await?.unwrap();
        let pre_expiry = pre_meta.time_expires.unwrap();

        let post_meta = backend.get_metadata(&id).await?.unwrap();
        let post_expiry = post_meta.time_expires.unwrap();
        assert!(
            post_expiry > pre_expiry,
            "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}"
        );

        let (_, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;
        assert_eq!(&payload, b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> {
        let backend = create_test_backend().await?;

        let id = make_id();
        // Two days: comfortably larger than TTI_DEBOUNCE (one day).
        let tti = Duration::from_secs(2 * 24 * 3600);
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single("hello, world"))
            .await?;

        let first = backend.get_metadata(&id).await?.unwrap();
        let first_expiry = first.time_expires.unwrap();

        let second = backend.get_metadata(&id).await?.unwrap();
        let second_expiry = second.time_expires.unwrap();

        assert_eq!(
            first_expiry, second_expiry,
            "Fresh TTI object should not have its expiry bumped"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compressed_payload_roundtrip() -> Result<()> {
        use objectstore_types::metadata::Compression;

        let backend = create_test_backend().await?;

        let plaintext = b"hello, world (but compressed with zstd)";
        let compressed = zstd::encode_all(&plaintext[..], 3)?;

        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            ..Default::default()
        };

        backend
            .put_object(&id, &metadata, stream::single(compressed.clone()))
            .await?;

        let (meta, stream) = backend.get_object(&id).await?.unwrap();
        let payload = stream::read_to_vec(stream).await?;

        assert_eq!(meta.compression, Some(Compression::Zstd));
        assert_eq!(
            payload, compressed,
            "Payload should be returned still compressed, not auto-decompressed"
        );

        Ok(())
    }
}
949}