1use std::sync::Arc;
101use std::sync::atomic::{AtomicU64, Ordering};
102use std::time::{Duration, SystemTime};
103
104use base64::Engine as _;
105use bytes::Bytes;
106use futures_util::{Stream, StreamExt};
107use objectstore_types::metadata::Metadata;
108use objectstore_types::range::ByteRange;
109use serde::{Deserialize, Serialize};
110
111use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
112use crate::backend::common::{
113 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse,
114 MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone,
115};
116use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
117use crate::error::{Error, Result};
118use crate::id::ObjectId;
119use crate::multipart::{
120 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
121 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
122};
123use crate::stream::{ClientStream, SizedPeek};
124
125const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24);
133
134fn new_long_term_revision(id: &ObjectId) -> ObjectId {
140 ObjectId {
141 context: id.context.clone(),
142 key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
143 }
144}
145
146#[derive(Debug, Clone, Deserialize, Serialize)]
167pub struct TieredStorageConfig {
168 pub high_volume: HighVolumeStorageConfig,
173 pub long_term: MultipartUploadStorageConfig,
177}
178
179#[derive(Debug)]
229pub struct TieredStorage {
230 inner: Arc<ChangeManager>,
231}
232
233impl TieredStorage {
234 pub fn new(
236 high_volume: Box<dyn HighVolumeBackend>,
237 long_term: Box<dyn MultipartUploadBackend>,
238 changelog: Box<dyn ChangeLog>,
239 ) -> Self {
240 let inner = ChangeManager::new(high_volume, long_term, changelog);
241 tokio::spawn(inner.clone().recover());
244 Self { inner }
245 }
246
247 async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
249 self.inner.clone().record(change).await
250 }
251
252 async fn record_assembling(&self, change: Change) -> Result<ChangeGuard> {
255 self.inner.clone().record_assembling(change).await
256 }
257
258 fn backend_type(&self, choice: &BackendChoice) -> &'static str {
260 match choice {
261 BackendChoice::HighVolume => self.inner.high_volume.name(),
262 BackendChoice::LongTerm => self.inner.long_term.name(),
263 }
264 }
265
266 async fn put_high_volume(
271 &self,
272 id: &ObjectId,
273 metadata: &Metadata,
274 payload: Bytes,
275 ) -> Result<()> {
276 let tombstone_opt = self
277 .inner
278 .high_volume
279 .put_non_tombstone(id, metadata, payload.clone())
280 .await?;
281
282 let Some(Tombstone { target, .. }) = tombstone_opt else {
283 return Ok(());
285 };
286
287 let mut guard = self
289 .record_change(Change {
290 id: id.clone(),
291 new: None,
292 old: Some(target.clone()),
293 cleanup_after: None,
294 })
295 .await?;
296
297 let write = TieredWrite::Object(metadata.clone(), payload);
298 guard.advance(ChangePhase::Written);
299
300 let written = self
301 .inner
302 .high_volume
303 .compare_and_write(id, Some(&target), write)
304 .await?;
305
306 guard.advance(ChangePhase::compare_and_write(written));
308
309 Ok(())
310 }
311
312 async fn put_long_term(
317 &self,
318 id: &ObjectId,
319 metadata: &Metadata,
320 stream: ClientStream,
321 ) -> Result<()> {
322 let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
324 TieredMetadata::Tombstone(t) => Some(t.target),
325 _ => None,
326 };
327
328 let new = new_long_term_revision(id);
330 let mut guard = self
331 .record_change(Change {
332 id: id.clone(),
333 new: Some(new.clone()),
334 old: current.clone(),
335 cleanup_after: None,
336 })
337 .await?;
338
339 self.inner
340 .long_term
341 .put_object(&new, metadata, stream)
342 .await?;
343 guard.advance(ChangePhase::Written);
344
345 let tombstone = Tombstone {
347 target: new.clone(),
348 expiration_policy: metadata.expiration_policy,
349 };
350 let written = self
351 .inner
352 .high_volume
353 .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
354 .await?;
355
356 guard.advance(ChangePhase::compare_and_write(written));
358
359 Ok(())
360 }
361}
362
363#[async_trait::async_trait]
364impl Backend for TieredStorage {
365 fn name(&self) -> &'static str {
366 "tiered"
367 }
368
369 fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
370 Ok(self)
371 }
372
373 async fn put_object(
374 &self,
375 id: &ObjectId,
376 metadata: &Metadata,
377 stream: ClientStream,
378 ) -> Result<PutResponse> {
379 let timer = objectstore_metrics::timer!("put.latency", usecase = id.usecase().to_owned());
380 if metadata.origin.is_none() {
381 objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
382 }
383
384 let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
385 objectstore_metrics::record!(
386 "put.first_chunk.latency" = timer.elapsed(),
387 usecase = id.usecase().to_owned(),
388 complete = if peeked.is_exhausted() { "yes" } else { "no" },
389 );
390
391 let (backend_choice, stored_size) = if peeked.is_exhausted() {
392 let payload = peeked.into_bytes().await?;
393 let payload_len = payload.len() as u64;
394 self.put_high_volume(id, metadata, payload).await?;
395 (BackendChoice::HighVolume, payload_len)
396 } else {
397 let (stored_size, stream) = counting_stream(peeked.into_stream());
398 self.put_long_term(id, metadata, stream.boxed()).await?;
399 (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
400 };
401
402 let backend_ty = self.backend_type(&backend_choice);
403 timer
404 .tag("backend_choice", backend_choice.as_str())
405 .tag("backend_type", backend_ty)
406 .record();
407 objectstore_metrics::record!(
408 "put.size" = stored_size,
409 usecase = id.usecase().to_owned(),
410 backend_choice = backend_choice.as_str(),
411 backend_type = backend_ty,
412 upload_type = "direct",
413 );
414
415 Ok(())
416 }
417
418 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
419 let timer = objectstore_metrics::timer!(
420 "get.latency.pre-response",
421 usecase = id.usecase().to_owned(),
422 );
423
424 let hv_result = self.inner.high_volume.get_tiered_object(id, range).await?;
425 let (result, backend_choice) = match hv_result {
426 TieredGet::NotFound => (None, BackendChoice::HighVolume),
427 TieredGet::Object(metadata, content_range, stream) => (
428 Some((metadata, content_range, stream)),
429 BackendChoice::HighVolume,
430 ),
431 TieredGet::Tombstone(tombstone) => (
432 self.inner
433 .long_term
434 .get_object(&tombstone.target, range)
435 .await?,
436 BackendChoice::LongTerm,
437 ),
438 };
439
440 let backend_type = self.backend_type(&backend_choice);
441 timer
442 .tag("backend_choice", backend_choice.as_str())
443 .tag("backend_type", backend_type)
444 .record();
445
446 if let Some((ref metadata, ref content_range, _)) = result {
447 let size = content_range.map(|cr| cr.len() as usize).or(metadata.size);
448 if let Some(size) = size {
449 objectstore_metrics::record!(
450 "get.size" = size,
451 usecase = id.usecase().to_owned(),
452 backend_choice = backend_choice.as_str(),
453 backend_type = backend_type,
454 );
455 }
456 }
457
458 Ok(result)
459 }
460
461 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
462 let timer = objectstore_metrics::timer!("head.latency", usecase = id.usecase().to_owned());
463
464 let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
465 let (result, backend_choice) = match hv_result {
466 TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
467 TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
468 TieredMetadata::Tombstone(tombstone) => (
469 self.inner.long_term.get_metadata(&tombstone.target).await?,
470 BackendChoice::LongTerm,
471 ),
472 };
473
474 timer
475 .tag("backend_choice", backend_choice.as_str())
476 .tag("backend_type", self.backend_type(&backend_choice))
477 .record();
478
479 Ok(result)
480 }
481
482 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
483 let timer =
484 objectstore_metrics::timer!("delete.latency", usecase = id.usecase().to_owned());
485
486 let mut backend_choice = BackendChoice::HighVolume;
487
488 if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
489 backend_choice = BackendChoice::LongTerm;
490
491 let mut guard = self
492 .record_change(Change {
493 id: id.clone(),
494 new: None,
495 old: Some(tombstone.target.clone()),
496 cleanup_after: None,
497 })
498 .await?;
499 guard.advance(ChangePhase::Written);
500
501 let deleted = self
503 .inner
504 .high_volume
505 .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
506 .await?;
507
508 guard.advance(ChangePhase::compare_and_write(deleted));
510 }
511
512 timer
513 .tag("backend_choice", backend_choice.as_str())
514 .tag("backend_type", self.backend_type(&backend_choice))
515 .record();
516
517 Ok(())
518 }
519
520 async fn join(&self) {
521 self.inner.tracker.close();
522 tokio::join!(
523 self.inner.high_volume.join(),
524 self.inner.long_term.join(),
525 self.inner.tracker.wait()
526 );
527 }
528}
529
/// Identifies which tier an operation was served by; used to tag metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackendChoice {
    /// The high-volume backend.
    HighVolume,
    /// The long-term backend.
    LongTerm,
}

impl BackendChoice {
    /// Returns the metric tag value for this choice (`"high-volume"` or `"long-term"`).
    fn as_str(&self) -> &'static str {
        match self {
            BackendChoice::HighVolume => "high-volume",
            BackendChoice::LongTerm => "long-term",
        }
    }
}

impl std::fmt::Display for BackendChoice {
    /// Writes the same text as [`BackendChoice::as_str`].
    ///
    /// Uses `Formatter::pad` so that width, fill, alignment and precision flags are honored
    /// (e.g. `{:>12}`); plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
550
551fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
556where
557 S: Stream<Item = Result<Bytes, E>>,
558{
559 let counter = Arc::new(AtomicU64::new(0));
560
561 (
562 counter.clone(),
563 stream.inspect(move |res| {
564 if let Ok(chunk) = res {
565 counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
566 }
567 }),
568 )
569}
570
571#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
573struct TieredUploadId {
574 revision: String,
575 upload_id: UploadId,
576}
577
578impl TryInto<UploadId> for TieredUploadId {
579 type Error = Error;
580
581 fn try_into(self) -> Result<UploadId, Self::Error> {
582 let json =
583 serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?;
584 Ok(UploadId::new(
585 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json),
586 )?)
587 }
588}
589
590impl TryFrom<&UploadId> for TieredUploadId {
591 type Error = Error;
592
593 fn try_from(value: &UploadId) -> Result<Self, Self::Error> {
594 let json = base64::engine::general_purpose::URL_SAFE_NO_PAD
595 .decode(value.as_bytes())
596 .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?;
597 serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e))
598 }
599}
600
601#[async_trait::async_trait]
602impl MultipartUploadBackend for TieredStorage {
603 async fn initiate_multipart(
604 &self,
605 id: &ObjectId,
606 metadata: &Metadata,
607 ) -> Result<InitiateMultipartResponse> {
608 let timer = objectstore_metrics::timer!(
609 "multipart.initiate.latency",
610 usecase = id.usecase().to_owned(),
611 );
612 let physical = new_long_term_revision(id);
613
614 let upload_id = self
615 .inner
616 .long_term
617 .initiate_multipart(&physical, metadata)
618 .await?;
619
620 let id = TieredUploadId {
621 revision: physical.key,
622 upload_id,
623 };
624 let id = id.try_into()?;
625
626 timer.record();
627 Ok(id)
628 }
629
630 async fn upload_part(
631 &self,
632 id: &ObjectId,
633 upload_id: &UploadId,
634 part_number: PartNumber,
635 content_length: u64,
636 content_md5: Option<&str>,
637 body: ClientStream,
638 ) -> Result<UploadPartResponse> {
639 let timer = objectstore_metrics::timer!(
640 "multipart.upload_part.latency",
641 usecase = id.usecase().to_owned(),
642 );
643 let tiered: TieredUploadId = upload_id.try_into()?;
644
645 let physical = ObjectId {
646 context: id.context.clone(),
647 key: tiered.revision,
648 };
649
650 let etag = self
651 .inner
652 .long_term
653 .upload_part(
654 &physical,
655 &tiered.upload_id,
656 part_number,
657 content_length,
658 content_md5,
659 body,
660 )
661 .await?;
662
663 timer.record();
664 objectstore_metrics::record!(
665 "multipart.upload_part.size" = content_length,
666 usecase = id.usecase().to_owned(),
667 );
668
669 Ok(etag)
670 }
671
672 async fn list_parts(
673 &self,
674 id: &ObjectId,
675 upload_id: &UploadId,
676 max_parts: Option<u32>,
677 part_number_marker: Option<PartNumber>,
678 ) -> Result<ListPartsResponse> {
679 let timer = objectstore_metrics::timer!(
680 "multipart.list_parts.latency",
681 usecase = id.usecase().to_owned(),
682 );
683 let tiered: TieredUploadId = upload_id.try_into()?;
684
685 let physical = ObjectId {
686 context: id.context.clone(),
687 key: tiered.revision,
688 };
689
690 let response = self
691 .inner
692 .long_term
693 .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker)
694 .await?;
695
696 timer.record();
697 Ok(response)
698 }
699
700 async fn abort_multipart(
701 &self,
702 id: &ObjectId,
703 upload_id: &UploadId,
704 ) -> Result<AbortMultipartResponse> {
705 let timer = objectstore_metrics::timer!(
706 "multipart.abort.latency",
707 usecase = id.usecase().to_owned(),
708 );
709 let tiered: TieredUploadId = upload_id.try_into()?;
710
711 let physical = ObjectId {
712 context: id.context.clone(),
713 key: tiered.revision,
714 };
715
716 let () = self
717 .inner
718 .long_term
719 .abort_multipart(&physical, &tiered.upload_id)
720 .await?;
721
722 timer.record();
723 Ok(())
724 }
725
726 async fn complete_multipart(
727 &self,
728 id: &ObjectId,
729 upload_id: &UploadId,
730 parts: Vec<CompletedPart>,
731 ) -> Result<CompleteMultipartResponse> {
732 let timer = objectstore_metrics::timer!(
733 "multipart.complete.latency",
734 usecase = id.usecase().to_owned(),
735 );
736 let part_count = parts.len();
737 let tiered: TieredUploadId = upload_id.try_into()?;
738
739 let physical = ObjectId {
740 context: id.context.clone(),
741 key: tiered.revision,
742 };
743
744 let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
746 TieredMetadata::Tombstone(t) if t.target == physical => {
748 timer.record();
749 return Ok(None);
750 }
751 TieredMetadata::Tombstone(t) => Some(t.target),
752 _ => None,
753 };
754
755 let mut guard = self
758 .record_assembling(Change {
759 id: id.clone(),
760 new: Some(physical.clone()),
761 old: current.clone(),
762 cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY),
763 })
764 .await?;
765
766 let maybe_complete_multipart_err = match self
768 .inner
769 .long_term
770 .complete_multipart(&physical, &tiered.upload_id, parts)
771 .await
772 {
773 Ok(error) => {
776 if error.is_some() {
777 return Ok(error);
778 }
779 None
780 }
781 Err(err) => Some(err),
787 };
788
789 let metadata = self.inner.long_term.get_metadata(&physical).await;
796
797 let metadata = match (metadata, maybe_complete_multipart_err) {
798 (Ok(Some(metadata)), _) => metadata,
800 (Ok(None), Some(err)) => return Err(err),
802 (Ok(None), None) => {
805 objectstore_log::error!(
806 id = ?id,
807 upload_id = ?upload_id,
808 physical = ?physical,
809 "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object"
810 );
811 return Err(Error::generic(
812 "completed multipart object not found in long-term storage",
813 ));
814 }
815 (Err(get_metadata_err), maybe_complete_multipart_err) => {
817 return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err));
821 }
822 };
823
824 let tombstone = Tombstone {
826 target: physical.clone(),
827 expiration_policy: metadata.expiration_policy,
828 };
829 let written = self
830 .inner
831 .high_volume
832 .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
833 .await?;
834
835 guard.advance(ChangePhase::compare_and_write(written));
837
838 timer.record();
839 objectstore_metrics::record!(
840 "multipart.complete.part_count" = part_count as u64,
841 usecase = id.usecase().to_owned(),
842 );
843 if let Some(size) = metadata.size {
844 objectstore_metrics::record!(
845 "put.size" = size as u64,
846 usecase = id.usecase().to_owned(),
847 backend_choice = BackendChoice::LongTerm.as_str(),
848 backend_type = self.backend_type(&BackendChoice::LongTerm),
849 upload_type = "multipart",
850 );
851 }
852
853 Ok(None)
854 }
855}
856
857#[cfg(test)]
858mod tests {
859 use std::num::NonZeroU32;
860
861 use futures::lock::Mutex;
862 use objectstore_types::metadata::{ExpirationPolicy, Metadata};
863 use objectstore_types::scope::{Scope, Scopes};
864
865 use super::*;
866 use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
867 use crate::backend::in_memory::InMemoryBackend;
868 use crate::backend::testing::{Hooks, TestBackend};
869 use crate::error::Error;
870 use crate::id::ObjectContext;
871
872 use crate::stream::{self, ClientStream};
873
874 fn make_context() -> ObjectContext {
875 ObjectContext {
876 usecase: "testing".into(),
877 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
878 }
879 }
880
881 fn make_id(key: &str) -> ObjectId {
882 ObjectId::new(make_context(), key.into())
883 }
884
885 fn make_tiered_storage() -> (
886 TieredStorage,
887 InMemoryBackend,
888 InMemoryBackend,
889 InMemoryChangeLog,
890 ) {
891 let hv = InMemoryBackend::new("in-memory-hv");
892 let lt = InMemoryBackend::new("in-memory-lt");
893 let changelog = InMemoryChangeLog::default();
894 let storage = TieredStorage::new(
895 Box::new(hv.clone()),
896 Box::new(lt.clone()),
897 Box::new(changelog.clone()),
898 );
899 (storage, hv, lt, changelog)
900 }
901
902 #[test]
905 fn revision_id_preserves_context() {
906 let id = make_id("my-key");
907 let revised = new_long_term_revision(&id);
908 assert_eq!(revised.context, id.context);
909 assert!(
910 revised.key.starts_with("my-key/"),
911 "revised key should have /<uuid> suffix, got: {}",
912 revised.key
913 );
914 }
915
916 #[test]
917 fn revision_id_roundtrips_storage_path() {
918 let id = make_id("original");
919 let revised = new_long_term_revision(&id);
920 let path = revised.as_storage_path().to_string();
921 let parsed = ObjectId::from_storage_path(&path)
922 .unwrap_or_else(|| panic!("failed to parse '{path}'"));
923 assert_eq!(parsed, revised);
924 }
925
926 #[test]
927 fn revision_id_is_unique() {
928 let id = make_id("base-key");
929 let a = new_long_term_revision(&id);
930 let b = new_long_term_revision(&id);
931 assert_ne!(a.key, b.key, "two calls should produce different keys");
932 }
933
934 #[tokio::test]
937 async fn get_nonexistent_returns_none() {
938 let (storage, _hv, _lt, _) = make_tiered_storage();
939 let id = make_id("does-not-exist");
940
941 assert!(storage.get_object(&id, None).await.unwrap().is_none());
942 assert!(storage.get_metadata(&id).await.unwrap().is_none());
943 }
944
945 #[tokio::test]
946 async fn delete_nonexistent_succeeds() {
947 let (storage, _hv, _lt, _) = make_tiered_storage();
948 let id = make_id("does-not-exist");
949
950 storage.delete_object(&id).await.unwrap();
951 }
952
953 #[tokio::test]
956 async fn put_small_object_stores_inline() {
957 let (storage, hv, lt, _) = make_tiered_storage();
958 let id = make_id("small");
959 let payload = b"small payload".to_vec();
960
961 storage
962 .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
963 .await
964 .unwrap();
965
966 assert!(hv.contains(&id), "expected in high-volume");
967 assert!(!lt.contains(&id), "leaked to long-term");
968
969 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
970 let body = stream::read_to_vec(s).await.unwrap();
971 assert_eq!(body, payload);
972
973 assert!(
974 storage.get_metadata(&id).await.unwrap().is_some(),
975 "get_metadata should return metadata for inline objects"
976 );
977 }
978
979 #[tokio::test]
980 async fn put_large_object_creates_tombstone() {
981 let (storage, hv, lt, _) = make_tiered_storage();
982 let id = make_id("large");
983 let payload = vec![0xCDu8; 2 * 1024 * 1024]; let metadata_in = Metadata {
985 content_type: "image/png".into(),
986 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
987 origin: Some("10.0.0.1".into()),
988 ..Metadata::default()
989 };
990
991 storage
992 .put_object(&id, &metadata_in, stream::single(payload.clone()))
993 .await
994 .unwrap();
995
996 let tombstone = hv.get(&id).expect_tombstone();
998 assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
999 let lt_id = tombstone.target;
1000 assert!(
1001 lt_id.key().starts_with(id.key()),
1002 "tombstone target key should be a revision of the HV key, got: {}",
1003 lt_id.key()
1004 );
1005
1006 let (lt_meta, _) = lt.get(<_id).expect_object();
1008 assert_eq!(lt_meta.content_type, "image/png");
1009 assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);
1010
1011 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1013 let body = stream::read_to_vec(s).await.unwrap();
1014 assert_eq!(body, payload);
1015
1016 let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
1018 assert_eq!(metadata.content_type, "image/png");
1019 }
1020
1021 #[tokio::test]
1024 async fn reinsert_small_over_large_swaps_to_inline() {
1025 let (storage, hv, lt, _) = make_tiered_storage();
1026 let id = make_id("reinsert-key");
1027
1028 let large_payload = vec![0xABu8; 2 * 1024 * 1024];
1030 storage
1031 .put_object(&id, &Metadata::default(), stream::single(large_payload))
1032 .await
1033 .unwrap();
1034
1035 let lt_id = hv.get(&id).expect_tombstone().target;
1036
1037 let small_payload = vec![0xCDu8; 100]; storage
1041 .put_object(&id, &Metadata::default(), stream::single(small_payload))
1042 .await
1043 .unwrap();
1044
1045 hv.get(&id).expect_object();
1047
1048 storage.join().await;
1050
1051 lt.get(<_id).expect_not_found();
1053 }
1054
1055 #[tokio::test]
1056 async fn overwrite_large_with_large_replaces_revision() {
1057 let (storage, hv, lt, _) = make_tiered_storage();
1058 let id = make_id("overwrite-large");
1059
1060 let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
1061 storage
1062 .put_object(&id, &Metadata::default(), stream::single(payload1))
1063 .await
1064 .unwrap();
1065 let lt_id_1 = hv.get(&id).expect_tombstone().target;
1066
1067 let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
1068 storage
1069 .put_object(&id, &Metadata::default(), stream::single(payload2.clone()))
1070 .await
1071 .unwrap();
1072 let lt_id_2 = hv.get(&id).expect_tombstone().target;
1073
1074 assert_ne!(
1075 lt_id_1, lt_id_2,
1076 "second write should create a new revision"
1077 );
1078
1079 storage.join().await;
1081
1082 lt.get(<_id_1).expect_not_found();
1083 lt.get(<_id_2).expect_object();
1084
1085 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1086 let body = stream::read_to_vec(s).await.unwrap();
1087 assert_eq!(body, payload2);
1088 }
1089
1090 #[tokio::test]
1093 async fn delete_small_object() {
1094 let (storage, hv, _lt, _) = make_tiered_storage();
1095 let id = make_id("delete-small");
1096
1097 storage
1098 .put_object(&id, &Metadata::default(), stream::single("tiny"))
1099 .await
1100 .unwrap();
1101
1102 storage.delete_object(&id).await.unwrap();
1103
1104 hv.get(&id).expect_not_found();
1105 assert!(storage.get_object(&id, None).await.unwrap().is_none());
1106 }
1107
1108 #[tokio::test]
1109 async fn delete_large_object_cleans_up_both_backends() {
1110 let (storage, hv, lt, _) = make_tiered_storage();
1111 let id = make_id("delete-both");
1112 let payload = vec![0u8; 2 * 1024 * 1024]; storage
1115 .put_object(&id, &Metadata::default(), stream::single(payload))
1116 .await
1117 .unwrap();
1118
1119 let lt_id = hv.get(&id).expect_tombstone().target;
1121
1122 storage.delete_object(&id).await.unwrap();
1123
1124 storage.join().await;
1126
1127 assert!(!hv.contains(&id), "tombstone not cleaned up");
1128 assert!(!lt.contains(<_id), "long-term object not cleaned up");
1129 }
1130
1131 #[derive(Debug)]
1132 struct FailDelete;
1133
1134 #[async_trait::async_trait]
1135 impl Hooks for FailDelete {
1136 async fn delete_object(
1137 &self,
1138 _inner: &InMemoryBackend,
1139 _id: &ObjectId,
1140 ) -> Result<DeleteResponse> {
1141 Err(Error::Io(std::io::Error::new(
1142 std::io::ErrorKind::ConnectionRefused,
1143 "simulated long-term delete failure",
1144 )))
1145 }
1146 }
1147
1148 #[tokio::test]
1152 async fn delete_succeeds_when_gcs_cleanup_fails() {
1153 let hv = InMemoryBackend::new("hv");
1154 let lt = TestBackend::new(FailDelete);
1155 let log = NoopChangeLog;
1156 let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));
1157
1158 let id = make_id("fail-delete");
1159 let payload = vec![0xABu8; 2 * 1024 * 1024]; storage
1161 .put_object(&id, &Metadata::default(), stream::single(payload))
1162 .await
1163 .unwrap();
1164
1165 let result = storage.delete_object(&id).await;
1167 assert!(
1168 result.is_ok(),
1169 "delete should succeed despite GCS cleanup failure"
1170 );
1171
1172 hv.get(&id).expect_not_found();
1174
1175 assert!(
1177 storage.get_object(&id, None).await.unwrap().is_none(),
1178 "object should be unreachable after tombstone is deleted"
1179 );
1180 }
1181
1182 #[derive(Debug)]
1185 struct CasConflict;
1186
1187 #[async_trait::async_trait]
1188 impl Hooks for CasConflict {
1189 async fn compare_and_write(
1190 &self,
1191 _inner: &InMemoryBackend,
1192 _id: &ObjectId,
1193 _current: Option<&ObjectId>,
1194 _write: TieredWrite,
1195 ) -> Result<bool> {
1196 Ok(false) }
1198 }
1199
1200 #[tokio::test]
1204 async fn put_large_cas_conflict_cleans_up_new_blob() {
1205 let hv = TestBackend::new(CasConflict);
1206 let lt = InMemoryBackend::new("lt");
1207 let log = NoopChangeLog;
1208 let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
1209
1210 let id = make_id("cas-conflict-large");
1211 let payload = vec![0xABu8; 2 * 1024 * 1024]; storage
1214 .put_object(&id, &Metadata::default(), stream::single(payload))
1215 .await
1216 .unwrap();
1217
1218 storage.join().await;
1220
1221 assert!(
1222 lt.is_empty(),
1223 "LT blob should be cleaned up after CAS conflict"
1224 );
1225 }
1226
1227 #[tokio::test]
1231 async fn put_small_over_tombstone_cas_conflict_succeeds() {
1232 let inner = InMemoryBackend::new("hv");
1233 let id = make_id("cas-conflict-small");
1234
1235 let tombstone = Tombstone {
1238 target: make_id("lt-object"),
1239 expiration_policy: ExpirationPolicy::Manual,
1240 };
1241 inner
1242 .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
1243 .await
1244 .unwrap();
1245
1246 let lt = InMemoryBackend::new("lt");
1247 let hv = TestBackend::with_inner(inner, CasConflict);
1248 let log = NoopChangeLog;
1249 let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));
1250
1251 storage
1254 .put_object(&id, &Metadata::default(), stream::single("tiny"))
1255 .await
1256 .unwrap();
1257 }
1258
1259 #[derive(Debug)]
1263 struct FailCas(bool);
1264
1265 #[async_trait::async_trait]
1266 impl Hooks for FailCas {
1267 async fn compare_and_write(
1268 &self,
1269 inner: &InMemoryBackend,
1270 id: &ObjectId,
1271 current: Option<&ObjectId>,
1272 write: TieredWrite,
1273 ) -> Result<bool> {
1274 if self.0 {
1275 inner.compare_and_write(id, current, write).await?;
1277 }
1278 Err(Error::Io(std::io::Error::new(
1279 std::io::ErrorKind::TimedOut,
1280 "simulated compare_and_write failure",
1281 )))
1282 }
1283 }
1284
1285 #[tokio::test]
1289 async fn no_orphan_when_tombstone_write_fails() {
1290 let lt = InMemoryBackend::new("lt");
1291 let hv = TestBackend::new(FailCas(false));
1292 let log = NoopChangeLog;
1293 let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
1294
1295 let id = make_id("orphan-test");
1296 let payload = vec![0xABu8; 2 * 1024 * 1024]; let result = storage
1298 .put_object(&id, &Metadata::default(), stream::single(payload))
1299 .await;
1300
1301 assert!(result.is_err());
1302
1303 storage.join().await;
1305
1306 assert!(lt.is_empty(), "long-term object not cleaned up");
1307 }
1308
1309 #[tokio::test]
1313 async fn orphan_tombstone_returns_none() {
1314 let (storage, hv, lt, _) = make_tiered_storage();
1315 let id = make_id("orphan-tombstone");
1316 let payload = vec![0xCDu8; 2 * 1024 * 1024]; storage
1319 .put_object(&id, &Metadata::default(), stream::single(payload))
1320 .await
1321 .unwrap();
1322
1323 let lt_id = hv.get(&id).expect_tombstone().target;
1325
1326 lt.remove(<_id);
1328
1329 assert!(
1330 storage.get_object(&id, None).await.unwrap().is_none(),
1331 "orphan tombstone should resolve to None on get_object"
1332 );
1333 assert!(
1334 storage.get_metadata(&id).await.unwrap().is_none(),
1335 "orphan tombstone should resolve to None on get_metadata"
1336 );
1337 }
1338
1339 #[tokio::test]
1344 async fn tombstone_target_is_used_for_reads_and_deletes() {
1345 let hv = InMemoryBackend::new("hv");
1346 let lt = InMemoryBackend::new("lt");
1347 let log = NoopChangeLog;
1348 let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));
1349
1350 let hv_id = make_id("hv-key");
1351 let lt_id = make_id("lt-key");
1352 let payload = vec![0xABu8; 100];
1353
1354 lt.put_object(
1356 <_id,
1357 &Metadata::default(),
1358 stream::single(payload.clone()),
1359 )
1360 .await
1361 .unwrap();
1362 let tombstone = Tombstone {
1363 target: lt_id.clone(),
1364 expiration_policy: ExpirationPolicy::Manual,
1365 };
1366 hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1367 .await
1368 .unwrap();
1369
1370 let (_, _, s) = storage.get_object(&hv_id, None).await.unwrap().unwrap();
1372 let body = stream::read_to_vec(s).await.unwrap();
1373 assert_eq!(body, payload);
1374
1375 storage.delete_object(&hv_id).await.unwrap();
1377 storage.join().await;
1378 assert!(!hv.contains(&hv_id), "tombstone should be removed");
1379 assert!(!lt.contains(<_id), "lt object should be removed");
1380 }
1381
1382 #[tokio::test]
1385 async fn multi_chunk_large_object_chains_buffered_and_remaining() {
1386 let (storage, hv, lt, _) = make_tiered_storage();
1387 let id = make_id("multi-chunk");
1388
1389 let chunk_size = 512 * 1024; let chunk_count = 4; let stream: ClientStream = futures_util::stream::iter(
1394 (0..chunk_count).map(move |i| Ok(Bytes::from(vec![i as u8; chunk_size]))),
1395 )
1396 .boxed();
1397
1398 storage
1399 .put_object(&id, &Metadata::default(), stream)
1400 .await
1401 .unwrap();
1402
1403 let lt_id = hv.get(&id).expect_tombstone().target;
1405 let (_, lt_bytes) = lt.get(<_id).expect_object();
1406 assert_eq!(lt_bytes.len(), chunk_size * chunk_count);
1407
1408 for i in 0..chunk_count {
1410 let offset = i * chunk_size;
1411 assert!(
1412 lt_bytes[offset..offset + chunk_size]
1413 .iter()
1414 .all(|&b| b == i as u8),
1415 "data mismatch in chunk {i}"
1416 );
1417 }
1418 }
1419
1420 #[tokio::test]
1426 async fn written_cleanup_after_lost_cas_response() {
1427 let (storage, hv, lt, log) = make_tiered_storage();
1428 let id = make_id("obj");
1429
1430 let payload = vec![0xAAu8; 2 * 1024 * 1024];
1432 storage
1433 .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1434 .await
1435 .unwrap();
1436 let tombstone1 = hv.get(&id).expect_tombstone().target;
1437
1438 let broken_storage = TieredStorage::new(
1440 Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
1441 Box::new(lt.clone()),
1442 Box::new(log.clone()),
1443 );
1444 broken_storage
1445 .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1446 .await
1447 .unwrap_err(); let tombstone2 = hv.get(&id).expect_tombstone().target;
1449 assert_ne!(tombstone1, tombstone2);
1450
1451 broken_storage.join().await;
1453 lt.get(&tombstone1).expect_not_found();
1454 lt.get(&tombstone2).expect_object();
1455
1456 broken_storage.delete_object(&id).await.unwrap_err();
1458 hv.get(&id).expect_not_found();
1459 broken_storage.join().await;
1460 lt.get(&tombstone2).expect_not_found();
1461
1462 let id = make_id("obj2");
1464 storage
1465 .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1466 .await
1467 .unwrap();
1468 let tombstone3 = hv.get(&id).expect_tombstone().target;
1469
1470 broken_storage
1472 .put_object(&id, &Metadata::default(), stream::single(&b"small"[..]))
1473 .await
1474 .unwrap_err(); hv.get(&id).expect_object();
1476 broken_storage.join().await;
1477 lt.get(&tombstone3).expect_not_found();
1478 }
1479
/// Dropping a `ChangeGuard` after its Tokio runtime is gone must not panic.
///
/// This is a plain `#[test]`: the runtime only lives inside the inner block, so the
/// final `drop` runs with no runtime in scope.
#[test]
fn guard_dropped_outside_runtime_does_not_panic() {
    let change = Change {
        id: make_id("object-key"),
        new: Some(make_id("cleanup-target")),
        old: None,
        cleanup_after: None,
    };

    let manager = ChangeManager::new(
        Box::new(InMemoryBackend::new("hv")),
        Box::new(InMemoryBackend::new("lt")),
        Box::new(NoopChangeLog),
    );

    let guard = {
        // The runtime is dropped at the end of this block, before the guard is.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let recorded = runtime.block_on(manager.record(change));
        recorded.unwrap()
    };

    drop(guard);
}
1507
1508 #[tokio::test(start_paused = true)]
1513 async fn join_waits_for_cleanup_to_complete() {
1514 let (storage, _hv, _lt, _) = make_tiered_storage();
1515 let change = Change {
1516 id: make_id("object-key"),
1517 new: None,
1518 old: None,
1519 cleanup_after: None,
1520 };
1521 let mut guard = storage.record_change(change).await.unwrap();
1522
1523 tokio::spawn(async move {
1524 tokio::time::sleep(Duration::from_secs(10)).await;
1525 guard.advance(ChangePhase::Completed);
1526 drop(guard);
1527 });
1528
1529 let join_future = tokio::spawn(async move { storage.join().await });
1530
1531 tokio::time::sleep(Duration::from_secs(9)).await;
1532 assert!(!join_future.is_finished(), "finished before guard dropped");
1533
1534 tokio::time::sleep(Duration::from_secs(2)).await;
1535 assert!(join_future.is_finished(), "finish after guard drops");
1536 }
1537
1538 #[derive(Clone, Debug)]
1545 struct PauseAfterPut {
1546 paused: Arc<tokio::sync::Notify>,
1547 resume: Arc<tokio::sync::Notify>,
1548 }
1549
1550 #[async_trait::async_trait]
1551 impl Hooks for PauseAfterPut {
1552 async fn put_object(
1553 &self,
1554 inner: &InMemoryBackend,
1555 id: &ObjectId,
1556 metadata: &Metadata,
1557 stream: ClientStream,
1558 ) -> Result<PutResponse> {
1559 inner.put_object(id, metadata, stream).await?;
1560 self.paused.notify_one();
1561 self.resume.notified().await;
1562 Ok(())
1563 }
1564 }
1565
1566 #[tokio::test]
1569 async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
1570 let paused = Arc::new(tokio::sync::Notify::new());
1571 let hooks = PauseAfterPut {
1572 paused: Arc::clone(&paused),
1573 resume: Arc::new(tokio::sync::Notify::new()),
1574 };
1575
1576 let lt_inner = InMemoryBackend::new("lt");
1577 let log = InMemoryChangeLog::default();
1578 let storage = TieredStorage::new(
1579 Box::new(InMemoryBackend::new("hv")),
1580 Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
1581 Box::new(log.clone()),
1582 );
1583
1584 let id = make_id("drop-test");
1585 let metadata = Metadata::default();
1586 let payload = vec![0xABu8; 2 * 1024 * 1024]; tokio::select! {
1590 result = storage.put_object(&id, &metadata, stream::single(payload)) => {
1591 panic!("expected put to pause before completing, got: {result:?}");
1592 }
1593 _ = paused.notified() => {
1594 }
1596 }
1597
1598 storage.join().await;
1600
1601 assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");
1603
1604 let entries = log.scan().await.unwrap();
1606 assert!(
1607 entries.is_empty(),
1608 "changelog entry not removed after cleanup"
1609 );
1610 }
1611
/// A `TieredUploadId` must survive encoding into an `UploadId` and decoding back.
#[test]
fn multipart_upload_id_roundtrip() {
    let id = TieredUploadId {
        revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(),
        upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(),
    };

    // `id` is compared again below, so the encoding step works on a clone.
    let encoded: UploadId = id.clone().try_into().unwrap();
    // Decoding only needs a borrow; no clone of `encoded` is required.
    let decoded: TieredUploadId = (&encoded).try_into().unwrap();

    assert_eq!(decoded, id);
}
1624
1625 #[tokio::test]
1626 async fn multipart_single_part_roundtrip() {
1627 let (storage, hv, lt, _) = make_tiered_storage();
1628 let id = make_id("mp-single");
1629 let metadata = Metadata {
1630 content_type: "application/octet-stream".into(),
1631 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
1632 ..Metadata::default()
1633 };
1634 let payload = vec![0xABu8; 2 * 1024 * 1024]; let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap();
1637
1638 let etag = storage
1639 .upload_part(
1640 &id,
1641 &upload_id,
1642 NonZeroU32::new(1).unwrap(),
1643 payload.len() as u64,
1644 None,
1645 stream::single(payload.clone()),
1646 )
1647 .await
1648 .unwrap();
1649
1650 let error = storage
1651 .complete_multipart(
1652 &id,
1653 &upload_id,
1654 vec![CompletedPart {
1655 part_number: NonZeroU32::new(1).unwrap(),
1656 etag,
1657 }],
1658 )
1659 .await
1660 .unwrap();
1661 assert!(
1662 error.is_none(),
1663 "complete_multipart returned error: {error:?}"
1664 );
1665
1666 let (got_meta, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1668 let body = stream::read_to_vec(s).await.unwrap();
1669 assert_eq!(body, payload);
1670 assert_eq!(got_meta.content_type, "application/octet-stream");
1671
1672 let tombstone = hv.get(&id).expect_tombstone();
1674 assert!(
1675 tombstone.target.key().starts_with(id.key()),
1676 "tombstone target should be a revision key"
1677 );
1678 lt.get(&tombstone.target).expect_object();
1679 }
1680
1681 #[tokio::test]
1682 async fn multipart_upload() {
1683 let (storage, _hv, _lt, _) = make_tiered_storage();
1684 let id = make_id("multipart");
1685
1686 let upload_id = storage
1687 .initiate_multipart(&id, &Metadata::default())
1688 .await
1689 .unwrap();
1690
1691 let part1 = vec![0xAAu8; 512 * 1024];
1692 let part2 = vec![0xBBu8; 512 * 1024];
1693 let part3 = vec![0xCCu8; 512 * 1024];
1694
1695 let etag3 = storage
1696 .upload_part(
1697 &id,
1698 &upload_id,
1699 NonZeroU32::new(3).unwrap(),
1700 part3.len() as u64,
1701 None,
1702 stream::single(part3.clone()),
1703 )
1704 .await
1705 .unwrap();
1706 let etag2 = storage
1707 .upload_part(
1708 &id,
1709 &upload_id,
1710 NonZeroU32::new(2).unwrap(),
1711 part2.len() as u64,
1712 None,
1713 stream::single(part2.clone()),
1714 )
1715 .await
1716 .unwrap();
1717 let etag1 = storage
1718 .upload_part(
1719 &id,
1720 &upload_id,
1721 NonZeroU32::new(1).unwrap(),
1722 part1.len() as u64,
1723 None,
1724 stream::single(part1.clone()),
1725 )
1726 .await
1727 .unwrap();
1728
1729 let error = storage
1730 .complete_multipart(
1731 &id,
1732 &upload_id,
1733 vec![
1734 CompletedPart {
1735 part_number: NonZeroU32::new(1).unwrap(),
1736 etag: etag1,
1737 },
1738 CompletedPart {
1739 part_number: NonZeroU32::new(2).unwrap(),
1740 etag: etag2,
1741 },
1742 CompletedPart {
1743 part_number: NonZeroU32::new(3).unwrap(),
1744 etag: etag3,
1745 },
1746 ],
1747 )
1748 .await
1749 .unwrap();
1750 assert!(error.is_none());
1751
1752 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1753 let body = stream::read_to_vec(s).await.unwrap();
1754
1755 let mut expected = Vec::new();
1756 expected.extend_from_slice(&part1);
1757 expected.extend_from_slice(&part2);
1758 expected.extend_from_slice(&part3);
1759 assert_eq!(body, expected);
1760 }
1761
1762 #[tokio::test]
1763 async fn multipart_abort() {
1764 let (storage, hv, _lt, _) = make_tiered_storage();
1765 let id = make_id("mp-abort");
1766
1767 let upload_id = storage
1768 .initiate_multipart(&id, &Metadata::default())
1769 .await
1770 .unwrap();
1771
1772 let payload = vec![0xABu8; 100];
1774 storage
1775 .upload_part(
1776 &id,
1777 &upload_id,
1778 NonZeroU32::new(1).unwrap(),
1779 payload.len() as u64,
1780 None,
1781 stream::single(payload),
1782 )
1783 .await
1784 .unwrap();
1785
1786 storage.abort_multipart(&id, &upload_id).await.unwrap();
1787
1788 hv.get(&id).expect_not_found();
1790
1791 assert!(storage.get_object(&id, None).await.unwrap().is_none());
1793 }
1794
1795 #[tokio::test]
1796 async fn multipart_list_parts() {
1797 let (storage, _hv, _lt, _) = make_tiered_storage();
1798 let id = make_id("mp-list");
1799
1800 let upload_id = storage
1801 .initiate_multipart(&id, &Metadata::default())
1802 .await
1803 .unwrap();
1804
1805 let part1 = vec![0xAAu8; 100];
1806 let part2 = vec![0xBBu8; 200];
1807 storage
1808 .upload_part(
1809 &id,
1810 &upload_id,
1811 NonZeroU32::new(1).unwrap(),
1812 part1.len() as u64,
1813 None,
1814 stream::single(part1),
1815 )
1816 .await
1817 .unwrap();
1818 storage
1819 .upload_part(
1820 &id,
1821 &upload_id,
1822 NonZeroU32::new(2).unwrap(),
1823 part2.len() as u64,
1824 None,
1825 stream::single(part2),
1826 )
1827 .await
1828 .unwrap();
1829
1830 let resp = storage
1831 .list_parts(&id, &upload_id, None, None)
1832 .await
1833 .unwrap();
1834 assert_eq!(resp.parts.len(), 2);
1835 assert_eq!(resp.parts[0].part_number.get(), 1);
1836 assert_eq!(resp.parts[0].size, 100);
1837 assert_eq!(resp.parts[1].part_number.get(), 2);
1838 assert_eq!(resp.parts[1].size, 200);
1839 }
1840
1841 #[tokio::test]
1842 async fn multipart_overwrites_existing_tombstone() {
1843 let (storage, hv, lt, _) = make_tiered_storage();
1844 let id = make_id("mp-overwrite");
1845
1846 let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
1848 storage
1849 .put_object(&id, &Metadata::default(), stream::single(payload1))
1850 .await
1851 .unwrap();
1852 let old_lt_id = hv.get(&id).expect_tombstone().target;
1853
1854 let upload_id = storage
1856 .initiate_multipart(&id, &Metadata::default())
1857 .await
1858 .unwrap();
1859
1860 let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
1861 let etag = storage
1862 .upload_part(
1863 &id,
1864 &upload_id,
1865 NonZeroU32::new(1).unwrap(),
1866 payload2.len() as u64,
1867 None,
1868 stream::single(payload2.clone()),
1869 )
1870 .await
1871 .unwrap();
1872
1873 let lt_id = hv.get(&id).expect_tombstone().target;
1876 assert_eq!(old_lt_id, lt_id);
1877
1878 let error = storage
1879 .complete_multipart(
1880 &id,
1881 &upload_id,
1882 vec![CompletedPart {
1883 part_number: NonZeroU32::new(1).unwrap(),
1884 etag,
1885 }],
1886 )
1887 .await
1888 .unwrap();
1889 assert!(error.is_none());
1890
1891 let new_lt_id = hv.get(&id).expect_tombstone().target;
1893 assert_ne!(old_lt_id, new_lt_id);
1894
1895 storage.join().await;
1897
1898 lt.get(&old_lt_id).expect_not_found();
1900 lt.get(&new_lt_id).expect_object();
1901
1902 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1904 let body = stream::read_to_vec(s).await.unwrap();
1905 assert_eq!(body, payload2);
1906 }
1907
1908 #[derive(Debug)]
1914 struct CompleteMultipartButReturnError;
1915
1916 #[async_trait::async_trait]
1917 impl Hooks for CompleteMultipartButReturnError {
1918 async fn complete_multipart(
1919 &self,
1920 inner: &InMemoryBackend,
1921 id: &ObjectId,
1922 upload_id: &UploadId,
1923 parts: Vec<CompletedPart>,
1924 ) -> Result<CompleteMultipartResponse> {
1925 inner
1926 .complete_multipart(id, upload_id, parts)
1927 .await
1928 .unwrap();
1929 Err(Error::Io(std::io::Error::new(
1930 std::io::ErrorKind::TimedOut,
1931 "simulated network error on complete_multipart",
1932 )))
1933 }
1934
1935 async fn get_metadata(
1936 &self,
1937 _inner: &InMemoryBackend,
1938 _id: &ObjectId,
1939 ) -> Result<MetadataResponse> {
1940 Err(Error::Io(std::io::Error::new(
1941 std::io::ErrorKind::TimedOut,
1942 "simulated network error on get_metadata",
1943 )))
1944 }
1945 }
1946
1947 #[tokio::test]
1951 async fn cleans_up_orphan_after_failed_multipart_complete() {
1952 let hv = InMemoryBackend::new("hv");
1953 let lt_inner = InMemoryBackend::new("lt");
1954 let log = InMemoryChangeLog::default();
1955 let storage = TieredStorage::new(
1956 Box::new(hv.clone()),
1957 Box::new(TestBackend::with_inner(
1958 lt_inner.clone(),
1959 CompleteMultipartButReturnError {},
1960 )),
1961 Box::new(log.clone()),
1962 );
1963
1964 let id = make_id("mp-orphan");
1965 let upload_id = storage
1966 .initiate_multipart(&id, &Metadata::default())
1967 .await
1968 .unwrap();
1969
1970 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
1971 let physical = ObjectId {
1972 context: id.context.clone(),
1973 key: tiered_id.revision,
1974 };
1975
1976 let payload = vec![0xABu8; 2 * 1024 * 1024];
1977 let etag = storage
1978 .upload_part(
1979 &id,
1980 &upload_id,
1981 NonZeroU32::new(1).unwrap(),
1982 payload.len() as u64,
1983 None,
1984 stream::single(payload),
1985 )
1986 .await
1987 .unwrap();
1988
1989 let result = storage
1990 .complete_multipart(
1991 &id,
1992 &upload_id,
1993 vec![CompletedPart {
1994 part_number: NonZeroU32::new(1).unwrap(),
1995 etag,
1996 }],
1997 )
1998 .await;
1999 assert!(result.is_err());
2000 storage.join().await;
2001
2002 lt_inner.get(&physical).expect_object();
2005 hv.get(&id).expect_not_found();
2006
2007 log.expire_all();
2009 let manager = ChangeManager::new(
2010 Box::new(hv.clone()),
2011 Box::new(lt_inner.clone()),
2012 Box::new(log.clone()),
2013 );
2014 manager.recover().await.unwrap();
2015
2016 lt_inner.get(&physical).expect_not_found();
2018 let remaining = log.scan().await.unwrap();
2020 assert!(remaining.is_empty());
2021 }
2022
2023 #[derive(Debug)]
2024 struct FailOnFirstCompleteMultipartAttempt {
2025 attempt: Mutex<u32>,
2026 }
2027
2028 impl FailOnFirstCompleteMultipartAttempt {
2029 fn new() -> Self {
2030 Self {
2031 attempt: Mutex::new(0),
2032 }
2033 }
2034 }
2035
2036 #[async_trait::async_trait]
2037 impl Hooks for FailOnFirstCompleteMultipartAttempt {
2038 async fn complete_multipart(
2039 &self,
2040 inner: &InMemoryBackend,
2041 id: &ObjectId,
2042 upload_id: &UploadId,
2043 parts: Vec<CompletedPart>,
2044 ) -> Result<CompleteMultipartResponse> {
2045 let mut attempt = self.attempt.lock().await;
2046 *attempt += 1;
2047 if *attempt == 1 {
2048 Err(Error::Io(std::io::Error::new(
2049 std::io::ErrorKind::TimedOut,
2050 "simulated network error",
2051 )))
2052 } else {
2053 Ok(inner
2054 .complete_multipart(id, upload_id, parts)
2055 .await
2056 .unwrap())
2057 }
2058 }
2059 }
2060
2061 #[tokio::test]
2066 async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() {
2067 let hv = InMemoryBackend::new("hv");
2068 let lt_inner = InMemoryBackend::new("lt");
2069 let log = InMemoryChangeLog::default();
2070 let storage = TieredStorage::new(
2071 Box::new(hv.clone()),
2072 Box::new(TestBackend::with_inner(
2073 lt_inner.clone(),
2074 FailOnFirstCompleteMultipartAttempt::new(),
2075 )),
2076 Box::new(log.clone()),
2077 );
2078
2079 let id = make_id("mp-retry");
2080 let upload_id = storage
2081 .initiate_multipart(&id, &Metadata::default())
2082 .await
2083 .unwrap();
2084
2085 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2086 let physical = ObjectId {
2087 context: id.context.clone(),
2088 key: tiered_id.revision,
2089 };
2090
2091 let payload = vec![0xABu8; 2 * 1024 * 1024];
2092 let etag = storage
2093 .upload_part(
2094 &id,
2095 &upload_id,
2096 NonZeroU32::new(1).unwrap(),
2097 payload.len() as u64,
2098 None,
2099 stream::single(payload.clone()),
2100 )
2101 .await
2102 .unwrap();
2103
2104 let result = storage
2106 .complete_multipart(
2107 &id,
2108 &upload_id,
2109 vec![CompletedPart {
2110 part_number: NonZeroU32::new(1).unwrap(),
2111 etag: etag.clone(),
2112 }],
2113 )
2114 .await;
2115 assert!(result.is_err());
2116 storage.join().await;
2117
2118 let result = storage
2120 .complete_multipart(
2121 &id,
2122 &upload_id,
2123 vec![CompletedPart {
2124 part_number: NonZeroU32::new(1).unwrap(),
2125 etag,
2126 }],
2127 )
2128 .await;
2129 assert!(result.is_ok());
2130 storage.join().await;
2131
2132 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2134 let body = stream::read_to_vec(s).await.unwrap();
2135 assert_eq!(body, payload);
2136
2137 log.expire_all();
2139 let manager = ChangeManager::new(
2140 Box::new(hv.clone()),
2141 Box::new(lt_inner.clone()),
2142 Box::new(log.clone()),
2143 );
2144 manager.recover().await.unwrap();
2145
2146 lt_inner.get(&physical).expect_object();
2148 let tombstone = hv.get(&id).expect_tombstone();
2150 assert_eq!(tombstone.target, physical);
2151 let remaining = log.scan().await.unwrap();
2153 assert!(remaining.is_empty());
2154
2155 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2157 let body = stream::read_to_vec(s).await.unwrap();
2158 assert_eq!(body, payload);
2159 }
2160
2161 #[derive(Debug)]
2162 struct FailOnFirstGetMetadataAttempt {
2163 attempt: Mutex<u32>,
2164 }
2165
2166 impl FailOnFirstGetMetadataAttempt {
2167 fn new() -> Self {
2168 Self {
2169 attempt: Mutex::new(0),
2170 }
2171 }
2172 }
2173
2174 #[async_trait::async_trait]
2175 impl Hooks for FailOnFirstGetMetadataAttempt {
2176 async fn get_metadata(
2177 &self,
2178 inner: &InMemoryBackend,
2179 id: &ObjectId,
2180 ) -> Result<MetadataResponse> {
2181 let mut attempt = self.attempt.lock().await;
2182 *attempt += 1;
2183 if *attempt == 1 {
2184 Err(Error::Io(std::io::Error::new(
2185 std::io::ErrorKind::TimedOut,
2186 "simulated network error",
2187 )))
2188 } else {
2189 inner.get_metadata(id).await
2190 }
2191 }
2192 }
2193
2194 #[tokio::test]
2201 async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent()
2202 {
2203 let hv = InMemoryBackend::new("hv");
2204 let lt_inner = InMemoryBackend::new("lt");
2205 let log = InMemoryChangeLog::default();
2206 let storage = TieredStorage::new(
2207 Box::new(hv.clone()),
2208 Box::new(TestBackend::with_inner(
2209 lt_inner.clone(),
2210 FailOnFirstGetMetadataAttempt::new(),
2211 )),
2212 Box::new(log.clone()),
2213 );
2214
2215 let id = make_id("mp-retry-meta");
2216 let upload_id = storage
2217 .initiate_multipart(&id, &Metadata::default())
2218 .await
2219 .unwrap();
2220
2221 let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2222 let physical = ObjectId {
2223 context: id.context.clone(),
2224 key: tiered_id.revision,
2225 };
2226
2227 let payload = vec![0xABu8; 2 * 1024 * 1024];
2228 let etag = storage
2229 .upload_part(
2230 &id,
2231 &upload_id,
2232 NonZeroU32::new(1).unwrap(),
2233 payload.len() as u64,
2234 None,
2235 stream::single(payload.clone()),
2236 )
2237 .await
2238 .unwrap();
2239
2240 let result = storage
2243 .complete_multipart(
2244 &id,
2245 &upload_id,
2246 vec![CompletedPart {
2247 part_number: NonZeroU32::new(1).unwrap(),
2248 etag: etag.clone(),
2249 }],
2250 )
2251 .await;
2252 assert!(result.is_err());
2253 storage.join().await;
2254
2255 let result = storage
2257 .complete_multipart(
2258 &id,
2259 &upload_id,
2260 vec![CompletedPart {
2261 part_number: NonZeroU32::new(1).unwrap(),
2262 etag,
2263 }],
2264 )
2265 .await;
2266 assert!(result.is_ok());
2267 storage.join().await;
2268
2269 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2271 let body = stream::read_to_vec(s).await.unwrap();
2272 assert_eq!(body, payload);
2273
2274 log.expire_all();
2276 let manager = ChangeManager::new(
2277 Box::new(hv.clone()),
2278 Box::new(lt_inner.clone()),
2279 Box::new(log.clone()),
2280 );
2281 manager.recover().await.unwrap();
2282
2283 lt_inner.get(&physical).expect_object();
2285 let tombstone = hv.get(&id).expect_tombstone();
2287 assert_eq!(tombstone.target, physical);
2288 let remaining = log.scan().await.unwrap();
2290 assert!(remaining.is_empty());
2291
2292 let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2294 let body = stream::read_to_vec(s).await.unwrap();
2295 assert_eq!(body, payload);
2296 }
2297}