// objectstore_service/backend/tiered.rs

//! Two-tier storage backend with size-based routing and redirect tombstones.
//!
//! [`TieredStorage`] routes objects to a high-volume or long-term backend based
//! on size and maintains redirect tombstones so that reads never need to probe
//! both backends. See the [crate-level documentation](crate) for the high-level
//! motivation, and the [`TieredStorage`] struct docs for routing and tombstone
//! semantics.
//!
//! # Cross-Tier Consistency
//!
//! A single logical object may span both backends: a tombstone in HV pointing
//! to a payload in LT. Mutations keep the two in sync through compare-and-swap
//! on the high-volume backend (see [`HighVolumeBackend::compare_and_write`]).
//! Each operation reads the current HV revision, performs its work, then
//! atomically swaps the HV entry only if the revision is still current —
//! rolling back on conflict.
//!
//! ## Revision Keys
//!
//! Every large-object write stores its payload at a **revision key** in the
//! long-term backend: `{original_key}/{uuid}`. The UUID suffix is unique per
//! write (callers must not rely on any ordering between revisions), so each
//! write targets a distinct LT path regardless of whether another write to the
//! same logical key is in progress. The tombstone in HV then points to this
//! specific revision. Because each writer owns its own LT blob, the
//! compare-and-swap on the tombstone becomes an atomic pointer swap: the
//! winner's revision is committed and the loser can safely delete its own blob
//! without affecting the winner.
//!
//! See `new_long_term_revision` for the key construction.
//!
//! ## Compare-and-Swap
//!
//! All mutating operations follow a common pattern of reading the current
//! revision, performing the upload, atomically swapping the revision (commit
//! point), and cleaning up the now-unreferenced LT blob in the background:
//!
//! ### Large-Object Write (> 1 MiB)
//!
//! 1. **Read HV** to capture the current revision (existing tombstone target,
//!    or absent).
//! 2. **Write payload to LT** at a unique revision key.
//! 3. **Compare-and-swap in HV**: write a tombstone pointing to the new
//!    revision, only if the current revision still matches step 1.
//!    - **OK** — schedule background deletion of the old LT blob, if any.
//!    - **Conflict** — another writer won the race; schedule background deletion
//!      of our new LT blob.
//!    - **Error** — reload the tombstone and delete the unreferenced blob or
//!      blobs.
//!
//! ### Small-Object Write (≤ 1 MiB)
//!
//! 1. **Write inline to HV**, skipping the write if a tombstone is present.
//!    - **OK** — done; the object is stored entirely in HV.
//!    - **Tombstone present** — a large object already occupies this key;
//!      continue:
//! 2. **Compare-and-swap in HV**: replace the tombstone with inline data, only
//!    if the tombstone's revision still matches.
//!    - **OK** — schedule background deletion of the old LT blob.
//!    - **Conflict** — another writer won the race; they will clean up the
//!      LT blob and we have no new LT blob to clean up.
//!    - **Error** — reload the tombstone and delete the unreferenced blob if
//!      the write went through.
//!
//! ### Delete
//!
//! 1. **Delete from HV** if the entry is not a tombstone.
//!    - **OK** — done; there is no LT data to clean up.
//!    - **Tombstone present** — a large object is stored here; continue:
//! 2. **Compare-and-swap in HV**: remove the tombstone, only if its revision
//!    still matches.
//!    - **OK** — schedule background deletion of the LT blob.
//!    - **Conflict** — another writer won the race; they will clean up.
//!    - **Error** — reload the tombstone and delete the unreferenced blob if
//!      the write went through.
//!
//! Tombstone removal is the commit point for deletes. If the subsequent LT
//! cleanup fails, an orphan blob remains but the object is already unreachable
//! through the normal read path.
//!
//! ## Last-Writer-Wins
//!
//! Concurrent mutations on the same key are inherently a race. Even a write
//! that returns `Ok` may be immediately overwritten by another caller — there
//! is no ordering guarantee and objectstore cannot provide a read-your-writes
//! promise.
//!
//! CAS conflicts are therefore **not errors**: the losing writer's data is
//! cleaned up and `Ok` is returned, because the result is indistinguishable
//! from having succeeded a moment earlier and then been overwritten.
//!
//! ### Idempotency
//!
//! `compare_and_write` is idempotent: if the entry is already in the target
//! state, it returns `true` without re-applying the mutation. This is critical
//! for retry safety. If the server commits a write but the response is lost, a
//! retry sees the already-mutated state and still returns `true` — so callers
//! do not mistakenly treat a successful commit as a lost race and clean up
//! data that was actually persisted.
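The commit discipline described above can be sketched with a std-only analogy — a hypothetical `commit` helper in which an `AtomicU64` stands in for the HV revision that `compare_and_write` actually swaps, and the early return models the idempotent retry. This is an illustration of the pattern, not the backend implementation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Commits `new` only if the revision still matches the `seen` value captured
/// before the work started. Returns `true` on commit, including the idempotent
/// case where a retried request finds its target state already applied.
fn commit(rev: &AtomicU64, seen: u64, new: u64) -> bool {
    if rev.load(Ordering::Acquire) == new {
        // Idempotent retry: the mutation already took effect, so a lost
        // response must not be mistaken for a lost race.
        return true;
    }
    // Atomic swap: succeeds only if nobody moved the revision since we read
    // it. A `false` here is the "Conflict" branch above, not an error.
    rev.compare_exchange(seen, new, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}
```

A conflicting writer gets `false` and deletes only its own LT blob, which is what makes last-writer-wins cleanup safe.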
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use objectstore_types::metadata::Metadata;
use serde::{Deserialize, Serialize};

use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
use crate::backend::common::{
    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
    TieredGet, TieredMetadata, TieredWrite, Tombstone,
};
use crate::backend::{HighVolumeStorageConfig, StorageConfig};
use crate::error::Result;
use crate::id::ObjectId;
use crate::stream::{ClientStream, SizedPeek};

/// The size threshold (inclusive) up to which objects are routed to the
/// "high volume" backend; anything larger goes to long-term storage.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

/// Creates a new [`ObjectId`] with the same context but a unique revision key.
///
/// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct
/// storage path for each large-object write. [`ObjectId::from_storage_path`] parses
/// the result back correctly because the key portion may contain `/`.
fn new_long_term_revision(id: &ObjectId) -> ObjectId {
    ObjectId {
        context: id.context.clone(),
        key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
    }
}
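For illustration, the key shape can be reduced to a hypothetical std-only helper — `revision_key` below is not part of this module; a caller-supplied suffix stands in for `uuid::Uuid::now_v7()`. The point is that unique suffixes give concurrent writers disjoint LT paths:

```rust
/// Hypothetical stand-in for the `{original_key}/{uuid}` construction above;
/// `unique_suffix` is assumed unique per write (the real code uses a UUID).
fn revision_key(original_key: &str, unique_suffix: &str) -> String {
    format!("{original_key}/{unique_suffix}")
}
```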

/// Configuration for [`TieredStorage`].
///
/// Composes two backends into a tiered routing setup: `high_volume` for small
/// objects and `long_term` for large objects. Nesting [`StorageConfig::Tiered`]
/// inside another tiered config is not supported.
///
/// # Example
///
/// ```yaml
/// storage:
///   type: tiered
///   high_volume:
///     type: bigtable
///     project_id: my-project
///     instance_name: objectstore
///     table_name: objectstore
///   long_term:
///     type: gcs
///     bucket: my-objectstore-bucket
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TieredStorageConfig {
    /// Backend for high-volume, small objects.
    ///
    /// Must be a backend that implements [`HighVolumeBackend`] (currently
    /// only BigTable).
    pub high_volume: HighVolumeStorageConfig,
    /// Backend for large, long-term objects.
    pub long_term: Box<StorageConfig>,
}

/// Two-tier storage backend that routes objects by size.
///
/// `TieredStorage` implements [`Backend`] and is intended to be used inside a
/// [`StorageService`](crate::StorageService), which wraps it with task spawning and panic
/// isolation.
///
/// # Size-Based Routing
///
/// Objects are routed at write time based on their size relative to a **1 MiB threshold**:
///
/// - Objects **≤ 1 MiB** go to the `high_volume` backend — optimized for low-latency reads
///   and writes of small objects (e.g. BigTable).
/// - Objects **> 1 MiB** go to the `long_term` backend — optimized for cost-efficient
///   storage of large objects (e.g. GCS).
///
/// # Redirect Tombstones
///
/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object
/// without knowing which backend stores it. A naive approach would check the long-term
/// backend on every read miss in the high-volume backend — but that is slow and expensive.
///
/// Instead, when an object is stored in the long-term backend, a **redirect tombstone** is
/// written in the high-volume backend. It acts as a signpost: "the real data lives in the
/// other backend at this target." On reads, a single high-volume lookup either returns the
/// object directly or follows the tombstone to long-term storage, without probing both
/// backends.
///
/// How tombstones are physically stored is determined by the [`HighVolumeBackend`]
/// implementation — refer to the backend's own documentation for storage format details.
///
/// # Consistency
///
/// Consistency across the two backends is maintained through compare-and-swap
/// operations on the high-volume backend (see
/// [`HighVolumeBackend::compare_and_write`]), not distributed locks. Each
/// mutating operation reads the current high-volume revision, performs its
/// work, and then atomically swaps the high-volume entry only if the revision
/// is still current — rolling back on conflict. Cleanup of unreferenced LT
/// blobs runs in background tasks so the caller returns as soon as the commit
/// point is reached. Call [`Backend::join`] during shutdown to wait for
/// outstanding cleanup.
///
/// See the [module-level documentation](self) for per-operation walkthroughs.
///
/// # Usage
///
/// `TieredStorage` handles only the routing and consistency logic. Wrap it in a
/// [`StorageService`](crate::service::StorageService) to add task spawning, panic isolation,
/// and concurrency limiting.
#[derive(Debug)]
pub struct TieredStorage {
    inner: Arc<ChangeManager>,
}
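The routing rule from the docs above can be stated as a small predicate. This is a sketch for clarity only — `put_object` below makes the same decision implicitly, via `SizedPeek::is_exhausted` at the 1 MiB threshold, rather than through an explicit size comparison:

```rust
/// 1 MiB, mirroring `BACKEND_SIZE_THRESHOLD` above.
const THRESHOLD: usize = 1024 * 1024;

/// Objects at or below the threshold are stored inline in the high-volume
/// backend; larger objects go to long-term storage behind a redirect tombstone.
fn routes_to_high_volume(object_size: usize) -> bool {
    object_size <= THRESHOLD
}
```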

impl TieredStorage {
    /// Creates a new `TieredStorage` with the given backends and change log.
    pub fn new(
        high_volume: Box<dyn HighVolumeBackend>,
        long_term: Box<dyn Backend>,
        changelog: Box<dyn ChangeLog>,
    ) -> Self {
        let inner = ChangeManager::new(high_volume, long_term, changelog);
        // Note on cancellation: Our `join` method will wait for all tasks tracked by the spawned
        // recovery job, so we defer shutdown until recovery is complete or times out.
        tokio::spawn(inner.clone().recover());
        Self { inner }
    }

    /// Records the change to the log and returns a guard that cleans up on drop.
    async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
        self.inner.clone().record(change).await
    }

    /// Returns the name of the backend corresponding to the given routing choice.
    fn backend_type(&self, choice: &BackendChoice) -> &'static str {
        match choice {
            BackendChoice::HighVolume => self.inner.high_volume.name(),
            BackendChoice::LongTerm => self.inner.long_term.name(),
        }
    }

    /// Puts an object into the high-volume backend.
    ///
    /// If a tombstone already exists, attempts to swap it for the new object and delete the old
    /// long-term object.
    async fn put_high_volume(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        payload: Bytes,
    ) -> Result<()> {
        let tombstone_opt = self
            .inner
            .high_volume
            .put_non_tombstone(id, metadata, payload.clone())
            .await?;

        let Some(Tombstone { target, .. }) = tombstone_opt else {
            // No tombstone exists - write succeeded.
            return Ok(());
        };

        // Tombstone exists — swap it for inline data.
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: None,
                old: Some(target.clone()),
            })
            .await?;

        let write = TieredWrite::Object(metadata.clone(), payload);
        guard.advance(ChangePhase::Written);

        let written = self
            .inner
            .high_volume
            .compare_and_write(id, Some(&target), write)
            .await?;

        // Update guard and let it schedule cleanup in the background.
        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }

    /// Puts an object into the long-term backend with a redirect tombstone in front.
    ///
    /// Deletes the previous long-term object if overwriting an existing tombstone. If the tombstone
    /// write fails, the new long-term object is cleaned up.
    async fn put_long_term(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<()> {
        // 1. Read current HV revision to establish the write precondition.
        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
            TieredMetadata::Tombstone(t) => Some(t.target),
            _ => None,
        };

        // 2. Write payload to long-term at a unique revision key.
        let new = new_long_term_revision(id);
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: Some(new.clone()),
                old: current.clone(),
            })
            .await?;

        self.inner
            .long_term
            .put_object(&new, metadata, stream)
            .await?;
        guard.advance(ChangePhase::Written);

        // 3. CAS commit: write tombstone only if HV state matches what we saw.
        let tombstone = Tombstone {
            target: new.clone(),
            expiration_policy: metadata.expiration_policy,
        };
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
            .await?;

        // Update guard and let it schedule cleanup in the background.
        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }
}

#[async_trait::async_trait]
impl Backend for TieredStorage {
    fn name(&self) -> &'static str {
        "tiered"
    }

    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        let start = Instant::now();
        if metadata.origin.is_none() {
            objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
        }

        let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
        objectstore_metrics::record!(
            "put.first_chunk.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            complete = if peeked.is_exhausted() { "yes" } else { "no" },
        );

        let (backend_choice, stored_size) = if peeked.is_exhausted() {
            let payload = peeked.into_bytes().await?;
            let payload_len = payload.len() as u64;
            self.put_high_volume(id, metadata, payload).await?;
            (BackendChoice::HighVolume, payload_len)
        } else {
            let (stored_size, stream) = counting_stream(peeked.into_stream());
            self.put_long_term(id, metadata, stream.boxed()).await?;
            (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
        };

        let backend_ty = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "put.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );
        objectstore_metrics::record!(
            "put.size" = stored_size,
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );

        Ok(())
    }

    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_object(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredGet::NotFound => (None, BackendChoice::HighVolume),
            TieredGet::Object(metadata, stream) => {
                (Some((metadata, stream)), BackendChoice::HighVolume)
            }
            TieredGet::Tombstone(tombstone) => (
                self.inner.long_term.get_object(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        let backend_type = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "get.latency.pre-response" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_type,
        );

        if let Some((ref metadata, _)) = result {
            if let Some(size) = metadata.size {
                objectstore_metrics::record!(
                    "get.size" = size,
                    usecase = id.usecase().to_owned(),
                    backend_choice = backend_choice.as_str(),
                    backend_type = backend_type,
                );
            } else {
                objectstore_log::warn!(backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
            TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
            TieredMetadata::Tombstone(tombstone) => (
                self.inner.long_term.get_metadata(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        objectstore_metrics::record!(
            "head.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(result)
    }

    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        let start = Instant::now();

        let mut backend_choice = BackendChoice::HighVolume;

        if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
            backend_choice = BackendChoice::LongTerm;

            let mut guard = self
                .record_change(Change {
                    id: id.clone(),
                    new: None,
                    old: Some(tombstone.target.clone()),
                })
                .await?;
            guard.advance(ChangePhase::Written);

            // Remove the tombstone; the LT blob becomes unreachable at this point.
            let deleted = self
                .inner
                .high_volume
                .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
                .await?;

            // Update guard and let it schedule cleanup in the background.
            guard.advance(ChangePhase::compare_and_write(deleted));
        }

        objectstore_metrics::record!(
            "delete.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(())
    }

    async fn join(&self) {
        self.inner.tracker.close();
        tokio::join!(
            self.inner.high_volume.join(),
            self.inner.long_term.join(),
            self.inner.tracker.wait()
        );
    }
}

#[derive(Debug)]
enum BackendChoice {
    HighVolume,
    LongTerm,
}

impl BackendChoice {
    fn as_str(&self) -> &'static str {
        match self {
            BackendChoice::HighVolume => "high-volume",
            BackendChoice::LongTerm => "long-term",
        }
    }
}

impl std::fmt::Display for BackendChoice {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Wraps a stream to count the total bytes yielded by successful chunks.
///
/// Returns the shared counter and the wrapped stream. The counter is incremented
/// as the stream is consumed, so read it only after the stream is exhausted.
fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
where
    S: Stream<Item = Result<Bytes, E>>,
{
    let counter = Arc::new(AtomicU64::new(0));

    (
        counter.clone(),
        stream.inspect(move |res| {
            if let Ok(chunk) = res {
                counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
            }
        }),
    )
}

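The same count-on-consumption idea in a synchronous, std-only analogy — a hypothetical `counting_iter`, with `Iterator::inspect` playing the role of `StreamExt::inspect`. As with the stream version above, the counter is only complete once the source is exhausted:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns a shared byte counter and an iterator that updates it as chunks
/// are pulled through. Read the counter only after full consumption.
fn counting_iter<I>(iter: I) -> (Arc<AtomicU64>, impl Iterator<Item = Vec<u8>>)
where
    I: Iterator<Item = Vec<u8>>,
{
    let counter = Arc::new(AtomicU64::new(0));
    let c = Arc::clone(&counter);
    (
        counter,
        // Count each chunk's bytes as the consumer pulls it through.
        iter.inspect(move |chunk| {
            c.fetch_add(chunk.len() as u64, Ordering::Relaxed);
        }),
    )
}
```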
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use objectstore_types::metadata::{ExpirationPolicy, Metadata};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::id::ObjectContext;
    use crate::stream::{self, ClientStream};

    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(make_context(), key.into())
    }

    fn make_tiered_storage() -> (
        TieredStorage,
        InMemoryBackend,
        InMemoryBackend,
        InMemoryChangeLog,
    ) {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        let changelog = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(lt.clone()),
            Box::new(changelog.clone()),
        );
        (storage, hv, lt, changelog)
    }

    // --- new_long_term_revision tests ---

    #[test]
    fn revision_id_preserves_context() {
        let id = make_id("my-key");
        let revised = new_long_term_revision(&id);
        assert_eq!(revised.context, id.context);
        assert!(
            revised.key.starts_with("my-key/"),
            "revised key should have /<uuid> suffix, got: {}",
            revised.key
        );
    }

    #[test]
    fn revision_id_roundtrips_storage_path() {
        let id = make_id("original");
        let revised = new_long_term_revision(&id);
        let path = revised.as_storage_path().to_string();
        let parsed = ObjectId::from_storage_path(&path)
            .unwrap_or_else(|| panic!("failed to parse '{path}'"));
        assert_eq!(parsed, revised);
    }

    #[test]
    fn revision_id_is_unique() {
        let id = make_id("base-key");
        let a = new_long_term_revision(&id);
        let b = new_long_term_revision(&id);
        assert_ne!(a.key, b.key, "two calls should produce different keys");
    }

    // --- Basic behavior ---

    #[tokio::test]
    async fn get_nonexistent_returns_none() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        assert!(storage.get_object(&id).await.unwrap().is_none());
        assert!(storage.get_metadata(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn delete_nonexistent_succeeds() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        storage.delete_object(&id).await.unwrap();
    }

    // --- Put routing ---

    #[tokio::test]
    async fn put_small_object_stores_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("small");
        let payload = b"small payload".to_vec();

        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();

        assert!(hv.contains(&id), "expected in high-volume");
        assert!(!lt.contains(&id), "leaked to long-term");

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        assert!(
            storage.get_metadata(&id).await.unwrap().is_some(),
            "get_metadata should return metadata for inline objects"
        );
    }

    #[tokio::test]
    async fn put_large_object_creates_tombstone() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("large");
        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold
        let metadata_in = Metadata {
            content_type: "image/png".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
            origin: Some("10.0.0.1".into()),
            ..Default::default()
        };

        storage
            .put_object(&id, &metadata_in, stream::single(payload.clone()))
            .await
            .unwrap();

        // Tombstone in HV: correct expiration_policy, target is a revision key.
        let tombstone = hv.get(&id).expect_tombstone();
        assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
        let lt_id = tombstone.target;
        assert!(
            lt_id.key().starts_with(id.key()),
            "tombstone target key should be a revision of the HV key, got: {}",
            lt_id.key()
        );

        // LT object at revision key with correct metadata.
        let (lt_meta, _) = lt.get(&lt_id).expect_object();
        assert_eq!(lt_meta.content_type, "image/png");
        assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);

        // get_object follows the tombstone and returns the correct payload.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        // get_metadata follows the tombstone and returns the correct content_type.
        let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
        assert_eq!(metadata.content_type, "image/png");
    }

    // --- Put overwrites ---

    #[tokio::test]
    async fn reinsert_small_over_large_swaps_to_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("reinsert-key");

        // First: insert a large object → creates tombstone in hv, payload in lt at lt_id.
        let large_payload = vec![0xABu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(large_payload))
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;

        // Re-insert a SMALL payload with the same key.
        // The CAS-swap puts the small object inline in HV and schedules background cleanup.
        let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold
        storage
            .put_object(&id, &Default::default(), stream::single(small_payload))
            .await
            .unwrap();

        // The small object is now inline in high-volume.
        hv.get(&id).expect_object();

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        // The old long-term blob was cleaned up.
        lt.get(&lt_id).expect_not_found();
    }

    #[tokio::test]
    async fn overwrite_large_with_large_replaces_revision() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("overwrite-large");

        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload1))
            .await
            .unwrap();
        let lt_id_1 = hv.get(&id).expect_tombstone().target;

        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload2.clone()))
            .await
            .unwrap();
        let lt_id_2 = hv.get(&id).expect_tombstone().target;

        assert_ne!(
            lt_id_1, lt_id_2,
            "second write should create a new revision"
        );

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        lt.get(&lt_id_1).expect_not_found();
        lt.get(&lt_id_2).expect_object();

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload2);
    }

    // --- Delete ---

    #[tokio::test]
    async fn delete_small_object() {
        let (storage, hv, _lt, _) = make_tiered_storage();
        let id = make_id("delete-small");

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();

        storage.delete_object(&id).await.unwrap();

        hv.get(&id).expect_not_found();
        assert!(storage.get_object(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn delete_large_object_cleans_up_both_backends() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("delete-both");
        let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB

        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Capture lt_id before deleting (it lives at the revision key, not at id).
        let lt_id = hv.get(&id).expect_tombstone().target;

        storage.delete_object(&id).await.unwrap();

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        assert!(!hv.contains(&id), "tombstone not cleaned up");
        assert!(!lt.contains(&lt_id), "long-term object not cleaned up");
    }

    #[derive(Debug)]
    struct FailDelete;

    #[async_trait::async_trait]
    impl Hooks for FailDelete {
        async fn delete_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<DeleteResponse> {
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                "simulated long-term delete failure",
            )))
        }
    }

    /// When the long-term GCS cleanup fails after the tombstone is deleted, the
    /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may
    /// remain in LT storage, which is accepted.
    #[tokio::test]
    async fn delete_succeeds_when_gcs_cleanup_fails() {
        let hv = InMemoryBackend::new("hv");
        let lt = TestBackend::new(FailDelete);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));

        let id = make_id("fail-delete");
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Delete succeeds even though GCS cleanup fails (it is best-effort).
        let result = storage.delete_object(&id).await;
        assert!(
            result.is_ok(),
            "delete should succeed despite GCS cleanup failure"
        );

        // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup).
        hv.get(&id).expect_not_found();

        // The orphaned GCS blob remains but the object is unreachable through the service.
860        assert!(
861            storage.get_object(&id).await.unwrap().is_none(),
862            "object should be unreachable after tombstone is deleted"
863        );
864    }
865
866    // --- CAS conflicts ---
867
868    #[derive(Debug)]
869    struct CasConflict;
870
871    #[async_trait::async_trait]
872    impl Hooks for CasConflict {
873        async fn compare_and_write(
874            &self,
875            _inner: &InMemoryBackend,
876            _id: &ObjectId,
877            _current: Option<&ObjectId>,
878            _write: TieredWrite,
879        ) -> Result<bool> {
880            Ok(false) // always conflict
881        }
882    }
883
884    /// After a large-object write loses the CAS race, the new LT blob must be
885    /// cleaned up. The put still returns `Ok(())` — from the caller's view, a
886    /// concurrent write won.
887    #[tokio::test]
888    async fn put_large_cas_conflict_cleans_up_new_blob() {
889        let hv = TestBackend::new(CasConflict);
890        let lt = InMemoryBackend::new("lt");
891        let log = NoopChangeLog;
892        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
893
894        let id = make_id("cas-conflict-large");
895        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path
896
897        storage
898            .put_object(&id, &Default::default(), stream::single(payload))
899            .await
900            .unwrap();
901
902        // Drain background cleanup tasks before asserting LT state.
903        storage.join().await;
904
905        assert!(
906            lt.is_empty(),
907            "LT blob should be cleaned up after CAS conflict"
908        );
909    }
910
911    /// When swapping a tombstone for inline data, a CAS conflict means another
912    /// writer won. The put still returns `Ok(())` — no LT blob was written, so
913    /// there is nothing to clean up.
914    #[tokio::test]
915    async fn put_small_over_tombstone_cas_conflict_succeeds() {
916        let inner = InMemoryBackend::new("hv");
917        let id = make_id("cas-conflict-small");
918
919        // Pre-seed a tombstone directly in the inner backend so put_non_tombstone
920        // returns it instead of writing inline.
921        let tombstone = Tombstone {
922            target: make_id("lt-object"),
923            expiration_policy: ExpirationPolicy::Manual,
924        };
925        inner
926            .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
927            .await
928            .unwrap();
929
930        let lt = InMemoryBackend::new("lt");
931        let hv = TestBackend::with_inner(inner, CasConflict);
932        let log = NoopChangeLog;
933        let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));
934
935        // Writing a small object over a tombstone should succeed even when CAS
936        // conflicts — the other writer's write is accepted.
937        storage
938            .put_object(&id, &Default::default(), stream::single("tiny"))
939            .await
940            .unwrap();
941    }
942
943    // --- Failure / inconsistency ---
944
    /// Simulates a `compare_and_write` failure. If the flag is `true`, the
    /// inner write commits before the simulated error is returned.
946    #[derive(Debug)]
947    struct FailCas(bool);
948
949    #[async_trait::async_trait]
950    impl Hooks for FailCas {
951        async fn compare_and_write(
952            &self,
953            inner: &InMemoryBackend,
954            id: &ObjectId,
955            current: Option<&ObjectId>,
956            write: TieredWrite,
957        ) -> Result<bool> {
958            if self.0 {
959                // simulate a network error _after_ commit went through
960                inner.compare_and_write(id, current, write).await?;
961            }
962            Err(Error::Io(std::io::Error::new(
963                std::io::ErrorKind::TimedOut,
964                "simulated compare_and_write failure",
965            )))
966        }
967    }
968
969    /// If the tombstone write to the high-volume backend fails after the long-term
970    /// write succeeds, the long-term object must be cleaned up so we never leave
971    /// an unreachable orphan in long-term storage.
972    #[tokio::test]
973    async fn no_orphan_when_tombstone_write_fails() {
974        let lt = InMemoryBackend::new("lt");
975        let hv = TestBackend::new(FailCas(false));
976        let log = NoopChangeLog;
977        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
978
979        let id = make_id("orphan-test");
980        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path
981        let result = storage
982            .put_object(&id, &Default::default(), stream::single(payload))
983            .await;
984
985        assert!(result.is_err());
986
987        // Drain background cleanup tasks before asserting LT state.
988        storage.join().await;
989
990        assert!(lt.is_empty(), "long-term object not cleaned up");
991    }
992
993    /// If a tombstone exists in high-volume but the corresponding object is
994    /// missing from long-term storage (e.g. due to a race condition or partial
995    /// cleanup), reads should gracefully return None rather than error.
996    #[tokio::test]
997    async fn orphan_tombstone_returns_none() {
998        let (storage, hv, lt, _) = make_tiered_storage();
999        let id = make_id("orphan-tombstone");
1000        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB
1001
1002        storage
1003            .put_object(&id, &Default::default(), stream::single(payload))
1004            .await
1005            .unwrap();
1006
1007        // The object is at the revision key in LT, not at id.
1008        let lt_id = hv.get(&id).expect_tombstone().target;
1009
1010        // Remove the long-term object, leaving an orphan tombstone in hv
1011        lt.remove(&lt_id);
1012
1013        assert!(
1014            storage.get_object(&id).await.unwrap().is_none(),
1015            "orphan tombstone should resolve to None on get_object"
1016        );
1017        assert!(
1018            storage.get_metadata(&id).await.unwrap().is_none(),
1019            "orphan tombstone should resolve to None on get_metadata"
1020        );
1021    }
1022
1023    // --- Redirect target ---
1024
1025    /// A tombstone carrying an explicit `target` is followed correctly on reads and deletes,
1026    /// including when the target ObjectId differs from the HV ObjectId.
1027    #[tokio::test]
1028    async fn tombstone_target_is_used_for_reads_and_deletes() {
1029        let hv = InMemoryBackend::new("hv");
1030        let lt = InMemoryBackend::new("lt");
1031        let log = NoopChangeLog;
1032        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));
1033
1034        let hv_id = make_id("hv-key");
1035        let lt_id = make_id("lt-key");
1036        let payload = vec![0xABu8; 100];
1037
1038        // Write the object under the LT id and a tombstone pointing to it from HV.
1039        lt.put_object(&lt_id, &Default::default(), stream::single(payload.clone()))
1040            .await
1041            .unwrap();
1042        let tombstone = Tombstone {
1043            target: lt_id.clone(),
1044            expiration_policy: ExpirationPolicy::Manual,
1045        };
1046        hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1047            .await
1048            .unwrap();
1049
1050        // get_object must follow the tombstone and find the object via the lt_id target.
1051        let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap();
1052        let body = stream::read_to_vec(s).await.unwrap();
1053        assert_eq!(body, payload);
1054
1055        // delete_object must clean up both backends using the target.
1056        storage.delete_object(&hv_id).await.unwrap();
1057        storage.join().await;
1058        assert!(!hv.contains(&hv_id), "tombstone should be removed");
1059        assert!(!lt.contains(&lt_id), "lt object should be removed");
1060    }
1061
1062    // --- Multi-chunk ---
1063
1064    #[tokio::test]
1065    async fn multi_chunk_large_object_chains_buffered_and_remaining() {
1066        let (storage, hv, lt, _) = make_tiered_storage();
1067        let id = make_id("multi-chunk");
1068
1069        // Deliver a 2 MiB payload across multiple chunks that individually
1070        // fit under the threshold but collectively exceed it.
1071        let chunk_size = 512 * 1024; // 512 KiB per chunk
1072        let chunk_count = 4; // 4 × 512 KiB = 2 MiB total
1073        let stream: ClientStream = futures_util::stream::iter(
1074            (0..chunk_count).map(move |i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))),
1075        )
1076        .boxed();
1077
1078        storage
1079            .put_object(&id, &Default::default(), stream)
1080            .await
1081            .unwrap();
1082
1083        // Should have been routed to long-term (over 1 MiB) at the revision key.
1084        let lt_id = hv.get(&id).expect_tombstone().target;
1085        let (_, lt_bytes) = lt.get(&lt_id).expect_object();
1086        assert_eq!(lt_bytes.len(), chunk_size * chunk_count);
1087
1088        // Verify data integrity — each chunk's fill byte should appear in order.
1089        for i in 0..chunk_count {
1090            let offset = i * chunk_size;
1091            assert!(
1092                lt_bytes[offset..offset + chunk_size]
1093                    .iter()
1094                    .all(|&b| b == i as u8),
1095                "data mismatch in chunk {i}"
1096            );
1097        }
1098    }
1099
1100    // --- Written-phase cleanup ---
1101
1102    /// When a large-object overwrite commits in HV but its response is lost, the guard drops in
1103    /// `Written` phase. Cleanup must read HV to determine the CAS outcome, then delete whichever
1104    /// LT blob is no longer referenced — here the old one, since the new tombstone committed.
1105    #[tokio::test]
1106    async fn written_cleanup_after_lost_cas_response() {
1107        let (storage, hv, lt, log) = make_tiered_storage();
1108        let id = make_id("obj");
1109
1110        // First put: establishes tombstone
1111        let payload = vec![0xAAu8; 2 * 1024 * 1024];
1112        storage
1113            .put_object(&id, &Default::default(), stream::single(payload.clone()))
1114            .await
1115            .unwrap();
1116        let tombstone1 = hv.get(&id).expect_tombstone().target;
1117
        // Second put: updates the tombstone but fails immediately after committing
1119        let broken_storage = TieredStorage::new(
1120            Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
1121            Box::new(lt.clone()),
1122            Box::new(log.clone()),
1123        );
1124        broken_storage
1125            .put_object(&id, &Default::default(), stream::single(payload.clone()))
1126            .await
1127            .unwrap_err(); // must fail
1128        let tombstone2 = hv.get(&id).expect_tombstone().target;
1129        assert_ne!(tombstone1, tombstone2);
1130
1131        // The first tombstone's target should be cleaned up, but the second should remain.
1132        broken_storage.join().await;
1133        lt.get(&tombstone1).expect_not_found();
1134        lt.get(&tombstone2).expect_object();
1135
1136        // Now delete the new object with the same tombstone failure
1137        broken_storage.delete_object(&id).await.unwrap_err();
1138        hv.get(&id).expect_not_found();
1139        broken_storage.join().await;
1140        lt.get(&tombstone2).expect_not_found();
1141
1142        // Create a fresh large object
1143        let id = make_id("obj2");
1144        storage
1145            .put_object(&id, &Default::default(), stream::single(payload.clone()))
1146            .await
1147            .unwrap();
1148        let tombstone3 = hv.get(&id).expect_tombstone().target;
1149
1150        // Overwrite it with a small object and check again for cleanup
1151        broken_storage
1152            .put_object(&id, &Default::default(), stream::single(&b"small"[..]))
1153            .await
1154            .unwrap_err(); // must fail
1155        hv.get(&id).expect_object();
1156        broken_storage.join().await;
1157        lt.get(&tombstone3).expect_not_found();
1158    }
1159
1160    // --- ChangeGuard drop safety tests ---
1161
1162    /// Dropping a guard outside any tokio runtime must not panic.
1163    #[test]
1164    fn guard_dropped_outside_runtime_does_not_panic() {
1165        let manager = ChangeManager::new(
1166            Box::new(InMemoryBackend::new("hv")),
1167            Box::new(InMemoryBackend::new("lt")),
1168            Box::new(NoopChangeLog),
1169        );
1170
1171        let change = Change {
1172            id: make_id("object-key"),
1173            new: Some(make_id("cleanup-target")),
1174            old: None,
1175        };
1176
1177        // Build the guard inside a temporary runtime, then let the runtime drop
1178        // so that no tokio context is active when the guard drops.
1179        let guard = {
1180            let rt = tokio::runtime::Runtime::new().unwrap();
1181            rt.block_on(manager.record(change)).unwrap()
1182        };
1183
1184        drop(guard); // Must not panic.
1185    }
1186
1187    /// `join` blocks until all in-flight guards have completed cleanup.
1188    ///
1189    /// Time is advanced manually so the test runs at virtual speed. The guard
1190    /// completes after 10 s; `join` must still be waiting at 9 s and done by 11 s.
1191    #[tokio::test(start_paused = true)]
1192    async fn join_waits_for_cleanup_to_complete() {
1193        let (storage, _hv, _lt, _) = make_tiered_storage();
1194        let change = Change {
1195            id: make_id("object-key"),
1196            new: None,
1197            old: None,
1198        };
1199        let mut guard = storage.record_change(change).await.unwrap();
1200
1201        tokio::spawn(async move {
1202            tokio::time::sleep(Duration::from_secs(10)).await;
1203            guard.advance(ChangePhase::Completed);
1204            drop(guard);
1205        });
1206
1207        let join_future = tokio::spawn(async move { storage.join().await });
1208
1209        tokio::time::sleep(Duration::from_secs(9)).await;
        assert!(!join_future.is_finished(), "join finished before the guard dropped");
1211
1212        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(join_future.is_finished(), "join should finish once the guard drops");
1214    }
1215
1216    // --- Changelog integration tests ---
1217
1218    /// LT backend hook that completes the write, then pauses until resumed.
1219    ///
1220    /// Lets tests cancel the owning future after the blob is committed but
1221    /// before the HV tombstone is set.
1222    #[derive(Clone, Debug)]
1223    struct PauseAfterPut {
1224        paused: Arc<tokio::sync::Notify>,
1225        resume: Arc<tokio::sync::Notify>,
1226    }
1227
1228    #[async_trait::async_trait]
1229    impl Hooks for PauseAfterPut {
1230        async fn put_object(
1231            &self,
1232            inner: &InMemoryBackend,
1233            id: &ObjectId,
1234            metadata: &Metadata,
1235            stream: ClientStream,
1236        ) -> Result<PutResponse> {
1237            inner.put_object(id, metadata, stream).await?;
1238            self.paused.notify_one();
1239            self.resume.notified().await;
1240            Ok(())
1241        }
1242    }
1243
1244    /// When a future is cancelled after the LT write but before the HV tombstone is set,
1245    /// the `ChangeGuard` cleans up the orphaned LT blob and removes the log entry.
1246    #[tokio::test]
1247    async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
1248        let paused = Arc::new(tokio::sync::Notify::new());
1249        let hooks = PauseAfterPut {
1250            paused: Arc::clone(&paused),
1251            resume: Arc::new(tokio::sync::Notify::new()),
1252        };
1253
1254        let lt_inner = InMemoryBackend::new("lt");
1255        let log = InMemoryChangeLog::default();
1256        let storage = TieredStorage::new(
1257            Box::new(InMemoryBackend::new("hv")),
1258            Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
1259            Box::new(log.clone()),
1260        );
1261
1262        let id = make_id("drop-test");
1263        let metadata = Metadata::default();
1264        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
1265
1266        // Drive the put until the LT write commits, then cancel before the HV tombstone is set.
1267        tokio::select! {
1268            result = storage.put_object(&id, &metadata, stream::single(payload)) => {
1269                panic!("expected put to pause before completing, got: {result:?}");
1270            }
1271            _ = paused.notified() => {
1272                // LT blob stored; cancelling drops the guard in Recorded phase.
1273            }
1274        }
1275
1276        // ChangeGuard dropped → background cleanup task spawned; wait for it.
1277        storage.join().await;
1278
1279        // The orphaned LT blob must have been deleted.
1280        assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");
1281
1282        // The log entry must be gone once cleanup completes.
1283        let entries = log.scan().await.unwrap();
1284        assert!(
1285            entries.is_empty(),
1286            "changelog entry not removed after cleanup"
1287        );
1288    }
1289}