1use std::sync::Arc;
101use std::sync::atomic::{AtomicU64, Ordering};
102use std::time::{Duration, Instant, SystemTime};
103
104use base64::Engine as _;
105use bytes::Bytes;
106use futures_util::{Stream, StreamExt};
107use objectstore_types::metadata::Metadata;
108use serde::{Deserialize, Serialize};
109
110use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
111use crate::backend::common::{
112 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse,
113 MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone,
114};
115use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
116use crate::error::{Error, Result};
117use crate::id::ObjectId;
118use crate::multipart::{
119 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
120 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
121};
122use crate::stream::{ClientStream, SizedPeek};
123
/// Peek limit, in bytes, passed to `SizedPeek::new` in `put_object` (1 MiB).
///
/// Streams that are exhausted within this peek are written to the high-volume backend; all
/// other streams are forwarded to the long-term backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024;

/// Offset added to the current time to compute `cleanup_after` for the change recorded by
/// `complete_multipart`.
const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24);
132
133fn new_long_term_revision(id: &ObjectId) -> ObjectId {
139 ObjectId {
140 context: id.context.clone(),
141 key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
142 }
143}
144
/// Configuration for tiered storage: one high-volume backend and one long-term backend.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TieredStorageConfig {
    /// Configuration of the high-volume backend.
    pub high_volume: HighVolumeStorageConfig,
    /// Configuration of the long-term backend (a multipart-upload capable storage config).
    pub long_term: MultipartUploadStorageConfig,
}
177
/// Storage backend that combines a high-volume backend with a long-term backend.
///
/// All state lives in the shared [`ChangeManager`], which owns both backends and the change log.
#[derive(Debug)]
pub struct TieredStorage {
    /// Shared manager giving access to `high_volume`, `long_term` and the change tracker.
    inner: Arc<ChangeManager>,
}
231
232impl TieredStorage {
233 pub fn new(
235 high_volume: Box<dyn HighVolumeBackend>,
236 long_term: Box<dyn MultipartUploadBackend>,
237 changelog: Box<dyn ChangeLog>,
238 ) -> Self {
239 let inner = ChangeManager::new(high_volume, long_term, changelog);
240 tokio::spawn(inner.clone().recover());
243 Self { inner }
244 }
245
246 async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
248 self.inner.clone().record(change).await
249 }
250
251 async fn record_assembling(&self, change: Change) -> Result<ChangeGuard> {
254 self.inner.clone().record_assembling(change).await
255 }
256
257 fn backend_type(&self, choice: &BackendChoice) -> &'static str {
259 match choice {
260 BackendChoice::HighVolume => self.inner.high_volume.name(),
261 BackendChoice::LongTerm => self.inner.long_term.name(),
262 }
263 }
264
265 async fn put_high_volume(
270 &self,
271 id: &ObjectId,
272 metadata: &Metadata,
273 payload: Bytes,
274 ) -> Result<()> {
275 let tombstone_opt = self
276 .inner
277 .high_volume
278 .put_non_tombstone(id, metadata, payload.clone())
279 .await?;
280
281 let Some(Tombstone { target, .. }) = tombstone_opt else {
282 return Ok(());
284 };
285
286 let mut guard = self
288 .record_change(Change {
289 id: id.clone(),
290 new: None,
291 old: Some(target.clone()),
292 cleanup_after: None,
293 })
294 .await?;
295
296 let write = TieredWrite::Object(metadata.clone(), payload);
297 guard.advance(ChangePhase::Written);
298
299 let written = self
300 .inner
301 .high_volume
302 .compare_and_write(id, Some(&target), write)
303 .await?;
304
305 guard.advance(ChangePhase::compare_and_write(written));
307
308 Ok(())
309 }
310
311 async fn put_long_term(
316 &self,
317 id: &ObjectId,
318 metadata: &Metadata,
319 stream: ClientStream,
320 ) -> Result<()> {
321 let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
323 TieredMetadata::Tombstone(t) => Some(t.target),
324 _ => None,
325 };
326
327 let new = new_long_term_revision(id);
329 let mut guard = self
330 .record_change(Change {
331 id: id.clone(),
332 new: Some(new.clone()),
333 old: current.clone(),
334 cleanup_after: None,
335 })
336 .await?;
337
338 self.inner
339 .long_term
340 .put_object(&new, metadata, stream)
341 .await?;
342 guard.advance(ChangePhase::Written);
343
344 let tombstone = Tombstone {
346 target: new.clone(),
347 expiration_policy: metadata.expiration_policy,
348 };
349 let written = self
350 .inner
351 .high_volume
352 .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
353 .await?;
354
355 guard.advance(ChangePhase::compare_and_write(written));
357
358 Ok(())
359 }
360}
361
362#[async_trait::async_trait]
363impl Backend for TieredStorage {
    /// Returns the fixed backend name `"tiered"`.
    fn name(&self) -> &'static str {
        "tiered"
    }
367
    /// Returns `self` as a multipart upload backend; this always succeeds for tiered storage.
    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
        Ok(self)
    }
371
372 async fn put_object(
373 &self,
374 id: &ObjectId,
375 metadata: &Metadata,
376 stream: ClientStream,
377 ) -> Result<PutResponse> {
378 let start = Instant::now();
379 if metadata.origin.is_none() {
380 objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
381 }
382
383 let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
384 objectstore_metrics::record!(
385 "put.first_chunk.latency" = start.elapsed(),
386 usecase = id.usecase().to_owned(),
387 complete = if peeked.is_exhausted() { "yes" } else { "no" },
388 );
389
390 let (backend_choice, stored_size) = if peeked.is_exhausted() {
391 let payload = peeked.into_bytes().await?;
392 let payload_len = payload.len() as u64;
393 self.put_high_volume(id, metadata, payload).await?;
394 (BackendChoice::HighVolume, payload_len)
395 } else {
396 let (stored_size, stream) = counting_stream(peeked.into_stream());
397 self.put_long_term(id, metadata, stream.boxed()).await?;
398 (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
399 };
400
401 let backend_ty = self.backend_type(&backend_choice);
402 objectstore_metrics::record!(
403 "put.latency" = start.elapsed(),
404 usecase = id.usecase().to_owned(),
405 backend_choice = backend_choice.as_str(),
406 backend_type = backend_ty,
407 );
408 objectstore_metrics::record!(
409 "put.size" = stored_size,
410 usecase = id.usecase().to_owned(),
411 backend_choice = backend_choice.as_str(),
412 backend_type = backend_ty,
413 );
414
415 Ok(())
416 }
417
418 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
419 let start = Instant::now();
420
421 let hv_result = self.inner.high_volume.get_tiered_object(id).await?;
422 let (result, backend_choice) = match hv_result {
423 TieredGet::NotFound => (None, BackendChoice::HighVolume),
424 TieredGet::Object(metadata, stream) => {
425 (Some((metadata, stream)), BackendChoice::HighVolume)
426 }
427 TieredGet::Tombstone(tombstone) => (
428 self.inner.long_term.get_object(&tombstone.target).await?,
429 BackendChoice::LongTerm,
430 ),
431 };
432
433 let backend_type = self.backend_type(&backend_choice);
434 objectstore_metrics::record!(
435 "get.latency.pre-response" = start.elapsed(),
436 usecase = id.usecase().to_owned(),
437 backend_choice = backend_choice.as_str(),
438 backend_type = backend_type,
439 );
440
441 if let Some((ref metadata, _)) = result {
442 if let Some(size) = metadata.size {
443 objectstore_metrics::record!(
444 "get.size" = size,
445 usecase = id.usecase().to_owned(),
446 backend_choice = backend_choice.as_str(),
447 backend_type = backend_type,
448 );
449 } else {
450 objectstore_log::warn!(backend_type, "Missing object size");
451 }
452 }
453
454 Ok(result)
455 }
456
457 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
458 let start = Instant::now();
459
460 let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
461 let (result, backend_choice) = match hv_result {
462 TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
463 TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
464 TieredMetadata::Tombstone(tombstone) => (
465 self.inner.long_term.get_metadata(&tombstone.target).await?,
466 BackendChoice::LongTerm,
467 ),
468 };
469
470 objectstore_metrics::record!(
471 "head.latency" = start.elapsed(),
472 usecase = id.usecase().to_owned(),
473 backend_choice = backend_choice.as_str(),
474 backend_type = self.backend_type(&backend_choice),
475 );
476
477 Ok(result)
478 }
479
480 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
481 let start = Instant::now();
482
483 let mut backend_choice = BackendChoice::HighVolume;
484
485 if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
486 backend_choice = BackendChoice::LongTerm;
487
488 let mut guard = self
489 .record_change(Change {
490 id: id.clone(),
491 new: None,
492 old: Some(tombstone.target.clone()),
493 cleanup_after: None,
494 })
495 .await?;
496 guard.advance(ChangePhase::Written);
497
498 let deleted = self
500 .inner
501 .high_volume
502 .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
503 .await?;
504
505 guard.advance(ChangePhase::compare_and_write(deleted));
507 }
508
509 objectstore_metrics::record!(
510 "delete.latency" = start.elapsed(),
511 usecase = id.usecase().to_owned(),
512 backend_choice = backend_choice.as_str(),
513 backend_type = self.backend_type(&backend_choice),
514 );
515
516 Ok(())
517 }
518
    /// Closes the change tracker, then waits concurrently for both backends to join and for
    /// the tracker to finish.
    async fn join(&self) {
        // Close first so that `tracker.wait()` below can complete.
        // NOTE(review): assumes the tracker only resolves `wait()` once closed — confirm.
        self.inner.tracker.close();
        tokio::join!(
            self.inner.high_volume.join(),
            self.inner.long_term.join(),
            self.inner.tracker.wait()
        );
    }
527}
528
/// Which of the two tiers handled an operation; used to tag metrics.
#[derive(Debug)]
enum BackendChoice {
    /// The high-volume backend.
    HighVolume,
    /// The long-term backend.
    LongTerm,
}

impl BackendChoice {
    /// Returns the metric tag value for this choice: `"high-volume"` or `"long-term"`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::LongTerm => "long-term",
            Self::HighVolume => "high-volume",
        }
    }
}

impl std::fmt::Display for BackendChoice {
    /// Writes the same text as [`BackendChoice::as_str`], without applying padding or width.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
549
550fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
555where
556 S: Stream<Item = Result<Bytes, E>>,
557{
558 let counter = Arc::new(AtomicU64::new(0));
559
560 (
561 counter.clone(),
562 stream.inspect(move |res| {
563 if let Ok(chunk) = res {
564 counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
565 }
566 }),
567 )
568}
569
/// Multipart upload token of the tiered backend.
///
/// Bundles the long-term revision key with the long-term backend's own upload id; it is
/// serialized to JSON and base64-encoded to form the `UploadId` returned to callers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct TieredUploadId {
    /// Key of the long-term revision created in `initiate_multipart`.
    revision: String,
    /// Upload id returned by the long-term backend's `initiate_multipart`.
    upload_id: UploadId,
}
576
577impl TryInto<UploadId> for TieredUploadId {
578 type Error = Error;
579
580 fn try_into(self) -> Result<UploadId, Self::Error> {
581 let json =
582 serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?;
583 Ok(UploadId::new(
584 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json),
585 )?)
586 }
587}
588
589impl TryFrom<&UploadId> for TieredUploadId {
590 type Error = Error;
591
592 fn try_from(value: &UploadId) -> Result<Self, Self::Error> {
593 let json = base64::engine::general_purpose::URL_SAFE_NO_PAD
594 .decode(value.as_bytes())
595 .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?;
596 serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e))
597 }
598}
599
600#[async_trait::async_trait]
601impl MultipartUploadBackend for TieredStorage {
602 async fn initiate_multipart(
603 &self,
604 id: &ObjectId,
605 metadata: &Metadata,
606 ) -> Result<InitiateMultipartResponse> {
607 let physical = new_long_term_revision(id);
608
609 let id = self
610 .inner
611 .long_term
612 .initiate_multipart(&physical, metadata)
613 .await?;
614
615 let id = TieredUploadId {
616 revision: physical.key,
617 upload_id: id,
618 };
619 id.try_into()
620 }
621
622 async fn upload_part(
623 &self,
624 id: &ObjectId,
625 upload_id: &UploadId,
626 part_number: PartNumber,
627 content_length: u64,
628 content_md5: Option<&str>,
629 body: ClientStream,
630 ) -> Result<UploadPartResponse> {
631 let tiered: TieredUploadId = upload_id.try_into()?;
632
633 let physical = ObjectId {
634 context: id.context.clone(),
635 key: tiered.revision,
636 };
637
638 self.inner
639 .long_term
640 .upload_part(
641 &physical,
642 &tiered.upload_id,
643 part_number,
644 content_length,
645 content_md5,
646 body,
647 )
648 .await
649 }
650
651 async fn list_parts(
652 &self,
653 id: &ObjectId,
654 upload_id: &UploadId,
655 max_parts: Option<u32>,
656 part_number_marker: Option<PartNumber>,
657 ) -> Result<ListPartsResponse> {
658 let tiered: TieredUploadId = upload_id.try_into()?;
659
660 let physical = ObjectId {
661 context: id.context.clone(),
662 key: tiered.revision,
663 };
664
665 self.inner
666 .long_term
667 .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker)
668 .await
669 }
670
671 async fn abort_multipart(
672 &self,
673 id: &ObjectId,
674 upload_id: &UploadId,
675 ) -> Result<AbortMultipartResponse> {
676 let tiered: TieredUploadId = upload_id.try_into()?;
677
678 let physical = ObjectId {
679 context: id.context.clone(),
680 key: tiered.revision,
681 };
682
683 self.inner
684 .long_term
685 .abort_multipart(&physical, &tiered.upload_id)
686 .await
687 }
688
    /// Completes a multipart upload and publishes it by writing a tombstone into high-volume.
    ///
    /// Returns `Ok(None)` on success, including when the high-volume tombstone already points
    /// at this upload's revision. A `Some(..)` response from the long-term backend is returned
    /// to the caller unchanged.
    ///
    /// # Errors
    ///
    /// Fails if the upload id cannot be decoded, if recording the change fails, if the
    /// completed object cannot be confirmed in long-term storage, or if the final
    /// compare-and-write fails.
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        let tiered: TieredUploadId = upload_id.try_into()?;

        // Same context as the logical object, but keyed by the revision.
        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered.revision,
        };

        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
            // The tombstone already targets this revision: nothing left to do.
            TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None),
            // A different revision is currently published; it becomes the `old` target.
            TieredMetadata::Tombstone(t) => Some(t.target),
            _ => None,
        };

        let mut guard = self
            .record_assembling(Change {
                id: id.clone(),
                new: Some(physical.clone()),
                old: current.clone(),
                cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY),
            })
            .await?;

        // A transport-level error is kept rather than returned immediately, so that the
        // metadata lookup below can decide whether the object exists regardless.
        let maybe_complete_multipart_err = match self
            .inner
            .long_term
            .complete_multipart(&physical, &tiered.upload_id, parts)
            .await
        {
            Ok(error) => {
                // A `Some` response from the backend is passed straight through to the caller.
                if error.is_some() {
                    return Ok(error);
                }
                None
            }
            Err(err) => Some(err),
        };

        let metadata = self.inner.long_term.get_metadata(&physical).await;

        let metadata = match (metadata, maybe_complete_multipart_err) {
            // The object exists, so proceed even if the completion call reported an error.
            (Ok(Some(metadata)), _) => metadata,
            // No object and the completion call failed: surface the completion error.
            (Ok(None), Some(err)) => return Err(err),
            // Completion reported success but the object is missing.
            (Ok(None), None) => {
                objectstore_log::error!(
                    id = ?id,
                    upload_id = ?upload_id,
                    physical = ?physical,
                    "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object"
                );
                return Err(Error::generic(
                    "completed multipart object not found in long-term storage",
                ));
            }
            // The lookup itself failed: prefer the completion error when there is one.
            (Err(get_metadata_err), maybe_complete_multipart_err) => {
                return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err));
            }
        };

        let tombstone = Tombstone {
            target: physical.clone(),
            expiration_policy: metadata.expiration_policy,
        };
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
            .await?;

        guard.advance(ChangePhase::compare_and_write(written));

        Ok(None)
    }
795}
796
797#[cfg(test)]
798mod tests {
799 use std::num::NonZeroU32;
800
801 use futures::lock::Mutex;
802 use objectstore_types::metadata::{ExpirationPolicy, Metadata};
803 use objectstore_types::scope::{Scope, Scopes};
804
805 use super::*;
806 use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
807 use crate::backend::in_memory::InMemoryBackend;
808 use crate::backend::testing::{Hooks, TestBackend};
809 use crate::error::Error;
810 use crate::id::ObjectContext;
811
812 use crate::stream::{self, ClientStream};
813
    /// Builds the object context shared by all test ids: usecase `"testing"` with a single
    /// `testing=value` scope.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }
820
    /// Builds an object id with the test context and the given key.
    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(make_context(), key.into())
    }
824
    /// Builds a `TieredStorage` over two in-memory backends and an in-memory change log.
    ///
    /// Returns the storage plus clones of the high-volume backend, the long-term backend and
    /// the change log (in that order) for the tests to inspect.
    fn make_tiered_storage() -> (
        TieredStorage,
        InMemoryBackend,
        InMemoryBackend,
        InMemoryChangeLog,
    ) {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        let changelog = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(lt.clone()),
            Box::new(changelog.clone()),
        );
        (storage, hv, lt, changelog)
    }
841
    /// A revision keeps the original context and prefixes its key with `<key>/`.
    #[test]
    fn revision_id_preserves_context() {
        let id = make_id("my-key");
        let revised = new_long_term_revision(&id);
        assert_eq!(revised.context, id.context);
        assert!(
            revised.key.starts_with("my-key/"),
            "revised key should have /<uuid> suffix, got: {}",
            revised.key
        );
    }
855
    /// A revision id survives conversion to a storage path and back.
    #[test]
    fn revision_id_roundtrips_storage_path() {
        let id = make_id("original");
        let revised = new_long_term_revision(&id);
        let path = revised.as_storage_path().to_string();
        let parsed = ObjectId::from_storage_path(&path)
            .unwrap_or_else(|| panic!("failed to parse '{path}'"));
        assert_eq!(parsed, revised);
    }
865
    /// Two revisions of the same id get different keys.
    #[test]
    fn revision_id_is_unique() {
        let id = make_id("base-key");
        let a = new_long_term_revision(&id);
        let b = new_long_term_revision(&id);
        assert_ne!(a.key, b.key, "two calls should produce different keys");
    }
873
    /// Reading an unknown id yields `None` for both the object and its metadata.
    #[tokio::test]
    async fn get_nonexistent_returns_none() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        assert!(storage.get_object(&id).await.unwrap().is_none());
        assert!(storage.get_metadata(&id).await.unwrap().is_none());
    }
884
    /// Deleting an unknown id is not an error.
    #[tokio::test]
    async fn delete_nonexistent_succeeds() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        storage.delete_object(&id).await.unwrap();
    }
892
    /// A small payload is stored in high-volume only and can be read back unchanged.
    #[tokio::test]
    async fn put_small_object_stores_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("small");
        let payload = b"small payload".to_vec();

        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();

        // The object lives under its own id in high-volume and nowhere in long-term.
        assert!(hv.contains(&id), "expected in high-volume");
        assert!(!lt.contains(&id), "leaked to long-term");

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        assert!(
            storage.get_metadata(&id).await.unwrap().is_some(),
            "get_metadata should return metadata for inline objects"
        );
    }
918
    /// A 2 MiB payload ends up in long-term under a revision key, with a high-volume tombstone
    /// pointing at it; reads through the tiered storage resolve the tombstone.
    #[tokio::test]
    async fn put_large_object_creates_tombstone() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("large");
        // 2 MiB, i.e. larger than the 1 MiB `BACKEND_SIZE_THRESHOLD`.
        let payload = vec![0xCDu8; 2 * 1024 * 1024];
        let metadata_in = Metadata {
            content_type: "image/png".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
            origin: Some("10.0.0.1".into()),
            ..Default::default()
        };

        storage
            .put_object(&id, &metadata_in, stream::single(payload.clone()))
            .await
            .unwrap();

        // High-volume holds a tombstone carrying the object's expiration policy.
        let tombstone = hv.get(&id).expect_tombstone();
        assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
        let lt_id = tombstone.target;
        assert!(
            lt_id.key().starts_with(id.key()),
            "tombstone target key should be a revision of the HV key, got: {}",
            lt_id.key()
        );

        // Long-term holds the object itself, with the metadata that was supplied.
        let (lt_meta, _) = lt.get(&lt_id).expect_object();
        assert_eq!(lt_meta.content_type, "image/png");
        assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);

        // Reads through the tiered storage follow the tombstone.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
        assert_eq!(metadata.content_type, "image/png");
    }
960
    /// Overwriting a large object with a small one replaces the tombstone with an inline object
    /// and removes the old long-term revision.
    #[tokio::test]
    async fn reinsert_small_over_large_swaps_to_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("reinsert-key");

        let large_payload = vec![0xABu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(large_payload))
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;

        let small_payload = vec![0xCDu8; 100];
        storage
            .put_object(&id, &Default::default(), stream::single(small_payload))
            .await
            .unwrap();

        hv.get(&id).expect_object();

        // Wait for outstanding change tracking before checking long-term.
        storage.join().await;

        lt.get(&lt_id).expect_not_found();
    }
994
    /// Overwriting a large object with another large object creates a new revision and removes
    /// the previous one.
    #[tokio::test]
    async fn overwrite_large_with_large_replaces_revision() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("overwrite-large");

        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload1))
            .await
            .unwrap();
        let lt_id_1 = hv.get(&id).expect_tombstone().target;

        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload2.clone()))
            .await
            .unwrap();
        let lt_id_2 = hv.get(&id).expect_tombstone().target;

        assert_ne!(
            lt_id_1, lt_id_2,
            "second write should create a new revision"
        );

        // Wait for outstanding change tracking before checking long-term.
        storage.join().await;

        lt.get(&lt_id_1).expect_not_found();
        lt.get(&lt_id_2).expect_object();

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload2);
    }
1029
    /// Deleting a small object removes it from high-volume and makes it unreadable.
    #[tokio::test]
    async fn delete_small_object() {
        let (storage, hv, _lt, _) = make_tiered_storage();
        let id = make_id("delete-small");

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();

        storage.delete_object(&id).await.unwrap();

        hv.get(&id).expect_not_found();
        assert!(storage.get_object(&id).await.unwrap().is_none());
    }
1047
    /// Deleting a large object removes both the high-volume tombstone and the long-term object.
    #[tokio::test]
    async fn delete_large_object_cleans_up_both_backends() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("delete-both");
        let payload = vec![0u8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;

        storage.delete_object(&id).await.unwrap();

        // Wait for outstanding change tracking before checking the backends.
        storage.join().await;

        assert!(!hv.contains(&id), "tombstone not cleaned up");
        assert!(!lt.contains(&lt_id), "long-term object not cleaned up");
    }
1070
    /// Test hook whose `delete_object` always fails with a connection-refused I/O error.
    #[derive(Debug)]
    struct FailDelete;

    #[async_trait::async_trait]
    impl Hooks for FailDelete {
        /// Never touches the inner backend; always returns the simulated failure.
        async fn delete_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<DeleteResponse> {
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                "simulated long-term delete failure",
            )))
        }
    }
1087
    /// `delete_object` succeeds and the object becomes unreachable even when the long-term
    /// backend refuses every delete.
    #[tokio::test]
    async fn delete_succeeds_when_gcs_cleanup_fails() {
        let hv = InMemoryBackend::new("hv");
        let lt = TestBackend::new(FailDelete);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));

        let id = make_id("fail-delete");
        let payload = vec![0xABu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        let result = storage.delete_object(&id).await;
        assert!(
            result.is_ok(),
            "delete should succeed despite GCS cleanup failure"
        );

        // The tombstone is gone from high-volume.
        hv.get(&id).expect_not_found();

        assert!(
            storage.get_object(&id).await.unwrap().is_none(),
            "object should be unreachable after tombstone is deleted"
        );
    }
1121
    /// Test hook whose `compare_and_write` always returns `Ok(false)` without writing anything.
    #[derive(Debug)]
    struct CasConflict;

    #[async_trait::async_trait]
    impl Hooks for CasConflict {
        /// Ignores all arguments and returns `Ok(false)`.
        async fn compare_and_write(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
            _current: Option<&ObjectId>,
            _write: TieredWrite,
        ) -> Result<bool> {
            Ok(false)
        }
    }
1139
    /// When the tombstone compare-and-write returns `false`, the put still succeeds and the
    /// newly uploaded long-term blob is removed again.
    #[tokio::test]
    async fn put_large_cas_conflict_cleans_up_new_blob() {
        let hv = TestBackend::new(CasConflict);
        let lt = InMemoryBackend::new("lt");
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));

        let id = make_id("cas-conflict-large");
        let payload = vec![0xABu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Wait for outstanding change tracking before checking long-term.
        storage.join().await;

        assert!(
            lt.is_empty(),
            "LT blob should be cleaned up after CAS conflict"
        );
    }
1166
    /// A small put over an existing tombstone still returns `Ok` when the compare-and-write
    /// returns `false`.
    #[tokio::test]
    async fn put_small_over_tombstone_cas_conflict_succeeds() {
        let inner = InMemoryBackend::new("hv");
        let id = make_id("cas-conflict-small");

        // Seed a tombstone directly in the inner backend, bypassing the hook.
        let tombstone = Tombstone {
            target: make_id("lt-object"),
            expiration_policy: ExpirationPolicy::Manual,
        };
        inner
            .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
            .await
            .unwrap();

        let lt = InMemoryBackend::new("lt");
        let hv = TestBackend::with_inner(inner, CasConflict);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();
    }
1198
    /// Test hook whose `compare_and_write` always returns a timed-out I/O error.
    ///
    /// With `FailCas(true)` the write is first applied to the inner backend, simulating a
    /// write that took effect but whose response was lost; with `FailCas(false)` nothing is
    /// written.
    #[derive(Debug)]
    struct FailCas(bool);

    #[async_trait::async_trait]
    impl Hooks for FailCas {
        /// Optionally performs the real write, then reports failure regardless of its outcome.
        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            if self.0 {
                inner.compare_and_write(id, current, write).await?;
            }
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "simulated compare_and_write failure",
            )))
        }
    }
1224
    /// If writing the tombstone fails, the put fails and the uploaded long-term object is
    /// removed again.
    #[tokio::test]
    async fn no_orphan_when_tombstone_write_fails() {
        let lt = InMemoryBackend::new("lt");
        let hv = TestBackend::new(FailCas(false));
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));

        let id = make_id("orphan-test");
        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let result = storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await;

        assert!(result.is_err());

        // Wait for outstanding change tracking before checking long-term.
        storage.join().await;

        assert!(lt.is_empty(), "long-term object not cleaned up");
    }
1248
    /// A tombstone whose long-term target has disappeared reads as "not found".
    #[tokio::test]
    async fn orphan_tombstone_returns_none() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("orphan-tombstone");
        let payload = vec![0xCDu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;

        // Remove the long-term object behind the tombstone's back.
        lt.remove(&lt_id);

        assert!(
            storage.get_object(&id).await.unwrap().is_none(),
            "orphan tombstone should resolve to None on get_object"
        );
        assert!(
            storage.get_metadata(&id).await.unwrap().is_none(),
            "orphan tombstone should resolve to None on get_metadata"
        );
    }
1278
    /// Reads and deletes follow the tombstone's `target`, even when that target's key is
    /// unrelated to the high-volume key.
    #[tokio::test]
    async fn tombstone_target_is_used_for_reads_and_deletes() {
        let hv = InMemoryBackend::new("hv");
        let lt = InMemoryBackend::new("lt");
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));

        let hv_id = make_id("hv-key");
        let lt_id = make_id("lt-key");
        let payload = vec![0xABu8; 100];

        // Set up the long-term object and its tombstone by hand, bypassing the tiered storage.
        lt.put_object(&lt_id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone = Tombstone {
            target: lt_id.clone(),
            expiration_policy: ExpirationPolicy::Manual,
        };
        hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
            .await
            .unwrap();

        let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        storage.delete_object(&hv_id).await.unwrap();
        storage.join().await;
        assert!(!hv.contains(&hv_id), "tombstone should be removed");
        assert!(!lt.contains(&lt_id), "lt object should be removed");
    }
1317
    /// A large object delivered as several chunks arrives in long-term complete and in order.
    #[tokio::test]
    async fn multi_chunk_large_object_chains_buffered_and_remaining() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("multi-chunk");

        // Four 512 KiB chunks (2 MiB in total); chunk `i` is filled with the byte value `i`.
        let chunk_size = 512 * 1024;
        let chunk_count = 4;
        let stream: ClientStream = futures_util::stream::iter(
            (0..chunk_count).map(move |i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))),
        )
        .boxed();

        storage
            .put_object(&id, &Default::default(), stream)
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;
        let (_, lt_bytes) = lt.get(&lt_id).expect_object();
        assert_eq!(lt_bytes.len(), chunk_size * chunk_count);

        // Every chunk must be present at its original offset with its original fill byte.
        for i in 0..chunk_count {
            let offset = i * chunk_size;
            assert!(
                lt_bytes[offset..offset + chunk_size]
                    .iter()
                    .all(|&b| b == i as u8),
                "data mismatch in chunk {i}"
            );
        }
    }
1355
    /// Exercises cleanup when a compare-and-write takes effect but reports an error
    /// (`FailCas(true)`): the superseded long-term revision must still be removed.
    #[tokio::test]
    async fn written_cleanup_after_lost_cas_response() {
        let (storage, hv, lt, log) = make_tiered_storage();
        let id = make_id("obj");

        let payload = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone1 = hv.get(&id).expect_tombstone().target;

        // Same backends and log, but every compare-and-write is applied and then reported as
        // failed.
        let broken_storage = TieredStorage::new(
            Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
            Box::new(lt.clone()),
            Box::new(log.clone()),
        );

        // Large over large: the put errors, yet the tombstone now targets a new revision.
        broken_storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap_err();
        let tombstone2 = hv.get(&id).expect_tombstone().target;
        assert_ne!(tombstone1, tombstone2);

        broken_storage.join().await;
        lt.get(&tombstone1).expect_not_found();
        lt.get(&tombstone2).expect_object();

        // Delete: the call errors, yet the tombstone and its long-term object are gone.
        broken_storage.delete_object(&id).await.unwrap_err();
        hv.get(&id).expect_not_found();
        broken_storage.join().await;
        lt.get(&tombstone2).expect_not_found();

        // Small over large: the put errors, yet high-volume holds the inline object and the
        // old long-term revision is removed.
        let id = make_id("obj2");
        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone3 = hv.get(&id).expect_tombstone().target;

        broken_storage
            .put_object(&id, &Default::default(), stream::single(&b"small"[..]))
            .await
            .unwrap_err();
        hv.get(&id).expect_object();
        broken_storage.join().await;
        lt.get(&tombstone3).expect_not_found();
    }
1415
    /// Dropping a `ChangeGuard` after the runtime that created it has been dropped must not
    /// panic.
    #[test]
    fn guard_dropped_outside_runtime_does_not_panic() {
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(NoopChangeLog),
        );

        let change = Change {
            id: make_id("object-key"),
            new: Some(make_id("cleanup-target")),
            old: None,
            cleanup_after: None,
        };

        // The runtime only lives inside this block; the guard outlives it.
        let guard = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(manager.record(change)).unwrap()
        };

        drop(guard);
    }
1443
    /// `join` does not return while a change guard is still alive.
    ///
    /// Runs with paused Tokio time, so the sleeps below advance virtual time only.
    #[tokio::test(start_paused = true)]
    async fn join_waits_for_cleanup_to_complete() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: None,
            cleanup_after: None,
        };
        let mut guard = storage.record_change(change).await.unwrap();

        // Hold the guard for 10 seconds, then complete and drop it.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            guard.advance(ChangePhase::Completed);
            drop(guard);
        });

        let join_future = tokio::spawn(async move { storage.join().await });

        // At 9 seconds the guard is still held, so `join` must still be pending.
        tokio::time::sleep(Duration::from_secs(9)).await;
        assert!(!join_future.is_finished(), "finished before guard dropped");

        // At 11 seconds the guard has been dropped, so `join` must have finished.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(join_future.is_finished(), "finish after guard drops");
    }
1473
1474 #[derive(Clone, Debug)]
1481 struct PauseAfterPut {
1482 paused: Arc<tokio::sync::Notify>,
1483 resume: Arc<tokio::sync::Notify>,
1484 }
1485
1486 #[async_trait::async_trait]
1487 impl Hooks for PauseAfterPut {
1488 async fn put_object(
1489 &self,
1490 inner: &InMemoryBackend,
1491 id: &ObjectId,
1492 metadata: &Metadata,
1493 stream: ClientStream,
1494 ) -> Result<PutResponse> {
1495 inner.put_object(id, metadata, stream).await?;
1496 self.paused.notify_one();
1497 self.resume.notified().await;
1498 Ok(())
1499 }
1500 }
1501
1502 #[tokio::test]
1505 async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
1506 let paused = Arc::new(tokio::sync::Notify::new());
1507 let hooks = PauseAfterPut {
1508 paused: Arc::clone(&paused),
1509 resume: Arc::new(tokio::sync::Notify::new()),
1510 };
1511
1512 let lt_inner = InMemoryBackend::new("lt");
1513 let log = InMemoryChangeLog::default();
1514 let storage = TieredStorage::new(
1515 Box::new(InMemoryBackend::new("hv")),
1516 Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
1517 Box::new(log.clone()),
1518 );
1519
1520 let id = make_id("drop-test");
1521 let metadata = Metadata::default();
1522 let payload = vec![0xABu8; 2 * 1024 * 1024]; tokio::select! {
1526 result = storage.put_object(&id, &metadata, stream::single(payload)) => {
1527 panic!("expected put to pause before completing, got: {result:?}");
1528 }
1529 _ = paused.notified() => {
1530 }
1532 }
1533
1534 storage.join().await;
1536
1537 assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");
1539
1540 let entries = log.scan().await.unwrap();
1542 assert!(
1543 entries.is_empty(),
1544 "changelog entry not removed after cleanup"
1545 );
1546 }
1547
/// A `TieredUploadId` converted into an `UploadId` and back must compare equal
/// to the original value.
#[test]
fn multipart_upload_id_roundtrip() {
    let id = TieredUploadId {
        revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(),
        upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(),
    };

    // `id` is compared against the decoded value below, so the by-value
    // conversion works on a clone.
    let encoded: UploadId = id.clone().try_into().unwrap();

    // Decoding only borrows the encoded id; no clone is needed here.
    let decoded: TieredUploadId = (&encoded).try_into().unwrap();
    assert_eq!(decoded, id);
}
1560
1561 #[tokio::test]
1562 async fn multipart_single_part_roundtrip() {
1563 let (storage, hv, lt, _) = make_tiered_storage();
1564 let id = make_id("mp-single");
1565 let metadata = Metadata {
1566 content_type: "application/octet-stream".into(),
1567 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
1568 ..Default::default()
1569 };
1570 let payload = vec![0xABu8; 2 * 1024 * 1024]; let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap();
1573
1574 let etag = storage
1575 .upload_part(
1576 &id,
1577 &upload_id,
1578 NonZeroU32::new(1).unwrap(),
1579 payload.len() as u64,
1580 None,
1581 stream::single(payload.clone()),
1582 )
1583 .await
1584 .unwrap();
1585
1586 let error = storage
1587 .complete_multipart(
1588 &id,
1589 &upload_id,
1590 vec![CompletedPart {
1591 part_number: NonZeroU32::new(1).unwrap(),
1592 etag,
1593 }],
1594 )
1595 .await
1596 .unwrap();
1597 assert!(
1598 error.is_none(),
1599 "complete_multipart returned error: {error:?}"
1600 );
1601
1602 let (got_meta, s) = storage.get_object(&id).await.unwrap().unwrap();
1604 let body = stream::read_to_vec(s).await.unwrap();
1605 assert_eq!(body, payload);
1606 assert_eq!(got_meta.content_type, "application/octet-stream");
1607
1608 let tombstone = hv.get(&id).expect_tombstone();
1610 assert!(
1611 tombstone.target.key().starts_with(id.key()),
1612 "tombstone target should be a revision key"
1613 );
1614 lt.get(&tombstone.target).expect_object();
1615 }
1616
1617 #[tokio::test]
1618 async fn multipart_upload() {
1619 let (storage, _hv, _lt, _) = make_tiered_storage();
1620 let id = make_id("multipart");
1621
1622 let upload_id = storage
1623 .initiate_multipart(&id, &Default::default())
1624 .await
1625 .unwrap();
1626
1627 let part1 = vec![0xAAu8; 512 * 1024];
1628 let part2 = vec![0xBBu8; 512 * 1024];
1629 let part3 = vec![0xCCu8; 512 * 1024];
1630
1631 let etag3 = storage
1632 .upload_part(
1633 &id,
1634 &upload_id,
1635 NonZeroU32::new(3).unwrap(),
1636 part3.len() as u64,
1637 None,
1638 stream::single(part3.clone()),
1639 )
1640 .await
1641 .unwrap();
1642 let etag2 = storage
1643 .upload_part(
1644 &id,
1645 &upload_id,
1646 NonZeroU32::new(2).unwrap(),
1647 part2.len() as u64,
1648 None,
1649 stream::single(part2.clone()),
1650 )
1651 .await
1652 .unwrap();
1653 let etag1 = storage
1654 .upload_part(
1655 &id,
1656 &upload_id,
1657 NonZeroU32::new(1).unwrap(),
1658 part1.len() as u64,
1659 None,
1660 stream::single(part1.clone()),
1661 )
1662 .await
1663 .unwrap();
1664
1665 let error = storage
1666 .complete_multipart(
1667 &id,
1668 &upload_id,
1669 vec![
1670 CompletedPart {
1671 part_number: NonZeroU32::new(1).unwrap(),
1672 etag: etag1,
1673 },
1674 CompletedPart {
1675 part_number: NonZeroU32::new(2).unwrap(),
1676 etag: etag2,
1677 },
1678 CompletedPart {
1679 part_number: NonZeroU32::new(3).unwrap(),
1680 etag: etag3,
1681 },
1682 ],
1683 )
1684 .await
1685 .unwrap();
1686 assert!(error.is_none());
1687
1688 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
1689 let body = stream::read_to_vec(s).await.unwrap();
1690
1691 let mut expected = Vec::new();
1692 expected.extend_from_slice(&part1);
1693 expected.extend_from_slice(&part2);
1694 expected.extend_from_slice(&part3);
1695 assert_eq!(body, expected);
1696 }
1697
1698 #[tokio::test]
1699 async fn multipart_abort() {
1700 let (storage, hv, _lt, _) = make_tiered_storage();
1701 let id = make_id("mp-abort");
1702
1703 let upload_id = storage
1704 .initiate_multipart(&id, &Default::default())
1705 .await
1706 .unwrap();
1707
1708 let payload = vec![0xABu8; 100];
1710 storage
1711 .upload_part(
1712 &id,
1713 &upload_id,
1714 NonZeroU32::new(1).unwrap(),
1715 payload.len() as u64,
1716 None,
1717 stream::single(payload),
1718 )
1719 .await
1720 .unwrap();
1721
1722 storage.abort_multipart(&id, &upload_id).await.unwrap();
1723
1724 hv.get(&id).expect_not_found();
1726
1727 assert!(storage.get_object(&id).await.unwrap().is_none());
1729 }
1730
1731 #[tokio::test]
1732 async fn multipart_list_parts() {
1733 let (storage, _hv, _lt, _) = make_tiered_storage();
1734 let id = make_id("mp-list");
1735
1736 let upload_id = storage
1737 .initiate_multipart(&id, &Default::default())
1738 .await
1739 .unwrap();
1740
1741 let part1 = vec![0xAAu8; 100];
1742 let part2 = vec![0xBBu8; 200];
1743 storage
1744 .upload_part(
1745 &id,
1746 &upload_id,
1747 NonZeroU32::new(1).unwrap(),
1748 part1.len() as u64,
1749 None,
1750 stream::single(part1),
1751 )
1752 .await
1753 .unwrap();
1754 storage
1755 .upload_part(
1756 &id,
1757 &upload_id,
1758 NonZeroU32::new(2).unwrap(),
1759 part2.len() as u64,
1760 None,
1761 stream::single(part2),
1762 )
1763 .await
1764 .unwrap();
1765
1766 let resp = storage
1767 .list_parts(&id, &upload_id, None, None)
1768 .await
1769 .unwrap();
1770 assert_eq!(resp.parts.len(), 2);
1771 assert_eq!(resp.parts[0].part_number.get(), 1);
1772 assert_eq!(resp.parts[0].size, 100);
1773 assert_eq!(resp.parts[1].part_number.get(), 2);
1774 assert_eq!(resp.parts[1].size, 200);
1775 }
1776
1777 #[tokio::test]
1778 async fn multipart_overwrites_existing_tombstone() {
1779 let (storage, hv, lt, _) = make_tiered_storage();
1780 let id = make_id("mp-overwrite");
1781
1782 let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
1784 storage
1785 .put_object(&id, &Default::default(), stream::single(payload1))
1786 .await
1787 .unwrap();
1788 let old_lt_id = hv.get(&id).expect_tombstone().target;
1789
1790 let upload_id = storage
1792 .initiate_multipart(&id, &Default::default())
1793 .await
1794 .unwrap();
1795
1796 let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
1797 let etag = storage
1798 .upload_part(
1799 &id,
1800 &upload_id,
1801 NonZeroU32::new(1).unwrap(),
1802 payload2.len() as u64,
1803 None,
1804 stream::single(payload2.clone()),
1805 )
1806 .await
1807 .unwrap();
1808
1809 let lt_id = hv.get(&id).expect_tombstone().target;
1812 assert_eq!(old_lt_id, lt_id);
1813
1814 let error = storage
1815 .complete_multipart(
1816 &id,
1817 &upload_id,
1818 vec![CompletedPart {
1819 part_number: NonZeroU32::new(1).unwrap(),
1820 etag,
1821 }],
1822 )
1823 .await
1824 .unwrap();
1825 assert!(error.is_none());
1826
1827 let new_lt_id = hv.get(&id).expect_tombstone().target;
1829 assert_ne!(old_lt_id, new_lt_id);
1830
1831 storage.join().await;
1833
1834 lt.get(&old_lt_id).expect_not_found();
1836 lt.get(&new_lt_id).expect_object();
1837
1838 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
1840 let body = stream::read_to_vec(s).await.unwrap();
1841 assert_eq!(body, payload2);
1842 }
1843
1844 #[derive(Debug)]
1850 struct CompleteMultipartButReturnError;
1851
1852 #[async_trait::async_trait]
1853 impl Hooks for CompleteMultipartButReturnError {
1854 async fn complete_multipart(
1855 &self,
1856 inner: &InMemoryBackend,
1857 id: &ObjectId,
1858 upload_id: &UploadId,
1859 parts: Vec<CompletedPart>,
1860 ) -> Result<CompleteMultipartResponse> {
1861 inner
1862 .complete_multipart(id, upload_id, parts)
1863 .await
1864 .unwrap();
1865 Err(Error::Io(std::io::Error::new(
1866 std::io::ErrorKind::TimedOut,
1867 "simulated network error on complete_multipart",
1868 )))
1869 }
1870
1871 async fn get_metadata(
1872 &self,
1873 _inner: &InMemoryBackend,
1874 _id: &ObjectId,
1875 ) -> Result<MetadataResponse> {
1876 Err(Error::Io(std::io::Error::new(
1877 std::io::ErrorKind::TimedOut,
1878 "simulated network error on get_metadata",
1879 )))
1880 }
1881 }
1882
1883 #[tokio::test]
1887 async fn cleans_up_orphan_after_failed_multipart_complete() {
1888 let hv = InMemoryBackend::new("hv");
1889 let lt_inner = InMemoryBackend::new("lt");
1890 let log = InMemoryChangeLog::default();
1891 let storage = TieredStorage::new(
1892 Box::new(hv.clone()),
1893 Box::new(TestBackend::with_inner(
1894 lt_inner.clone(),
1895 CompleteMultipartButReturnError {},
1896 )),
1897 Box::new(log.clone()),
1898 );
1899
1900 let id = make_id("mp-orphan");
1901 let upload_id = storage
1902 .initiate_multipart(&id, &Default::default())
1903 .await
1904 .unwrap();
1905
1906 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
1907 let physical = ObjectId {
1908 context: id.context.clone(),
1909 key: tiered_id.revision,
1910 };
1911
1912 let payload = vec![0xABu8; 2 * 1024 * 1024];
1913 let etag = storage
1914 .upload_part(
1915 &id,
1916 &upload_id,
1917 NonZeroU32::new(1).unwrap(),
1918 payload.len() as u64,
1919 None,
1920 stream::single(payload),
1921 )
1922 .await
1923 .unwrap();
1924
1925 let result = storage
1926 .complete_multipart(
1927 &id,
1928 &upload_id,
1929 vec![CompletedPart {
1930 part_number: NonZeroU32::new(1).unwrap(),
1931 etag,
1932 }],
1933 )
1934 .await;
1935 assert!(result.is_err());
1936 storage.join().await;
1937
1938 lt_inner.get(&physical).expect_object();
1941 hv.get(&id).expect_not_found();
1942
1943 log.expire_all();
1945 let manager = ChangeManager::new(
1946 Box::new(hv.clone()),
1947 Box::new(lt_inner.clone()),
1948 Box::new(log.clone()),
1949 );
1950 manager.recover().await.unwrap();
1951
1952 lt_inner.get(&physical).expect_not_found();
1954 let remaining = log.scan().await.unwrap();
1956 assert!(remaining.is_empty());
1957 }
1958
1959 #[derive(Debug)]
1960 struct FailOnFirstCompleteMultipartAttempt {
1961 attempt: Mutex<u32>,
1962 }
1963
1964 impl FailOnFirstCompleteMultipartAttempt {
1965 fn new() -> Self {
1966 Self {
1967 attempt: Mutex::new(0),
1968 }
1969 }
1970 }
1971
1972 #[async_trait::async_trait]
1973 impl Hooks for FailOnFirstCompleteMultipartAttempt {
1974 async fn complete_multipart(
1975 &self,
1976 inner: &InMemoryBackend,
1977 id: &ObjectId,
1978 upload_id: &UploadId,
1979 parts: Vec<CompletedPart>,
1980 ) -> Result<CompleteMultipartResponse> {
1981 let mut attempt = self.attempt.lock().await;
1982 *attempt += 1;
1983 if *attempt == 1 {
1984 return Err(Error::Io(std::io::Error::new(
1985 std::io::ErrorKind::TimedOut,
1986 "simulated network error",
1987 )));
1988 } else {
1989 return Ok(inner
1990 .complete_multipart(id, upload_id, parts)
1991 .await
1992 .unwrap());
1993 }
1994 }
1995 }
1996
1997 #[tokio::test]
2002 async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() {
2003 let hv = InMemoryBackend::new("hv");
2004 let lt_inner = InMemoryBackend::new("lt");
2005 let log = InMemoryChangeLog::default();
2006 let storage = TieredStorage::new(
2007 Box::new(hv.clone()),
2008 Box::new(TestBackend::with_inner(
2009 lt_inner.clone(),
2010 FailOnFirstCompleteMultipartAttempt::new(),
2011 )),
2012 Box::new(log.clone()),
2013 );
2014
2015 let id = make_id("mp-retry");
2016 let upload_id = storage
2017 .initiate_multipart(&id, &Default::default())
2018 .await
2019 .unwrap();
2020
2021 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2022 let physical = ObjectId {
2023 context: id.context.clone(),
2024 key: tiered_id.revision,
2025 };
2026
2027 let payload = vec![0xABu8; 2 * 1024 * 1024];
2028 let etag = storage
2029 .upload_part(
2030 &id,
2031 &upload_id,
2032 NonZeroU32::new(1).unwrap(),
2033 payload.len() as u64,
2034 None,
2035 stream::single(payload.clone()),
2036 )
2037 .await
2038 .unwrap();
2039
2040 let result = storage
2042 .complete_multipart(
2043 &id,
2044 &upload_id,
2045 vec![CompletedPart {
2046 part_number: NonZeroU32::new(1).unwrap(),
2047 etag: etag.clone(),
2048 }],
2049 )
2050 .await;
2051 assert!(result.is_err());
2052 storage.join().await;
2053
2054 let result = storage
2056 .complete_multipart(
2057 &id,
2058 &upload_id,
2059 vec![CompletedPart {
2060 part_number: NonZeroU32::new(1).unwrap(),
2061 etag,
2062 }],
2063 )
2064 .await;
2065 assert!(result.is_ok());
2066 storage.join().await;
2067
2068 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
2070 let body = stream::read_to_vec(s).await.unwrap();
2071 assert_eq!(body, payload);
2072
2073 log.expire_all();
2075 let manager = ChangeManager::new(
2076 Box::new(hv.clone()),
2077 Box::new(lt_inner.clone()),
2078 Box::new(log.clone()),
2079 );
2080 manager.recover().await.unwrap();
2081
2082 lt_inner.get(&physical).expect_object();
2084 let tombstone = hv.get(&id).expect_tombstone();
2086 assert_eq!(tombstone.target, physical);
2087 let remaining = log.scan().await.unwrap();
2089 assert!(remaining.is_empty());
2090
2091 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
2093 let body = stream::read_to_vec(s).await.unwrap();
2094 assert_eq!(body, payload);
2095 }
2096
2097 #[derive(Debug)]
2098 struct FailOnFirstGetMetadataAttempt {
2099 attempt: Mutex<u32>,
2100 }
2101
2102 impl FailOnFirstGetMetadataAttempt {
2103 fn new() -> Self {
2104 Self {
2105 attempt: Mutex::new(0),
2106 }
2107 }
2108 }
2109
2110 #[async_trait::async_trait]
2111 impl Hooks for FailOnFirstGetMetadataAttempt {
2112 async fn get_metadata(
2113 &self,
2114 inner: &InMemoryBackend,
2115 id: &ObjectId,
2116 ) -> Result<MetadataResponse> {
2117 let mut attempt = self.attempt.lock().await;
2118 *attempt += 1;
2119 if *attempt == 1 {
2120 return Err(Error::Io(std::io::Error::new(
2121 std::io::ErrorKind::TimedOut,
2122 "simulated network error",
2123 )));
2124 } else {
2125 inner.get_metadata(id).await
2126 }
2127 }
2128 }
2129
2130 #[tokio::test]
2137 async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent()
2138 {
2139 let hv = InMemoryBackend::new("hv");
2140 let lt_inner = InMemoryBackend::new("lt");
2141 let log = InMemoryChangeLog::default();
2142 let storage = TieredStorage::new(
2143 Box::new(hv.clone()),
2144 Box::new(TestBackend::with_inner(
2145 lt_inner.clone(),
2146 FailOnFirstGetMetadataAttempt::new(),
2147 )),
2148 Box::new(log.clone()),
2149 );
2150
2151 let id = make_id("mp-retry-meta");
2152 let upload_id = storage
2153 .initiate_multipart(&id, &Default::default())
2154 .await
2155 .unwrap();
2156
2157 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2158 let physical = ObjectId {
2159 context: id.context.clone(),
2160 key: tiered_id.revision,
2161 };
2162
2163 let payload = vec![0xABu8; 2 * 1024 * 1024];
2164 let etag = storage
2165 .upload_part(
2166 &id,
2167 &upload_id,
2168 NonZeroU32::new(1).unwrap(),
2169 payload.len() as u64,
2170 None,
2171 stream::single(payload.clone()),
2172 )
2173 .await
2174 .unwrap();
2175
2176 let result = storage
2179 .complete_multipart(
2180 &id,
2181 &upload_id,
2182 vec![CompletedPart {
2183 part_number: NonZeroU32::new(1).unwrap(),
2184 etag: etag.clone(),
2185 }],
2186 )
2187 .await;
2188 assert!(result.is_err());
2189 storage.join().await;
2190
2191 let result = storage
2193 .complete_multipart(
2194 &id,
2195 &upload_id,
2196 vec![CompletedPart {
2197 part_number: NonZeroU32::new(1).unwrap(),
2198 etag,
2199 }],
2200 )
2201 .await;
2202 assert!(result.is_ok());
2203 storage.join().await;
2204
2205 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
2207 let body = stream::read_to_vec(s).await.unwrap();
2208 assert_eq!(body, payload);
2209
2210 log.expire_all();
2212 let manager = ChangeManager::new(
2213 Box::new(hv.clone()),
2214 Box::new(lt_inner.clone()),
2215 Box::new(log.clone()),
2216 );
2217 manager.recover().await.unwrap();
2218
2219 lt_inner.get(&physical).expect_object();
2221 let tombstone = hv.get(&id).expect_tombstone();
2223 assert_eq!(tombstone.target, physical);
2224 let remaining = log.scan().await.unwrap();
2226 assert!(remaining.is_empty());
2227
2228 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
2230 let body = stream::read_to_vec(s).await.unwrap();
2231 assert_eq!(body, payload);
2232 }
2233}