1use std::sync::Arc;
101use std::sync::atomic::{AtomicU64, Ordering};
102use std::time::Instant;
103
104use bytes::Bytes;
105use futures_util::{Stream, StreamExt};
106use objectstore_types::metadata::Metadata;
107use serde::{Deserialize, Serialize};
108
109use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
110use crate::backend::common::{
111 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
112 TieredGet, TieredMetadata, TieredWrite, Tombstone,
113};
114use crate::backend::{HighVolumeStorageConfig, StorageConfig};
115use crate::error::Result;
116use crate::id::ObjectId;
117use crate::stream::{ClientStream, SizedPeek};
118
119const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; fn new_long_term_revision(id: &ObjectId) -> ObjectId {
127 ObjectId {
128 context: id.context.clone(),
129 key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
130 }
131}
132
/// Configuration for tiered storage, pairing a high-volume backend for small
/// objects with a long-term backend for large ones.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TieredStorageConfig {
    /// Configuration of the high-volume (small object / tombstone) tier.
    pub high_volume: HighVolumeStorageConfig,
    /// Configuration of the long-term (large object) tier.
    pub long_term: Box<StorageConfig>,
}
163
/// A [`Backend`] that routes small payloads to a high-volume backend and large
/// payloads to a long-term backend, linking the two via tombstones and a
/// changelog-driven [`ChangeManager`] for crash-safe cleanup.
#[derive(Debug)]
pub struct TieredStorage {
    // Shared with spawned recovery/cleanup tasks, hence the Arc.
    inner: Arc<ChangeManager>,
}
217
impl TieredStorage {
    /// Creates a tiered storage facade over the given backends and changelog.
    ///
    /// Spawns a detached background task that replays the changelog to recover
    /// changes left incomplete by a previous process.
    pub fn new(
        high_volume: Box<dyn HighVolumeBackend>,
        long_term: Box<dyn Backend>,
        changelog: Box<dyn ChangeLog>,
    ) -> Self {
        let inner = ChangeManager::new(high_volume, long_term, changelog);
        // Recovery runs concurrently with normal operation.
        tokio::spawn(inner.clone().recover());
        Self { inner }
    }

    /// Records `change` in the changelog and returns a guard used to advance
    /// the change through its phases; dropping the guard early is expected to
    /// trigger cleanup of the partially-applied change (see tests below).
    async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
        self.inner.clone().record(change).await
    }

    /// Resolves the concrete backend name for a tier choice (metric tag).
    fn backend_type(&self, choice: &BackendChoice) -> &'static str {
        match choice {
            BackendChoice::HighVolume => self.inner.high_volume.name(),
            BackendChoice::LongTerm => self.inner.long_term.name(),
        }
    }

    /// Stores a small, fully-buffered payload inline in the high-volume tier.
    ///
    /// If the key currently holds a tombstone (i.e. a previous large write),
    /// the old long-term target is recorded in the changelog and the inline
    /// object is installed via compare-and-write against that tombstone, so
    /// the orphaned long-term revision can be cleaned up afterwards.
    async fn put_high_volume(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        payload: Bytes,
    ) -> Result<()> {
        // Writes the object unless a tombstone occupies the key; in that case
        // the existing tombstone is returned instead.
        let tombstone_opt = self
            .inner
            .high_volume
            .put_non_tombstone(id, metadata, payload.clone())
            .await?;

        // Fast path: no tombstone existed, the inline write already happened.
        let Some(Tombstone { target, .. }) = tombstone_opt else {
            return Ok(());
        };

        // old = the long-term revision the tombstone pointed at; it becomes
        // garbage once the inline object replaces the tombstone.
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: None,
                old: Some(target.clone()),
            })
            .await?;

        let write = TieredWrite::Object(metadata.clone(), payload);
        guard.advance(ChangePhase::Written);

        // CAS: only replace the tombstone if it still points at `target`;
        // `written` reports whether the swap won.
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, Some(&target), write)
            .await?;

        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }

    /// Streams a large payload to the long-term tier under a fresh revision
    /// id, then installs a tombstone pointing at it in the high-volume tier.
    ///
    /// Both the new revision and any previously-referenced revision are
    /// recorded in the changelog so that whichever one loses becomes eligible
    /// for cleanup.
    async fn put_long_term(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<()> {
        // The revision currently referenced by a tombstone, if any; used as
        // the CAS expectation below.
        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
            TieredMetadata::Tombstone(t) => Some(t.target),
            _ => None,
        };

        let new = new_long_term_revision(id);
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: Some(new.clone()),
                old: current.clone(),
            })
            .await?;

        self.inner
            .long_term
            .put_object(&new, metadata, stream)
            .await?;
        guard.advance(ChangePhase::Written);

        // The tombstone carries the expiration policy so the HV tier can
        // expire it consistently with the long-term object.
        let tombstone = Tombstone {
            target: new.clone(),
            expiration_policy: metadata.expiration_policy,
        };
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
            .await?;

        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }
}
339
#[async_trait::async_trait]
impl Backend for TieredStorage {
    fn name(&self) -> &'static str {
        "tiered"
    }

    /// Routes a put to the appropriate tier based on payload size: payloads
    /// that fit within [`BACKEND_SIZE_THRESHOLD`] go inline to the high-volume
    /// tier, larger ones are streamed to the long-term tier.
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        let start = Instant::now();
        if metadata.origin.is_none() {
            objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
        }

        // Buffer up to the threshold; `is_exhausted` tells us whether the
        // whole payload fit in the buffer.
        let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
        objectstore_metrics::record!(
            "put.first_chunk.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            complete = if peeked.is_exhausted() { "yes" } else { "no" },
        );

        let (backend_choice, stored_size) = if peeked.is_exhausted() {
            let payload = peeked.into_bytes().await?;
            // Capture the length before the payload is moved into the write.
            let payload_len = payload.len() as u64;
            self.put_high_volume(id, metadata, payload).await?;
            (BackendChoice::HighVolume, payload_len)
        } else {
            // Count bytes as they flow through; the stream has been fully
            // drained once put_long_term returns, so the load below should
            // observe the final total.
            let (stored_size, stream) = counting_stream(peeked.into_stream());
            self.put_long_term(id, metadata, stream.boxed()).await?;
            (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
        };

        let backend_ty = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "put.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );
        objectstore_metrics::record!(
            "put.size" = stored_size,
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );

        Ok(())
    }

    /// Reads from the high-volume tier first; a tombstone redirects the read
    /// to the referenced long-term revision.
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_object(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredGet::NotFound => (None, BackendChoice::HighVolume),
            TieredGet::Object(metadata, stream) => {
                (Some((metadata, stream)), BackendChoice::HighVolume)
            }
            // An orphaned tombstone (target missing in LT) resolves to None.
            TieredGet::Tombstone(tombstone) => (
                self.inner.long_term.get_object(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        let backend_type = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "get.latency.pre-response" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_type,
        );

        if let Some((ref metadata, _)) = result {
            if let Some(size) = metadata.size {
                objectstore_metrics::record!(
                    "get.size" = size,
                    usecase = id.usecase().to_owned(),
                    backend_choice = backend_choice.as_str(),
                    backend_type = backend_type,
                );
            } else {
                objectstore_log::warn!(backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    /// Metadata lookup with the same tombstone-following logic as
    /// [`Self::get_object`].
    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
            TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
            TieredMetadata::Tombstone(tombstone) => (
                self.inner.long_term.get_metadata(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        objectstore_metrics::record!(
            "head.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(result)
    }

    /// Deletes the object from the high-volume tier; when a tombstone is
    /// found instead, the referenced long-term revision is recorded for
    /// cleanup and the tombstone itself is removed via compare-and-write.
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        let start = Instant::now();

        let mut backend_choice = BackendChoice::HighVolume;

        if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
            backend_choice = BackendChoice::LongTerm;

            // old = the long-term revision that becomes garbage once the
            // tombstone is gone.
            let mut guard = self
                .record_change(Change {
                    id: id.clone(),
                    new: None,
                    old: Some(tombstone.target.clone()),
                })
                .await?;
            guard.advance(ChangePhase::Written);

            // CAS-delete: only remove the tombstone if it still points at the
            // same target we just observed.
            let deleted = self
                .inner
                .high_volume
                .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
                .await?;

            guard.advance(ChangePhase::compare_and_write(deleted));
        }

        objectstore_metrics::record!(
            "delete.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(())
    }

    /// Shuts down: stops accepting new tracked tasks, then waits for both
    /// tiers and all in-flight cleanup tasks to finish.
    async fn join(&self) {
        self.inner.tracker.close();
        tokio::join!(
            self.inner.high_volume.join(),
            self.inner.long_term.join(),
            self.inner.tracker.wait()
        );
    }
}
501
/// Which storage tier handled a request; used for metric tagging.
#[derive(Debug)]
enum BackendChoice {
    HighVolume,
    LongTerm,
}
507
508impl BackendChoice {
509 fn as_str(&self) -> &'static str {
510 match self {
511 BackendChoice::HighVolume => "high-volume",
512 BackendChoice::LongTerm => "long-term",
513 }
514 }
515}
516
517impl std::fmt::Display for BackendChoice {
518 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
519 f.write_str(self.as_str())
520 }
521}
522
523fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
528where
529 S: Stream<Item = Result<Bytes, E>>,
530{
531 let counter = Arc::new(AtomicU64::new(0));
532
533 (
534 counter.clone(),
535 stream.inspect(move |res| {
536 if let Ok(chunk) = res {
537 counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
538 }
539 }),
540 )
541}
542
543#[cfg(test)]
544mod tests {
545 use std::time::Duration;
546
547 use objectstore_types::metadata::{ExpirationPolicy, Metadata};
548 use objectstore_types::scope::{Scope, Scopes};
549
550 use super::*;
551 use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
552 use crate::backend::in_memory::InMemoryBackend;
553 use crate::backend::testing::{Hooks, TestBackend};
554 use crate::error::Error;
555 use crate::id::ObjectContext;
556 use crate::stream::{self, ClientStream};
557
    /// Builds a fixed test context with a single "testing" scope.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }
564
    /// Builds an [`ObjectId`] with the shared test context and the given key.
    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(make_context(), key.into())
    }
568
    /// Builds a [`TieredStorage`] over two in-memory backends and an in-memory
    /// changelog; the backends/changelog are returned too so tests can inspect
    /// them directly.
    fn make_tiered_storage() -> (
        TieredStorage,
        InMemoryBackend,
        InMemoryBackend,
        InMemoryChangeLog,
    ) {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        let changelog = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(lt.clone()),
            Box::new(changelog.clone()),
        );
        (storage, hv, lt, changelog)
    }
585
    /// A long-term revision keeps the context and prefixes the original key.
    #[test]
    fn revision_id_preserves_context() {
        let id = make_id("my-key");
        let revised = new_long_term_revision(&id);
        assert_eq!(revised.context, id.context);
        assert!(
            revised.key.starts_with("my-key/"),
            "revised key should have /<uuid> suffix, got: {}",
            revised.key
        );
    }
599
    /// A revision id must survive a storage-path round trip unchanged.
    #[test]
    fn revision_id_roundtrips_storage_path() {
        let id = make_id("original");
        let revised = new_long_term_revision(&id);
        let path = revised.as_storage_path().to_string();
        let parsed = ObjectId::from_storage_path(&path)
            .unwrap_or_else(|| panic!("failed to parse '{path}'"));
        assert_eq!(parsed, revised);
    }
609
    /// Consecutive revisions of the same id must never collide.
    #[test]
    fn revision_id_is_unique() {
        let id = make_id("base-key");
        let a = new_long_term_revision(&id);
        let b = new_long_term_revision(&id);
        assert_ne!(a.key, b.key, "two calls should produce different keys");
    }
617
    /// Reads of a missing key resolve to `None` on both read paths.
    #[tokio::test]
    async fn get_nonexistent_returns_none() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        assert!(storage.get_object(&id).await.unwrap().is_none());
        assert!(storage.get_metadata(&id).await.unwrap().is_none());
    }
628
    /// Deleting a missing key is a no-op, not an error.
    #[tokio::test]
    async fn delete_nonexistent_succeeds() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        storage.delete_object(&id).await.unwrap();
    }
636
    /// Payloads under the threshold go inline to HV and never touch LT.
    #[tokio::test]
    async fn put_small_object_stores_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("small");
        let payload = b"small payload".to_vec();

        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();

        assert!(hv.contains(&id), "expected in high-volume");
        assert!(!lt.contains(&id), "leaked to long-term");

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        assert!(
            storage.get_metadata(&id).await.unwrap().is_some(),
            "get_metadata should return metadata for inline objects"
        );
    }
662
663 #[tokio::test]
664 async fn put_large_object_creates_tombstone() {
665 let (storage, hv, lt, _) = make_tiered_storage();
666 let id = make_id("large");
667 let payload = vec![0xCDu8; 2 * 1024 * 1024]; let metadata_in = Metadata {
669 content_type: "image/png".into(),
670 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
671 origin: Some("10.0.0.1".into()),
672 ..Default::default()
673 };
674
675 storage
676 .put_object(&id, &metadata_in, stream::single(payload.clone()))
677 .await
678 .unwrap();
679
680 let tombstone = hv.get(&id).expect_tombstone();
682 assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
683 let lt_id = tombstone.target;
684 assert!(
685 lt_id.key().starts_with(id.key()),
686 "tombstone target key should be a revision of the HV key, got: {}",
687 lt_id.key()
688 );
689
690 let (lt_meta, _) = lt.get(<_id).expect_object();
692 assert_eq!(lt_meta.content_type, "image/png");
693 assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);
694
695 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
697 let body = stream::read_to_vec(s).await.unwrap();
698 assert_eq!(body, payload);
699
700 let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
702 assert_eq!(metadata.content_type, "image/png");
703 }
704
705 #[tokio::test]
708 async fn reinsert_small_over_large_swaps_to_inline() {
709 let (storage, hv, lt, _) = make_tiered_storage();
710 let id = make_id("reinsert-key");
711
712 let large_payload = vec![0xABu8; 2 * 1024 * 1024];
714 storage
715 .put_object(&id, &Default::default(), stream::single(large_payload))
716 .await
717 .unwrap();
718
719 let lt_id = hv.get(&id).expect_tombstone().target;
720
721 let small_payload = vec![0xCDu8; 100]; storage
725 .put_object(&id, &Default::default(), stream::single(small_payload))
726 .await
727 .unwrap();
728
729 hv.get(&id).expect_object();
731
732 storage.join().await;
734
735 lt.get(<_id).expect_not_found();
737 }
738
739 #[tokio::test]
740 async fn overwrite_large_with_large_replaces_revision() {
741 let (storage, hv, lt, _) = make_tiered_storage();
742 let id = make_id("overwrite-large");
743
744 let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
745 storage
746 .put_object(&id, &Default::default(), stream::single(payload1))
747 .await
748 .unwrap();
749 let lt_id_1 = hv.get(&id).expect_tombstone().target;
750
751 let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
752 storage
753 .put_object(&id, &Default::default(), stream::single(payload2.clone()))
754 .await
755 .unwrap();
756 let lt_id_2 = hv.get(&id).expect_tombstone().target;
757
758 assert_ne!(
759 lt_id_1, lt_id_2,
760 "second write should create a new revision"
761 );
762
763 storage.join().await;
765
766 lt.get(<_id_1).expect_not_found();
767 lt.get(<_id_2).expect_object();
768
769 let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
770 let body = stream::read_to_vec(s).await.unwrap();
771 assert_eq!(body, payload2);
772 }
773
    /// Deleting an inline object removes it from HV and from readers' view.
    #[tokio::test]
    async fn delete_small_object() {
        let (storage, hv, _lt, _) = make_tiered_storage();
        let id = make_id("delete-small");

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();

        storage.delete_object(&id).await.unwrap();

        hv.get(&id).expect_not_found();
        assert!(storage.get_object(&id).await.unwrap().is_none());
    }
791
792 #[tokio::test]
793 async fn delete_large_object_cleans_up_both_backends() {
794 let (storage, hv, lt, _) = make_tiered_storage();
795 let id = make_id("delete-both");
796 let payload = vec![0u8; 2 * 1024 * 1024]; storage
799 .put_object(&id, &Default::default(), stream::single(payload))
800 .await
801 .unwrap();
802
803 let lt_id = hv.get(&id).expect_tombstone().target;
805
806 storage.delete_object(&id).await.unwrap();
807
808 storage.join().await;
810
811 assert!(!hv.contains(&id), "tombstone not cleaned up");
812 assert!(!lt.contains(<_id), "long-term object not cleaned up");
813 }
814
    /// Test hook whose delete always fails, simulating an unreachable
    /// long-term backend during cleanup.
    #[derive(Debug)]
    struct FailDelete;

    #[async_trait::async_trait]
    impl Hooks for FailDelete {
        async fn delete_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<DeleteResponse> {
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                "simulated long-term delete failure",
            )))
        }
    }
831
832 #[tokio::test]
836 async fn delete_succeeds_when_gcs_cleanup_fails() {
837 let hv = InMemoryBackend::new("hv");
838 let lt = TestBackend::new(FailDelete);
839 let log = NoopChangeLog;
840 let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));
841
842 let id = make_id("fail-delete");
843 let payload = vec![0xABu8; 2 * 1024 * 1024]; storage
845 .put_object(&id, &Default::default(), stream::single(payload))
846 .await
847 .unwrap();
848
849 let result = storage.delete_object(&id).await;
851 assert!(
852 result.is_ok(),
853 "delete should succeed despite GCS cleanup failure"
854 );
855
856 hv.get(&id).expect_not_found();
858
859 assert!(
861 storage.get_object(&id).await.unwrap().is_none(),
862 "object should be unreachable after tombstone is deleted"
863 );
864 }
865
866 #[derive(Debug)]
869 struct CasConflict;
870
871 #[async_trait::async_trait]
872 impl Hooks for CasConflict {
873 async fn compare_and_write(
874 &self,
875 _inner: &InMemoryBackend,
876 _id: &ObjectId,
877 _current: Option<&ObjectId>,
878 _write: TieredWrite,
879 ) -> Result<bool> {
880 Ok(false) }
882 }
883
884 #[tokio::test]
888 async fn put_large_cas_conflict_cleans_up_new_blob() {
889 let hv = TestBackend::new(CasConflict);
890 let lt = InMemoryBackend::new("lt");
891 let log = NoopChangeLog;
892 let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
893
894 let id = make_id("cas-conflict-large");
895 let payload = vec![0xABu8; 2 * 1024 * 1024]; storage
898 .put_object(&id, &Default::default(), stream::single(payload))
899 .await
900 .unwrap();
901
902 storage.join().await;
904
905 assert!(
906 lt.is_empty(),
907 "LT blob should be cleaned up after CAS conflict"
908 );
909 }
910
    /// A small put over an existing tombstone still succeeds from the caller's
    /// point of view even when the tombstone-replacing CAS loses.
    #[tokio::test]
    async fn put_small_over_tombstone_cas_conflict_succeeds() {
        let inner = InMemoryBackend::new("hv");
        let id = make_id("cas-conflict-small");

        // Pre-seed the HV backend with a tombstone for the key.
        let tombstone = Tombstone {
            target: make_id("lt-object"),
            expiration_policy: ExpirationPolicy::Manual,
        };
        inner
            .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
            .await
            .unwrap();

        let lt = InMemoryBackend::new("lt");
        // All subsequent CAS calls report a conflict.
        let hv = TestBackend::with_inner(inner, CasConflict);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();
    }
942
    /// Test hook whose compare-and-write fails with an I/O error. When the
    /// flag is `true` the write is actually performed first, simulating a
    /// successful write whose response was lost.
    #[derive(Debug)]
    struct FailCas(bool);

    #[async_trait::async_trait]
    impl Hooks for FailCas {
        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            if self.0 {
                // Perform the write, then still report failure below.
                inner.compare_and_write(id, current, write).await?;
            }
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "simulated compare_and_write failure",
            )))
        }
    }
968
969 #[tokio::test]
973 async fn no_orphan_when_tombstone_write_fails() {
974 let lt = InMemoryBackend::new("lt");
975 let hv = TestBackend::new(FailCas(false));
976 let log = NoopChangeLog;
977 let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
978
979 let id = make_id("orphan-test");
980 let payload = vec![0xABu8; 2 * 1024 * 1024]; let result = storage
982 .put_object(&id, &Default::default(), stream::single(payload))
983 .await;
984
985 assert!(result.is_err());
986
987 storage.join().await;
989
990 assert!(lt.is_empty(), "long-term object not cleaned up");
991 }
992
993 #[tokio::test]
997 async fn orphan_tombstone_returns_none() {
998 let (storage, hv, lt, _) = make_tiered_storage();
999 let id = make_id("orphan-tombstone");
1000 let payload = vec![0xCDu8; 2 * 1024 * 1024]; storage
1003 .put_object(&id, &Default::default(), stream::single(payload))
1004 .await
1005 .unwrap();
1006
1007 let lt_id = hv.get(&id).expect_tombstone().target;
1009
1010 lt.remove(<_id);
1012
1013 assert!(
1014 storage.get_object(&id).await.unwrap().is_none(),
1015 "orphan tombstone should resolve to None on get_object"
1016 );
1017 assert!(
1018 storage.get_metadata(&id).await.unwrap().is_none(),
1019 "orphan tombstone should resolve to None on get_metadata"
1020 );
1021 }
1022
1023 #[tokio::test]
1028 async fn tombstone_target_is_used_for_reads_and_deletes() {
1029 let hv = InMemoryBackend::new("hv");
1030 let lt = InMemoryBackend::new("lt");
1031 let log = NoopChangeLog;
1032 let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));
1033
1034 let hv_id = make_id("hv-key");
1035 let lt_id = make_id("lt-key");
1036 let payload = vec![0xABu8; 100];
1037
1038 lt.put_object(<_id, &Default::default(), stream::single(payload.clone()))
1040 .await
1041 .unwrap();
1042 let tombstone = Tombstone {
1043 target: lt_id.clone(),
1044 expiration_policy: ExpirationPolicy::Manual,
1045 };
1046 hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1047 .await
1048 .unwrap();
1049
1050 let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap();
1052 let body = stream::read_to_vec(s).await.unwrap();
1053 assert_eq!(body, payload);
1054
1055 storage.delete_object(&hv_id).await.unwrap();
1057 storage.join().await;
1058 assert!(!hv.contains(&hv_id), "tombstone should be removed");
1059 assert!(!lt.contains(<_id), "lt object should be removed");
1060 }
1061
1062 #[tokio::test]
1065 async fn multi_chunk_large_object_chains_buffered_and_remaining() {
1066 let (storage, hv, lt, _) = make_tiered_storage();
1067 let id = make_id("multi-chunk");
1068
1069 let chunk_size = 512 * 1024; let chunk_count = 4; let stream: ClientStream = futures_util::stream::iter(
1074 (0..chunk_count).map(move |i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))),
1075 )
1076 .boxed();
1077
1078 storage
1079 .put_object(&id, &Default::default(), stream)
1080 .await
1081 .unwrap();
1082
1083 let lt_id = hv.get(&id).expect_tombstone().target;
1085 let (_, lt_bytes) = lt.get(<_id).expect_object();
1086 assert_eq!(lt_bytes.len(), chunk_size * chunk_count);
1087
1088 for i in 0..chunk_count {
1090 let offset = i * chunk_size;
1091 assert!(
1092 lt_bytes[offset..offset + chunk_size]
1093 .iter()
1094 .all(|&b| b == i as u8),
1095 "data mismatch in chunk {i}"
1096 );
1097 }
1098 }
1099
1100 #[tokio::test]
1106 async fn written_cleanup_after_lost_cas_response() {
1107 let (storage, hv, lt, log) = make_tiered_storage();
1108 let id = make_id("obj");
1109
1110 let payload = vec![0xAAu8; 2 * 1024 * 1024];
1112 storage
1113 .put_object(&id, &Default::default(), stream::single(payload.clone()))
1114 .await
1115 .unwrap();
1116 let tombstone1 = hv.get(&id).expect_tombstone().target;
1117
1118 let broken_storage = TieredStorage::new(
1120 Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
1121 Box::new(lt.clone()),
1122 Box::new(log.clone()),
1123 );
1124 broken_storage
1125 .put_object(&id, &Default::default(), stream::single(payload.clone()))
1126 .await
1127 .unwrap_err(); let tombstone2 = hv.get(&id).expect_tombstone().target;
1129 assert_ne!(tombstone1, tombstone2);
1130
1131 broken_storage.join().await;
1133 lt.get(&tombstone1).expect_not_found();
1134 lt.get(&tombstone2).expect_object();
1135
1136 broken_storage.delete_object(&id).await.unwrap_err();
1138 hv.get(&id).expect_not_found();
1139 broken_storage.join().await;
1140 lt.get(&tombstone2).expect_not_found();
1141
1142 let id = make_id("obj2");
1144 storage
1145 .put_object(&id, &Default::default(), stream::single(payload.clone()))
1146 .await
1147 .unwrap();
1148 let tombstone3 = hv.get(&id).expect_tombstone().target;
1149
1150 broken_storage
1152 .put_object(&id, &Default::default(), stream::single(&b"small"[..]))
1153 .await
1154 .unwrap_err(); hv.get(&id).expect_object();
1156 broken_storage.join().await;
1157 lt.get(&tombstone3).expect_not_found();
1158 }
1159
1160 #[test]
1164 fn guard_dropped_outside_runtime_does_not_panic() {
1165 let manager = ChangeManager::new(
1166 Box::new(InMemoryBackend::new("hv")),
1167 Box::new(InMemoryBackend::new("lt")),
1168 Box::new(NoopChangeLog),
1169 );
1170
1171 let change = Change {
1172 id: make_id("object-key"),
1173 new: Some(make_id("cleanup-target")),
1174 old: None,
1175 };
1176
1177 let guard = {
1180 let rt = tokio::runtime::Runtime::new().unwrap();
1181 rt.block_on(manager.record(change)).unwrap()
1182 };
1183
1184 drop(guard); }
1186
    /// `join()` must block until all outstanding change guards are resolved.
    /// Uses a paused clock so the timing is deterministic.
    #[tokio::test(start_paused = true)]
    async fn join_waits_for_cleanup_to_complete() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: None,
        };
        let mut guard = storage.record_change(change).await.unwrap();

        // Resolve the guard only after 10 virtual seconds.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            guard.advance(ChangePhase::Completed);
            drop(guard);
        });

        let join_future = tokio::spawn(async move { storage.join().await });

        tokio::time::sleep(Duration::from_secs(9)).await;
        assert!(!join_future.is_finished(), "finished before guard dropped");

        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(join_future.is_finished(), "finish after guard drops");
    }
1215
    /// Test hook that completes the inner put, then pauses until `resume` is
    /// signalled, notifying `paused` so the test can act at that exact point.
    #[derive(Clone, Debug)]
    struct PauseAfterPut {
        // Signalled once the inner put has completed.
        paused: Arc<tokio::sync::Notify>,
        // The hook waits on this before returning.
        resume: Arc<tokio::sync::Notify>,
    }

    #[async_trait::async_trait]
    impl Hooks for PauseAfterPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            inner.put_object(id, metadata, stream).await?;
            self.paused.notify_one();
            self.resume.notified().await;
            Ok(())
        }
    }
1243
1244 #[tokio::test]
1247 async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
1248 let paused = Arc::new(tokio::sync::Notify::new());
1249 let hooks = PauseAfterPut {
1250 paused: Arc::clone(&paused),
1251 resume: Arc::new(tokio::sync::Notify::new()),
1252 };
1253
1254 let lt_inner = InMemoryBackend::new("lt");
1255 let log = InMemoryChangeLog::default();
1256 let storage = TieredStorage::new(
1257 Box::new(InMemoryBackend::new("hv")),
1258 Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
1259 Box::new(log.clone()),
1260 );
1261
1262 let id = make_id("drop-test");
1263 let metadata = Metadata::default();
1264 let payload = vec![0xABu8; 2 * 1024 * 1024]; tokio::select! {
1268 result = storage.put_object(&id, &metadata, stream::single(payload)) => {
1269 panic!("expected put to pause before completing, got: {result:?}");
1270 }
1271 _ = paused.notified() => {
1272 }
1274 }
1275
1276 storage.join().await;
1278
1279 assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");
1281
1282 let entries = log.scan().await.unwrap();
1284 assert!(
1285 entries.is_empty(),
1286 "changelog entry not removed after cleanup"
1287 );
1288 }
1289}