// objectstore_service/backend/tiered.rs

//! Two-tier storage backend with size-based routing and redirect tombstones.
//!
//! [`TieredStorage`] routes objects to a high-volume or long-term backend based
//! on size and maintains redirect tombstones so that reads never need to probe
//! both backends. See the [crate-level documentation](crate) for the high-level
//! motivation, and the [`TieredStorage`] struct docs for routing and tombstone
//! semantics.
//!
//! # Cross-Tier Consistency
//!
//! A single logical object may span both backends: a tombstone in HV pointing
//! to a payload in LT. Mutations keep the two in sync through compare-and-swap
//! on the high-volume backend (see [`HighVolumeBackend::compare_and_write`]).
//! Each operation reads the current HV revision, performs its work, then
//! atomically swaps the HV entry only if the revision is still current —
//! rolling back on conflict.
//!
//! ## Revision Keys
//!
//! Every large-object write stores its payload at a **revision key** in the
//! long-term backend: `{original_key}/{uuid}`. The suffix is a freshly
//! generated UUIDv7 that is unique per write, so each write targets a distinct
//! LT path regardless of whether another write to the same logical key is in
//! progress. The tombstone in HV then points to this specific revision. Because
//! each writer owns its own LT blob, the compare-and-swap on the tombstone
//! becomes an atomic pointer swap: the winner's revision is committed and the
//! loser can safely delete its own blob without affecting the winner.
//!
//! See `new_long_term_revision` for the key construction.
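//!
//! As a minimal sketch of the key shape only: the hypothetical `revision_key`
//! helper below takes the unique suffix as a parameter so that it needs nothing
//! beyond the standard library, whereas `new_long_term_revision` generates the
//! UUID itself.
//!
//! ```rust
//! // Hypothetical stand-in for the real key construction; `suffix` plays the
//! // role of the UUID and is an assumption of this sketch.
//! fn revision_key(original_key: &str, suffix: &str) -> String {
//!     format!("{original_key}/{suffix}")
//! }
//! ```
//!
//! Two writers racing on `reports/2024.bin` with suffixes `a` and `b` would
//! upload to `reports/2024.bin/a` and `reports/2024.bin/b`, so neither upload
//! can clobber the other.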
//!
//! ## Compare-and-Swap
//!
//! All mutating operations follow a common pattern of reading the current
//! revision, performing the upload, atomically swapping the revision (commit
//! point), and cleaning up the now-unreferenced LT blob in the background:
//!
//! ### Large-Object Write (> 1 MiB)
//!
//! 1. **Read HV** to capture the current revision (existing tombstone target,
//!    or absent).
//! 2. **Write payload to LT** at a unique revision key.
//! 3. **Compare-and-swap in HV**: write a tombstone pointing to the new
//!    revision, only if the current revision still matches step 1.
//!    - **OK** — schedule background deletion of the old LT blob, if any.
//!    - **Conflict** — another writer won the race; schedule background deletion
//!      of our new LT blob.
//!    - **Error** — reload the tombstone and delete the unreferenced blob or
//!      blobs.
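//!
//! The three steps above can be sketched with an in-memory model. Everything in
//! it is hypothetical (`Tiers`, `read_revision`, `upload`, `commit`); it cleans
//! up synchronously where the real code schedules background deletion, and it
//! leaves out the **Error** branch.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical in-memory model of the two tiers, for illustration only.
//! #[derive(Default)]
//! struct Tiers {
//!     // HV: logical key -> revision key the tombstone points to.
//!     tombstones: HashMap<String, String>,
//!     // LT: revision key -> payload.
//!     blobs: HashMap<String, Vec<u8>>,
//! }
//!
//! impl Tiers {
//!     // Step 1: capture the revision the tombstone currently points to.
//!     fn read_revision(&self, key: &str) -> Option<String> {
//!         self.tombstones.get(key).cloned()
//!     }
//!
//!     // Step 2: upload the payload to LT at the writer's own revision key.
//!     fn upload(&mut self, revision: &str, payload: &[u8]) {
//!         self.blobs.insert(revision.to_owned(), payload.to_vec());
//!     }
//!
//!     // Step 3: swap the tombstone only if it still matches `expected`, then
//!     // delete whichever blob is left unreferenced. Returns `true` if this
//!     // writer won the race.
//!     fn commit(&mut self, key: &str, expected: Option<&str>, revision: &str) -> bool {
//!         let current = self.tombstones.get(key).map(String::as_str);
//!         if current != expected {
//!             // Conflict: another writer won, so drop our own blob.
//!             self.blobs.remove(revision);
//!             return false;
//!         }
//!         // OK: point the tombstone at our revision and drop the old blob.
//!         self.tombstones.insert(key.to_owned(), revision.to_owned());
//!         if let Some(old) = expected {
//!             self.blobs.remove(old);
//!         }
//!         true
//!     }
//! }
//! ```
//!
//! If two writers both read "no tombstone" and upload to `k/a` and `k/b`, the
//! first `commit` returns `true` and the second returns `false`, leaving only
//! `k/a` in LT.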
//!
//! ### Small-Object Write (≤ 1 MiB)
//!
//! 1. **Write inline to HV**, skipping the write if a tombstone is present.
//!    - **OK** — done; the object is stored entirely in HV.
//!    - **Tombstone present** — a large object already occupies this key;
//!      continue:
//! 2. **Compare-and-swap in HV**: replace the tombstone with inline data, only
//!    if the tombstone's revision still matches.
//!    - **OK** — schedule background deletion of the old LT blob.
//!    - **Conflict** — another writer won the race; they will clean up the
//!      LT blob and we have no new LT blob to clean up.
//!    - **Error** — reload the tombstone and delete the unreferenced blob if
//!      the write went through.
//!
//! ### Delete
//!
//! 1. **Delete from HV** if the entry is not a tombstone.
//!    - **OK** — done; there is no LT data to clean up.
//!    - **Tombstone present** — a large object is stored here; continue:
//! 2. **Compare-and-swap in HV**: remove the tombstone, only if its revision
//!    still matches.
//!    - **OK** — schedule background deletion of the LT blob.
//!    - **Conflict** — another writer won the race; they will clean up.
//!    - **Error** — reload the tombstone and delete the unreferenced blob if
//!      the write went through.
//!
//! Tombstone removal is the commit point for deletes. If the subsequent LT
//! cleanup fails, an orphan blob remains but the object is already unreachable
//! through the normal read path.
//!
//! ## Last-Writer-Wins
//!
//! Concurrent mutations on the same key are inherently a race. Even a write
//! that returns `Ok` may be immediately overwritten by another caller — there
//! is no ordering guarantee and objectstore cannot provide a read-your-writes
//! promise.
//!
//! CAS conflicts are therefore **not errors**: the losing writer's data is
//! cleaned up and `Ok` is returned, because the result is indistinguishable
//! from having succeeded a moment earlier and then been overwritten.
//!
//! ### Idempotency
//!
//! `compare_and_write` is idempotent: if the row is already in the target state, it
//! returns `true` without re-applying the mutation. This is critical for retry
//! safety. If the server commits a write but the response is lost, a retry sees the
//! already-mutated state and still returns `true` — so callers do not mistakenly
//! treat a successful commit as a lost race and clean up data that was actually
//! persisted.
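//!
//! A minimal sketch of that contract on a single row. `Row` is hypothetical and
//! not part of this crate; the sketch is single-threaded, whereas the real
//! backend has to perform the check and the write atomically.
//!
//! ```rust
//! // Hypothetical single-row model: the row holds the revision key the
//! // tombstone points to, or `None` when no tombstone exists.
//! struct Row {
//!     revision: Option<String>,
//! }
//!
//! impl Row {
//!     // Returns `true` if the row is in the `target` state after this call,
//!     // whether because of this call or an earlier identical one; returns
//!     // `false` on a genuine conflict.
//!     fn compare_and_write(&mut self, expected: Option<&str>, target: Option<&str>) -> bool {
//!         let current = self.revision.as_deref();
//!         if current == target {
//!             // Already applied, e.g. by an attempt whose response was lost.
//!             return true;
//!         }
//!         if current != expected {
//!             return false;
//!         }
//!         self.revision = target.map(str::to_owned);
//!         true
//!     }
//! }
//! ```
//!
//! Retrying the same swap after a lost response therefore reports success
//! again, while a different writer that still expects the old state gets
//! `false`.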

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime};

use base64::Engine as _;
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use objectstore_types::metadata::Metadata;
use serde::{Deserialize, Serialize};

use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
use crate::backend::common::{
    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse,
    MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone,
};
use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
use crate::error::{Error, Result};
use crate::id::ObjectId;
use crate::multipart::{
    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
};
use crate::stream::{ClientStream, SizedPeek};

/// Size threshold for routing: objects up to this size go to the "high volume" backend,
/// larger objects go to the "long term" backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling`
/// state before becoming eligible for cleanup by the `ChangeLog` recovery process.
/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long,
/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob.
const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24);

/// Creates a new [`ObjectId`] with the same context but a unique revision key.
///
/// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct
/// storage path for each large-object write. [`ObjectId::from_storage_path`] parses
/// the result back correctly because the key portion may contain `/`.
fn new_long_term_revision(id: &ObjectId) -> ObjectId {
    ObjectId {
        context: id.context.clone(),
        key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
    }
}

/// Configuration for [`TieredStorage`].
///
/// Composes two backends into a tiered routing setup: `high_volume` for small
/// objects and `long_term` for large objects. Nesting [`super::StorageConfig::Tiered`]
/// inside another tiered config is not supported.
///
/// # Example
///
/// ```yaml
/// storage:
///   type: tiered
///   high_volume:
///     type: bigtable
///     project_id: my-project
///     instance_name: objectstore
///     table_name: objectstore
///   long_term:
///     type: gcs
///     bucket: my-objectstore-bucket
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TieredStorageConfig {
    /// Backend for high-volume, small objects.
    ///
    /// Must be a backend that implements [`HighVolumeBackend`] (currently
    /// only BigTable).
    pub high_volume: HighVolumeStorageConfig,
    /// Backend for large, long-term objects.
    ///
    /// Must be a backend that implements [`MultipartUploadBackend`].
    pub long_term: MultipartUploadStorageConfig,
}

/// Two-tier storage backend that routes objects by size.
///
/// `TieredStorage` implements [`Backend`] and is intended to be used inside a
/// [`StorageService`](crate::StorageService), which wraps it with task spawning and panic
/// isolation.
///
/// # Size-Based Routing
///
/// Objects are routed at write time based on their size relative to a **1 MiB threshold**:
///
/// - Objects **≤ 1 MiB** go to the `high_volume` backend — optimized for low-latency reads
///   and writes of small objects (e.g. BigTable).
/// - Objects **> 1 MiB** go to the `long_term` backend — optimized for cost-efficient
///   storage of large objects (e.g. GCS).
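///
/// The routing rule as documented above can be sketched as a pure function.
/// `Tier`, `route`, and `THRESHOLD` are hypothetical names used only for this
/// illustration; the real decision is made by peeking at the incoming stream
/// rather than by knowing the size up front.
///
/// ```rust
/// // Same value as the 1 MiB routing threshold.
/// const THRESHOLD: u64 = 1024 * 1024;
///
/// #[derive(Debug, PartialEq)]
/// enum Tier {
///     HighVolume,
///     LongTerm,
/// }
///
/// // Objects up to and including the threshold stay in the high-volume tier.
/// fn route(size: u64) -> Tier {
///     if size <= THRESHOLD {
///         Tier::HighVolume
///     } else {
///         Tier::LongTerm
///     }
/// }
/// ```
///
/// Under this rule an object of exactly 1 MiB stays in `high_volume`, while one
/// byte more sends it to `long_term`.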
///
/// # Redirect Tombstones
///
/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object
/// without knowing which backend stores it. A naive approach would check the long-term
/// backend on every read miss in the high-volume backend — but that is slow and expensive.
///
/// Instead, when an object is stored in the long-term backend, a **redirect tombstone** is
/// written in the high-volume backend. It acts as a signpost: "the real data lives in the
/// other backend at this target." On reads, a single high-volume lookup either returns the
/// object directly or follows the tombstone to long-term storage, without probing both
/// backends.
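///
/// A minimal sketch of that read path, using hypothetical in-memory maps in
/// place of the two backends (`HvEntry` and `read` are illustrative names, not
/// part of this crate):
///
/// ```rust
/// use std::collections::HashMap;
///
/// // What a high-volume lookup can find under a logical key.
/// enum HvEntry {
///     // Small object stored inline in the high-volume tier.
///     Inline(Vec<u8>),
///     // Redirect to the revision key that holds the payload in long-term.
///     Tombstone(String),
/// }
///
/// // Resolves a read with one high-volume lookup. Long-term storage is
/// // consulted only when a tombstone says the payload lives there.
/// fn read(
///     hv: &HashMap<String, HvEntry>,
///     lt: &HashMap<String, Vec<u8>>,
///     key: &str,
/// ) -> Option<Vec<u8>> {
///     match hv.get(key)? {
///         HvEntry::Inline(payload) => Some(payload.clone()),
///         HvEntry::Tombstone(target) => lt.get(target).cloned(),
///     }
/// }
/// ```
///
/// A miss in the high-volume map returns `None` immediately, so the sketch never
/// probes long-term storage for keys that have no tombstone.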
///
/// How tombstones are physically stored is determined by the [`HighVolumeBackend`]
/// implementation — refer to the backend's own documentation for storage format details.
///
/// # Consistency
///
/// Consistency across the two backends is maintained through compare-and-swap
/// operations on the high-volume backend (see
/// [`HighVolumeBackend::compare_and_write`]), not distributed locks. Each
/// mutating operation reads the current high-volume revision, performs its
/// work, and then atomically swaps the high-volume entry only if the revision
/// is still current — rolling back on conflict. Cleanup of unreferenced LT
/// blobs runs in background tasks so the caller returns as soon as the commit
/// point is reached. Call [`Backend::join`] during shutdown to wait for
/// outstanding cleanup.
///
/// See the [module-level documentation](self) for the per-operation steps.
///
/// # Usage
///
/// `TieredStorage` handles only the routing and consistency logic. Wrap it in a
/// [`StorageService`](crate::service::StorageService) to add task spawning, panic isolation,
/// and concurrency limiting.
#[derive(Debug)]
pub struct TieredStorage {
    inner: Arc<ChangeManager>,
}

impl TieredStorage {
    /// Creates a new `TieredStorage` with the given backends and change log.
    pub fn new(
        high_volume: Box<dyn HighVolumeBackend>,
        long_term: Box<dyn MultipartUploadBackend>,
        changelog: Box<dyn ChangeLog>,
    ) -> Self {
        let inner = ChangeManager::new(high_volume, long_term, changelog);
        // Note on cancellation: Our `join` method will wait for all tasks tracked by the spawned
        // recovery job, so we defer shutdown until recovery is complete or times out.
        tokio::spawn(inner.clone().recover());
        Self { inner }
    }

    /// Records the change to the log and returns a guard that cleans up on drop.
    async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
        self.inner.clone().record(change).await
    }

    /// Records the change to the log in the `Assembling` phase, and returns a guard that does
    /// nothing on drop unless advanced.
    async fn record_assembling(&self, change: Change) -> Result<ChangeGuard> {
        self.inner.clone().record_assembling(change).await
    }

    /// Returns the name of the backend corresponding to the given routing choice.
    fn backend_type(&self, choice: &BackendChoice) -> &'static str {
        match choice {
            BackendChoice::HighVolume => self.inner.high_volume.name(),
            BackendChoice::LongTerm => self.inner.long_term.name(),
        }
    }

    /// Puts an object into the high-volume backend.
    ///
    /// If a tombstone already exists, attempts to swap it for the new object and delete the old
    /// long-term object.
    async fn put_high_volume(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        payload: Bytes,
    ) -> Result<()> {
        let tombstone_opt = self
            .inner
            .high_volume
            .put_non_tombstone(id, metadata, payload.clone())
            .await?;

        let Some(Tombstone { target, .. }) = tombstone_opt else {
            // No tombstone exists - write succeeded
            return Ok(());
        };

        // Tombstone exists — Swap it for inline data
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: None,
                old: Some(target.clone()),
                cleanup_after: None,
            })
            .await?;

        let write = TieredWrite::Object(metadata.clone(), payload);
        guard.advance(ChangePhase::Written);

        let written = self
            .inner
            .high_volume
            .compare_and_write(id, Some(&target), write)
            .await?;

        // Update guard and let it schedule cleanup in the background.
        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }

    /// Puts an object into the long-term backend with a redirect tombstone in front.
    ///
    /// Deletes the previous long-term object if overwriting an existing tombstone. If the tombstone
    /// write fails, the new long-term object is cleaned up.
    async fn put_long_term(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<()> {
        // 1. Read current HV revision to establish the write precondition
        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
            TieredMetadata::Tombstone(t) => Some(t.target),
            _ => None,
        };

        // 2. Write payload to long-term at a unique revision key.
        let new = new_long_term_revision(id);
        let mut guard = self
            .record_change(Change {
                id: id.clone(),
                new: Some(new.clone()),
                old: current.clone(),
                cleanup_after: None,
            })
            .await?;

        self.inner
            .long_term
            .put_object(&new, metadata, stream)
            .await?;
        guard.advance(ChangePhase::Written);

        // 3. CAS commit: write tombstone only if HV state matches what we saw.
        let tombstone = Tombstone {
            target: new.clone(),
            expiration_policy: metadata.expiration_policy,
        };
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
            .await?;

        // Update guard and let it schedule cleanup in the background.
        guard.advance(ChangePhase::compare_and_write(written));

        Ok(())
    }
}

#[async_trait::async_trait]
impl Backend for TieredStorage {
    fn name(&self) -> &'static str {
        "tiered"
    }

    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
        Ok(self)
    }

    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        let start = Instant::now();
        if metadata.origin.is_none() {
            objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
        }

        let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
        objectstore_metrics::record!(
            "put.first_chunk.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            complete = if peeked.is_exhausted() { "yes" } else { "no" },
        );

        let (backend_choice, stored_size) = if peeked.is_exhausted() {
            let payload = peeked.into_bytes().await?;
            let payload_len = payload.len() as u64;
            self.put_high_volume(id, metadata, payload).await?;
            (BackendChoice::HighVolume, payload_len)
        } else {
            let (stored_size, stream) = counting_stream(peeked.into_stream());
            self.put_long_term(id, metadata, stream.boxed()).await?;
            (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
        };

        let backend_ty = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "put.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );
        objectstore_metrics::record!(
            "put.size" = stored_size,
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_ty,
        );

        Ok(())
    }

    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_object(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredGet::NotFound => (None, BackendChoice::HighVolume),
            TieredGet::Object(metadata, stream) => {
                (Some((metadata, stream)), BackendChoice::HighVolume)
            }
            TieredGet::Tombstone(tombstone) => (
                self.inner.long_term.get_object(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        let backend_type = self.backend_type(&backend_choice);
        objectstore_metrics::record!(
            "get.latency.pre-response" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = backend_type,
        );

        if let Some((ref metadata, _)) = result {
            if let Some(size) = metadata.size {
                objectstore_metrics::record!(
                    "get.size" = size,
                    usecase = id.usecase().to_owned(),
                    backend_choice = backend_choice.as_str(),
                    backend_type = backend_type,
                );
            } else {
                objectstore_log::warn!(backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
        let start = Instant::now();

        let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
        let (result, backend_choice) = match hv_result {
            TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
            TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
            TieredMetadata::Tombstone(tombstone) => (
                self.inner.long_term.get_metadata(&tombstone.target).await?,
                BackendChoice::LongTerm,
            ),
        };

        objectstore_metrics::record!(
            "head.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(result)
    }

    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        let start = Instant::now();

        let mut backend_choice = BackendChoice::HighVolume;

        if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
            backend_choice = BackendChoice::LongTerm;

            let mut guard = self
                .record_change(Change {
                    id: id.clone(),
                    new: None,
                    old: Some(tombstone.target.clone()),
                    cleanup_after: None,
                })
                .await?;
            guard.advance(ChangePhase::Written);

            // Remove the tombstone; the LT blob becomes unreachable at this point.
            let deleted = self
                .inner
                .high_volume
                .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
                .await?;

            // Update guard and let it schedule cleanup in the background.
            guard.advance(ChangePhase::compare_and_write(deleted));
        }

        objectstore_metrics::record!(
            "delete.latency" = start.elapsed(),
            usecase = id.usecase().to_owned(),
            backend_choice = backend_choice.as_str(),
            backend_type = self.backend_type(&backend_choice),
        );

        Ok(())
    }

    async fn join(&self) {
        self.inner.tracker.close();
        tokio::join!(
            self.inner.high_volume.join(),
            self.inner.long_term.join(),
            self.inner.tracker.wait()
        );
    }
}

#[derive(Debug)]
enum BackendChoice {
    HighVolume,
    LongTerm,
}

impl BackendChoice {
    fn as_str(&self) -> &'static str {
        match self {
            BackendChoice::HighVolume => "high-volume",
            BackendChoice::LongTerm => "long-term",
        }
    }
}

impl std::fmt::Display for BackendChoice {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Wraps a stream to count the total bytes yielded by successful chunks.
///
/// Returns the shared counter and the wrapped stream. The counter is incremented
/// as the stream is consumed, so read it only after the stream is exhausted.
fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
where
    S: Stream<Item = Result<Bytes, E>>,
{
    let counter = Arc::new(AtomicU64::new(0));

    (
        counter.clone(),
        stream.inspect(move |res| {
            if let Ok(chunk) = res {
                counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
            }
        }),
    )
}

/// The multipart upload state for TieredStorage.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct TieredUploadId {
    revision: String,
    upload_id: UploadId,
}

impl TryInto<UploadId> for TieredUploadId {
    type Error = Error;

    fn try_into(self) -> Result<UploadId, Self::Error> {
        let json =
            serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?;
        Ok(UploadId::new(
            base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json),
        )?)
    }
}

impl TryFrom<&UploadId> for TieredUploadId {
    type Error = Error;

    fn try_from(value: &UploadId) -> Result<Self, Self::Error> {
        let json = base64::engine::general_purpose::URL_SAFE_NO_PAD
            .decode(value.as_bytes())
            .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?;
        serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e))
    }
}

#[async_trait::async_trait]
impl MultipartUploadBackend for TieredStorage {
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        let physical = new_long_term_revision(id);

        let id = self
            .inner
            .long_term
            .initiate_multipart(&physical, metadata)
            .await?;

        let id = TieredUploadId {
            revision: physical.key,
            upload_id: id,
        };
        id.try_into()
    }

    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        content_length: u64,
        content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        let tiered: TieredUploadId = upload_id.try_into()?;

        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered.revision,
        };

        self.inner
            .long_term
            .upload_part(
                &physical,
                &tiered.upload_id,
                part_number,
                content_length,
                content_md5,
                body,
            )
            .await
    }

    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        let tiered: TieredUploadId = upload_id.try_into()?;

        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered.revision,
        };

        self.inner
            .long_term
            .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker)
            .await
    }

    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        let tiered: TieredUploadId = upload_id.try_into()?;

        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered.revision,
        };

        self.inner
            .long_term
            .abort_multipart(&physical, &tiered.upload_id)
            .await
    }

    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        let tiered: TieredUploadId = upload_id.try_into()?;

        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered.revision,
        };

        // 1. Read current HV revision to establish the write precondition.
        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
            // Optimization: a previous attempt already finalized this revision and tombstone -- report success.
            TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None),
            TieredMetadata::Tombstone(t) => Some(t.target),
            _ => None,
        };

        // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`,
        // so that the user has the chance to retry finalizing the upload in this timeframe.
        let mut guard = self
            .record_assembling(Change {
                id: id.clone(),
                new: Some(physical.clone()),
                old: current.clone(),
                cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY),
            })
            .await?;

        // 2. Complete the upload, creating the object at the given revision key.
        let maybe_complete_multipart_err = match self
            .inner
            .long_term
            .complete_multipart(&physical, &tiered.upload_id, parts)
            .await
        {
            // The request went through but we got an error in the response body.
            // Transparently proxy the error to the user.
            Ok(error) => {
                if error.is_some() {
                    return Ok(error);
                }
                None
            }
            // We got status 4xx/5xx, or a network error.
            // Either way, `complete_multipart` might have been completed successfully,
            // either now or in a previous attempt (in that case, that's a 404 and we indeed end up
            // here).
            // We cannot know if that's the case yet, so we continue to the next steps.
            Err(err) => Some(err),
        };

        // 3. Retrieve the metadata of the object, which was determined at initiation time, to
        //    get the expiration policy.
        //
        //    This also serves as an existence check to understand if the LT revision was actually
        //    created successfully in this or a previous attempt, in which case we just need to
        //    finalize the tombstone.
        let metadata = self.inner.long_term.get_metadata(&physical).await;

        let metadata = match (metadata, maybe_complete_multipart_err) {
            // The LT revision already exists, so we can continue to finalize the tombstone.
            (Ok(Some(metadata)), _) => metadata,
            // The LT revision doesn't exist, cannot proceed.
            (Ok(None), Some(err)) => return Err(err),
            // The `complete_multipart` succeeded, creating the object, but the `get_metadata`
            // immediately after failed to find the object. This should never happen.
            (Ok(None), None) => {
                objectstore_log::error!(
                    id = ?id,
                    upload_id = ?upload_id,
                    physical = ?physical,
                    "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object"
                );
                return Err(Error::generic(
                    "completed multipart object not found in long-term storage",
                ));
            }
            // Failed to `get_metadata`, cannot proceed.
            (Err(get_metadata_err), maybe_complete_multipart_err) => {
                // Prefer the `complete_multipart_err`, as it's likely more informative.
                // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500,
                // but we would actually want to transparently surface the original status (and message?) instead.
                return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err));
            }
        };

        // 4. CAS commit: write tombstone only if HV state matches what we saw.
        let tombstone = Tombstone {
            target: physical.clone(),
            expiration_policy: metadata.expiration_policy,
        };
        let written = self
            .inner
            .high_volume
            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
            .await?;

        // Update guard and let it schedule cleanup in the background.
        guard.advance(ChangePhase::compare_and_write(written));

        Ok(None)
    }
}
796
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;

    use futures::lock::Mutex;
    use objectstore_types::metadata::{ExpirationPolicy, Metadata};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::id::ObjectContext;

    use crate::stream::{self, ClientStream};

    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(make_context(), key.into())
    }

    fn make_tiered_storage() -> (
        TieredStorage,
        InMemoryBackend,
        InMemoryBackend,
        InMemoryChangeLog,
    ) {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        let changelog = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(lt.clone()),
            Box::new(changelog.clone()),
        );
        (storage, hv, lt, changelog)
    }

    // --- new_long_term_revision tests ---

    #[test]
    fn revision_id_preserves_context() {
        let id = make_id("my-key");
        let revised = new_long_term_revision(&id);
        assert_eq!(revised.context, id.context);
        assert!(
            revised.key.starts_with("my-key/"),
            "revised key should have /<uuid> suffix, got: {}",
            revised.key
        );
    }

    #[test]
    fn revision_id_roundtrips_storage_path() {
        let id = make_id("original");
        let revised = new_long_term_revision(&id);
        let path = revised.as_storage_path().to_string();
        let parsed = ObjectId::from_storage_path(&path)
            .unwrap_or_else(|| panic!("failed to parse '{path}'"));
        assert_eq!(parsed, revised);
    }

    #[test]
    fn revision_id_is_unique() {
        let id = make_id("base-key");
        let a = new_long_term_revision(&id);
        let b = new_long_term_revision(&id);
        assert_ne!(a.key, b.key, "two calls should produce different keys");
    }

    // --- Basic behavior ---

    #[tokio::test]
    async fn get_nonexistent_returns_none() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        assert!(storage.get_object(&id).await.unwrap().is_none());
        assert!(storage.get_metadata(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn delete_nonexistent_succeeds() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("does-not-exist");

        storage.delete_object(&id).await.unwrap();
    }

    // --- Put routing ---

    #[tokio::test]
    async fn put_small_object_stores_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("small");
        let payload = b"small payload".to_vec();

        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();

        assert!(hv.contains(&id), "expected in high-volume");
        assert!(!lt.contains(&id), "leaked to long-term");

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        assert!(
            storage.get_metadata(&id).await.unwrap().is_some(),
            "get_metadata should return metadata for inline objects"
        );
    }

    #[tokio::test]
    async fn put_large_object_creates_tombstone() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("large");
        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold
        let metadata_in = Metadata {
            content_type: "image/png".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
            origin: Some("10.0.0.1".into()),
            ..Default::default()
        };

        storage
            .put_object(&id, &metadata_in, stream::single(payload.clone()))
            .await
            .unwrap();

        // Tombstone in HV: correct expiration_policy, target is a revision key.
        let tombstone = hv.get(&id).expect_tombstone();
        assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
        let lt_id = tombstone.target;
        assert!(
            lt_id.key().starts_with(id.key()),
            "tombstone target key should be a revision of the HV key, got: {}",
            lt_id.key()
        );

        // LT object at revision key with correct metadata.
        let (lt_meta, _) = lt.get(&lt_id).expect_object();
        assert_eq!(lt_meta.content_type, "image/png");
        assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);

        // get_object follows the tombstone and returns the correct payload.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        // get_metadata follows the tombstone and returns the correct content_type.
        let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
        assert_eq!(metadata.content_type, "image/png");
    }

    // --- Put overwrites ---

    #[tokio::test]
    async fn reinsert_small_over_large_swaps_to_inline() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("reinsert-key");

        // First: insert a large object → creates tombstone in hv, payload in lt at lt_id
        let large_payload = vec![0xABu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(large_payload))
            .await
            .unwrap();

        let lt_id = hv.get(&id).expect_tombstone().target;

        // Re-insert a SMALL payload with the same key.
        // The CAS-swap puts the small object inline in HV and schedules background cleanup.
        let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold
        storage
            .put_object(&id, &Default::default(), stream::single(small_payload))
            .await
            .unwrap();

        // The small object is now inline in high-volume.
        hv.get(&id).expect_object();

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        // The old long-term blob was cleaned up.
        lt.get(&lt_id).expect_not_found();
    }

    #[tokio::test]
    async fn overwrite_large_with_large_replaces_revision() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("overwrite-large");

        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload1))
            .await
            .unwrap();
        let lt_id_1 = hv.get(&id).expect_tombstone().target;

        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload2.clone()))
            .await
            .unwrap();
        let lt_id_2 = hv.get(&id).expect_tombstone().target;

        assert_ne!(
            lt_id_1, lt_id_2,
            "second write should create a new revision"
        );

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        lt.get(&lt_id_1).expect_not_found();
        lt.get(&lt_id_2).expect_object();

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload2);
    }

    // --- Delete ---

    #[tokio::test]
    async fn delete_small_object() {
        let (storage, hv, _lt, _) = make_tiered_storage();
        let id = make_id("delete-small");

        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();

        storage.delete_object(&id).await.unwrap();

        hv.get(&id).expect_not_found();
        assert!(storage.get_object(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn delete_large_object_cleans_up_both_backends() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("delete-both");
        let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB

        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Capture lt_id before deleting (it lives at the revision key, not at id).
        let lt_id = hv.get(&id).expect_tombstone().target;

        storage.delete_object(&id).await.unwrap();

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        assert!(!hv.contains(&id), "tombstone not cleaned up");
        assert!(!lt.contains(&lt_id), "long-term object not cleaned up");
    }

    #[derive(Debug)]
    struct FailDelete;

    #[async_trait::async_trait]
    impl Hooks for FailDelete {
        async fn delete_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<DeleteResponse> {
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                "simulated long-term delete failure",
            )))
        }
    }

    /// When the long-term GCS cleanup fails after the tombstone is deleted, the
    /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may
    /// remain in LT storage, which is accepted.
    #[tokio::test]
    async fn delete_succeeds_when_gcs_cleanup_fails() {
        let hv = InMemoryBackend::new("hv");
        let lt = TestBackend::new(FailDelete);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));

        let id = make_id("fail-delete");
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term
        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Delete succeeds even though GCS cleanup fails (it is best-effort).
        let result = storage.delete_object(&id).await;
        assert!(
            result.is_ok(),
            "delete should succeed despite GCS cleanup failure"
        );

        // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup).
        hv.get(&id).expect_not_found();

        // The orphaned GCS blob remains but the object is unreachable through the service.
        assert!(
            storage.get_object(&id).await.unwrap().is_none(),
            "object should be unreachable after tombstone is deleted"
        );
    }

    // --- CAS conflicts ---

    #[derive(Debug)]
    struct CasConflict;

    #[async_trait::async_trait]
    impl Hooks for CasConflict {
        async fn compare_and_write(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
            _current: Option<&ObjectId>,
            _write: TieredWrite,
        ) -> Result<bool> {
            Ok(false) // always conflict
        }
    }

    /// After a large-object write loses the CAS race, the new LT blob must be
    /// cleaned up. The put still returns `Ok(())`: from the caller's view, a
    /// concurrent write won.
    #[tokio::test]
    async fn put_large_cas_conflict_cleans_up_new_blob() {
        let hv = TestBackend::new(CasConflict);
        let lt = InMemoryBackend::new("lt");
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));

        let id = make_id("cas-conflict-large");
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path

        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        assert!(
            lt.is_empty(),
            "LT blob should be cleaned up after CAS conflict"
        );
    }

    /// When swapping a tombstone for inline data, a CAS conflict means another
    /// writer won. The put still returns `Ok(())`: no LT blob was written, so
    /// there is nothing to clean up.
    #[tokio::test]
    async fn put_small_over_tombstone_cas_conflict_succeeds() {
        let inner = InMemoryBackend::new("hv");
        let id = make_id("cas-conflict-small");

        // Pre-seed a tombstone directly in the inner backend so put_non_tombstone
        // returns it instead of writing inline.
        let tombstone = Tombstone {
            target: make_id("lt-object"),
            expiration_policy: ExpirationPolicy::Manual,
        };
        inner
            .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
            .await
            .unwrap();

        let lt = InMemoryBackend::new("lt");
        let hv = TestBackend::with_inner(inner, CasConflict);
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));

        // Writing a small object over a tombstone should succeed even when CAS
        // conflicts, because the other writer's write is accepted.
        storage
            .put_object(&id, &Default::default(), stream::single("tiny"))
            .await
            .unwrap();
    }

    // --- Failure / inconsistency ---

    /// Simulates a `compare_and_write` failure. If the flag is `true`, the write
    /// is committed to the inner backend before the error is returned.
    #[derive(Debug)]
    struct FailCas(bool);

    #[async_trait::async_trait]
    impl Hooks for FailCas {
        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            if self.0 {
                // simulate a network error _after_ commit went through
                inner.compare_and_write(id, current, write).await?;
            }
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "simulated compare_and_write failure",
            )))
        }
    }

    /// If the tombstone write to the high-volume backend fails after the long-term
    /// write succeeds, the long-term object must be cleaned up so we never leave
    /// an unreachable orphan in long-term storage.
    #[tokio::test]
    async fn no_orphan_when_tombstone_write_fails() {
        let lt = InMemoryBackend::new("lt");
        let hv = TestBackend::new(FailCas(false));
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));

        let id = make_id("orphan-test");
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path
        let result = storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await;

        assert!(result.is_err());

        // Drain background cleanup tasks before asserting LT state.
        storage.join().await;

        assert!(lt.is_empty(), "long-term object not cleaned up");
    }

    /// If a tombstone exists in high-volume but the corresponding object is
    /// missing from long-term storage (e.g. due to a race condition or partial
    /// cleanup), reads should gracefully return `None` rather than an error.
    #[tokio::test]
    async fn orphan_tombstone_returns_none() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("orphan-tombstone");
        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB

        storage
            .put_object(&id, &Default::default(), stream::single(payload))
            .await
            .unwrap();

        // The object is at the revision key in LT, not at id.
        let lt_id = hv.get(&id).expect_tombstone().target;

        // Remove the long-term object, leaving an orphan tombstone in HV.
        lt.remove(&lt_id);

        assert!(
            storage.get_object(&id).await.unwrap().is_none(),
            "orphan tombstone should resolve to None on get_object"
        );
        assert!(
            storage.get_metadata(&id).await.unwrap().is_none(),
            "orphan tombstone should resolve to None on get_metadata"
        );
    }

    // --- Redirect target ---

    /// A tombstone carrying an explicit `target` is followed correctly on reads and deletes,
    /// including when the target ObjectId differs from the HV ObjectId.
    #[tokio::test]
    async fn tombstone_target_is_used_for_reads_and_deletes() {
        let hv = InMemoryBackend::new("hv");
        let lt = InMemoryBackend::new("lt");
        let log = NoopChangeLog;
        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));

        let hv_id = make_id("hv-key");
        let lt_id = make_id("lt-key");
        let payload = vec![0xABu8; 100];

        // Write the object under the LT id and a tombstone pointing to it from HV.
        lt.put_object(&lt_id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone = Tombstone {
            target: lt_id.clone(),
            expiration_policy: ExpirationPolicy::Manual,
        };
        hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
            .await
            .unwrap();

        // get_object must follow the tombstone and find the object via the lt_id target.
        let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        // delete_object must clean up both backends using the target.
        storage.delete_object(&hv_id).await.unwrap();
        storage.join().await;
        assert!(!hv.contains(&hv_id), "tombstone should be removed");
        assert!(!lt.contains(&lt_id), "lt object should be removed");
    }

    // --- Multi-chunk ---

    #[tokio::test]
    async fn multi_chunk_large_object_chains_buffered_and_remaining() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("multi-chunk");

        // Deliver a 2 MiB payload across multiple chunks that individually
        // fit under the threshold but collectively exceed it.
        let chunk_size = 512 * 1024; // 512 KiB per chunk
        let chunk_count = 4; // 4 × 512 KiB = 2 MiB total
        let stream: ClientStream = futures_util::stream::iter(
            (0..chunk_count).map(move |i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))),
        )
        .boxed();

        storage
            .put_object(&id, &Default::default(), stream)
            .await
            .unwrap();

        // Should have been routed to long-term (over 1 MiB) at the revision key.
        let lt_id = hv.get(&id).expect_tombstone().target;
        let (_, lt_bytes) = lt.get(&lt_id).expect_object();
        assert_eq!(lt_bytes.len(), chunk_size * chunk_count);

        // Verify data integrity: each chunk's fill byte should appear in order.
        for i in 0..chunk_count {
            let offset = i * chunk_size;
            assert!(
                lt_bytes[offset..offset + chunk_size]
                    .iter()
                    .all(|&b| b == i as u8),
                "data mismatch in chunk {i}"
            );
        }
    }

    // --- Written-phase cleanup ---

    /// When a large-object overwrite commits in HV but its response is lost, the guard drops in
    /// `Written` phase. Cleanup must read HV to determine the CAS outcome, then delete whichever
    /// LT blob is no longer referenced. Here that is the old one, since the new tombstone committed.
    #[tokio::test]
    async fn written_cleanup_after_lost_cas_response() {
        let (storage, hv, lt, log) = make_tiered_storage();
        let id = make_id("obj");

        // First put: establishes the tombstone.
        let payload = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone1 = hv.get(&id).expect_tombstone().target;

        // Second put: updates the tombstone but fails immediately after committing.
        let broken_storage = TieredStorage::new(
            Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
            Box::new(lt.clone()),
            Box::new(log.clone()),
        );
        broken_storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap_err(); // must fail
        let tombstone2 = hv.get(&id).expect_tombstone().target;
        assert_ne!(tombstone1, tombstone2);

        // The first tombstone's target should be cleaned up, but the second should remain.
        broken_storage.join().await;
        lt.get(&tombstone1).expect_not_found();
        lt.get(&tombstone2).expect_object();

        // Now delete the new object with the same post-commit CAS failure.
        broken_storage.delete_object(&id).await.unwrap_err();
        hv.get(&id).expect_not_found();
        broken_storage.join().await;
        lt.get(&tombstone2).expect_not_found();

        // Create a fresh large object.
        let id = make_id("obj2");
        storage
            .put_object(&id, &Default::default(), stream::single(payload.clone()))
            .await
            .unwrap();
        let tombstone3 = hv.get(&id).expect_tombstone().target;

        // Overwrite it with a small object and check again for cleanup.
        broken_storage
            .put_object(&id, &Default::default(), stream::single(&b"small"[..]))
            .await
            .unwrap_err(); // must fail
        hv.get(&id).expect_object();
        broken_storage.join().await;
        lt.get(&tombstone3).expect_not_found();
    }

    // --- ChangeGuard drop safety tests ---

    /// Dropping a guard outside any tokio runtime must not panic.
    #[test]
    fn guard_dropped_outside_runtime_does_not_panic() {
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(NoopChangeLog),
        );

        let change = Change {
            id: make_id("object-key"),
            new: Some(make_id("cleanup-target")),
            old: None,
            cleanup_after: None,
        };

        // Build the guard inside a temporary runtime, then let the runtime drop
        // so that no tokio context is active when the guard drops.
        let guard = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(manager.record(change)).unwrap()
        };

        drop(guard); // Must not panic.
    }

    /// `join` blocks until all in-flight guards have completed cleanup.
    ///
    /// Time is advanced manually so the test runs at virtual speed. The guard
    /// completes after 10 s; `join` must still be waiting at 9 s and done by 11 s.
    #[tokio::test(start_paused = true)]
    async fn join_waits_for_cleanup_to_complete() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: None,
            cleanup_after: None,
        };
        let mut guard = storage.record_change(change).await.unwrap();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            guard.advance(ChangePhase::Completed);
            drop(guard);
        });

        let join_future = tokio::spawn(async move { storage.join().await });

        tokio::time::sleep(Duration::from_secs(9)).await;
        assert!(!join_future.is_finished(), "finished before guard dropped");

        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(join_future.is_finished(), "should finish after guard drops");
    }

    // --- Changelog integration tests ---

    /// LT backend hook that completes the write, then pauses until resumed.
    ///
    /// Lets tests cancel the owning future after the blob is committed but
    /// before the HV tombstone is set.
    #[derive(Clone, Debug)]
    struct PauseAfterPut {
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
    }

    #[async_trait::async_trait]
    impl Hooks for PauseAfterPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            inner.put_object(id, metadata, stream).await?;
            self.paused.notify_one();
            self.resume.notified().await;
            Ok(())
        }
    }

    /// When a future is cancelled after the LT write but before the HV tombstone is set,
    /// the `ChangeGuard` cleans up the orphaned LT blob and removes the log entry.
    #[tokio::test]
    async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
        let paused = Arc::new(tokio::sync::Notify::new());
        let hooks = PauseAfterPut {
            paused: Arc::clone(&paused),
            resume: Arc::new(tokio::sync::Notify::new()),
        };

        let lt_inner = InMemoryBackend::new("lt");
        let log = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
            Box::new(log.clone()),
        );

        let id = make_id("drop-test");
        let metadata = Metadata::default();
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path

        // Drive the put until the LT write commits, then cancel before the HV tombstone is set.
        tokio::select! {
            result = storage.put_object(&id, &metadata, stream::single(payload)) => {
                panic!("expected put to pause before completing, got: {result:?}");
            }
            _ = paused.notified() => {
                // LT blob stored; cancelling drops the guard in Recorded phase.
            }
        }

        // ChangeGuard dropped → background cleanup task spawned; wait for it.
        storage.join().await;

        // The orphaned LT blob must have been deleted.
        assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");

        // The log entry must be gone once cleanup completes.
        let entries = log.scan().await.unwrap();
        assert!(
            entries.is_empty(),
            "changelog entry not removed after cleanup"
        );
    }

1548    // --- Multipart upload ---
1549
1550    #[test]
1551    fn multipart_upload_id_roundtrip() {
1552        let id = TieredUploadId {
1553            revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(),
1554            upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(),
1555        };
1556        let encoded: UploadId = id.clone().try_into().unwrap();
1557        let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap();
1558        assert_eq!(decoded, id);
1559    }

    #[tokio::test]
    async fn multipart_single_part_roundtrip() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("mp-single");
        let metadata = Metadata {
            content_type: "application/octet-stream".into(),
            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
            ..Default::default()
        };
        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB

        let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload.clone()),
            )
            .await
            .unwrap();

        let error = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(
            error.is_none(),
            "complete_multipart returned error: {error:?}"
        );

        // get_object should follow the tombstone and return the payload.
        let (got_meta, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);
        assert_eq!(got_meta.content_type, "application/octet-stream");

        // HV should have a tombstone, LT should have the object at the physical key.
        let tombstone = hv.get(&id).expect_tombstone();
        assert!(
            tombstone.target.key().starts_with(id.key()),
            "tombstone target should be a revision key"
        );
        lt.get(&tombstone.target).expect_object();
    }

    #[tokio::test]
    async fn multipart_upload() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("multipart");

        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let part1 = vec![0xAAu8; 512 * 1024];
        let part2 = vec![0xBBu8; 512 * 1024];
        let part3 = vec![0xCCu8; 512 * 1024];

        // Upload the parts in reverse order; the completed object must still be
        // assembled in part-number order.
        let etag3 = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(3).unwrap(),
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await
            .unwrap();
        let etag2 = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(2).unwrap(),
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await
            .unwrap();
        let etag1 = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await
            .unwrap();

        let error = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: NonZeroU32::new(1).unwrap(),
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(2).unwrap(),
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: NonZeroU32::new(3).unwrap(),
                        etag: etag3,
                    },
                ],
            )
            .await
            .unwrap();
        assert!(error.is_none());

        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();

        let mut expected = Vec::new();
        expected.extend_from_slice(&part1);
        expected.extend_from_slice(&part2);
        expected.extend_from_slice(&part3);
        assert_eq!(body, expected);
    }

    #[tokio::test]
    async fn multipart_abort() {
        let (storage, hv, _lt, _) = make_tiered_storage();
        let id = make_id("mp-abort");

        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        // Upload a part then abort.
        let payload = vec![0xABu8; 100];
        storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload),
            )
            .await
            .unwrap();

        storage.abort_multipart(&id, &upload_id).await.unwrap();

        // No tombstone should have been written.
        hv.get(&id).expect_not_found();

        // The object should not be reachable.
        assert!(storage.get_object(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn multipart_list_parts() {
        let (storage, _hv, _lt, _) = make_tiered_storage();
        let id = make_id("mp-list");

        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let part1 = vec![0xAAu8; 100];
        let part2 = vec![0xBBu8; 200];
        storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                part1.len() as u64,
                None,
                stream::single(part1),
            )
            .await
            .unwrap();
        storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(2).unwrap(),
                part2.len() as u64,
                None,
                stream::single(part2),
            )
            .await
            .unwrap();

        let resp = storage
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(resp.parts.len(), 2);
        assert_eq!(resp.parts[0].part_number.get(), 1);
        assert_eq!(resp.parts[0].size, 100);
        assert_eq!(resp.parts[1].part_number.get(), 2);
        assert_eq!(resp.parts[1].size, 200);
    }

    #[tokio::test]
    async fn multipart_overwrites_existing_tombstone() {
        let (storage, hv, lt, _) = make_tiered_storage();
        let id = make_id("mp-overwrite");

        // Put a large object via the normal path.
        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
        storage
            .put_object(&id, &Default::default(), stream::single(payload1))
            .await
            .unwrap();
        let old_lt_id = hv.get(&id).expect_tombstone().target;

        // Overwrite via multipart.
        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
        let etag = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload2.len() as u64,
                None,
                stream::single(payload2.clone()),
            )
            .await
            .unwrap();

        // The multipart upload is not finalized, so the tombstone still points to the old
        // revision.
        let lt_id = hv.get(&id).expect_tombstone().target;
        assert_eq!(old_lt_id, lt_id);

        let error = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(error.is_none());

        // Now the upload has been finalized, so the new tombstone points to the new revision.
        let new_lt_id = hv.get(&id).expect_tombstone().target;
        assert_ne!(old_lt_id, new_lt_id);

        // Wait for background cleanup.
        storage.join().await;

        // Old revision should be cleaned up.
        lt.get(&old_lt_id).expect_not_found();
        lt.get(&new_lt_id).expect_object();

        // Assert the contents of the new revision.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload2);
    }

    // --- Multipart completion failure handling (consistency, retries, delayed cleanup) ---

    /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network
    /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot
    /// recover by detecting the already-assembled blob.
    #[derive(Debug)]
    struct CompleteMultipartButReturnError;

    #[async_trait::async_trait]
    impl Hooks for CompleteMultipartButReturnError {
        async fn complete_multipart(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            upload_id: &UploadId,
            parts: Vec<CompletedPart>,
        ) -> Result<CompleteMultipartResponse> {
            inner
                .complete_multipart(id, upload_id, parts)
                .await
                .unwrap();
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "simulated network error on complete_multipart",
            )))
        }

        async fn get_metadata(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<MetadataResponse> {
            Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "simulated network error on get_metadata",
            )))
        }
    }

    /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both
    /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize.
    /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob.
    #[tokio::test]
    async fn cleans_up_orphan_after_failed_multipart_complete() {
        let hv = InMemoryBackend::new("hv");
        let lt_inner = InMemoryBackend::new("lt");
        let log = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(TestBackend::with_inner(
                lt_inner.clone(),
                CompleteMultipartButReturnError,
            )),
            Box::new(log.clone()),
        );

        let id = make_id("mp-orphan");
        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered_id.revision,
        };

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let etag = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload),
            )
            .await
            .unwrap();

        let result = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await;
        assert!(result.is_err());
        storage.join().await;

        // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being
        // dropped while in the `Assembling` state.
        lt_inner.get(&physical).expect_object();
        hv.get(&id).expect_not_found();

        // Simulate the passage of time and run recovery.
        log.expire_all();
        let manager = ChangeManager::new(
            Box::new(hv.clone()),
            Box::new(lt_inner.clone()),
            Box::new(log.clone()),
        );
        manager.recover().await.unwrap();

        // The orphaned LT blob has been cleaned up.
        lt_inner.get(&physical).expect_not_found();
        // The change has been removed from the log.
        let remaining = log.scan().await.unwrap();
        assert!(remaining.is_empty());
    }

    /// Fails the first `complete_multipart` call with a simulated timeout, then forwards every
    /// later call to the inner backend.
    #[derive(Debug)]
    struct FailOnFirstCompleteMultipartAttempt {
        attempt: Mutex<u32>,
    }

    impl FailOnFirstCompleteMultipartAttempt {
        fn new() -> Self {
            Self {
                attempt: Mutex::new(0),
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for FailOnFirstCompleteMultipartAttempt {
        async fn complete_multipart(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            upload_id: &UploadId,
            parts: Vec<CompletedPart>,
        ) -> Result<CompleteMultipartResponse> {
            let mut attempt = self.attempt.lock().await;
            *attempt += 1;
            if *attempt == 1 {
                Err(Error::Io(std::io::Error::new(
                    std::io::ErrorKind::TimedOut,
                    "simulated network error",
                )))
            } else {
                Ok(inner
                    .complete_multipart(id, upload_id, parts)
                    .await
                    .unwrap())
            }
        }
    }

    /// The first attempt to `complete_multipart` fails, which generates a `Change` entry.
    /// The second call succeeds.
    /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went
    /// through before the cleanup deadline.
    #[tokio::test]
    async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() {
        let hv = InMemoryBackend::new("hv");
        let lt_inner = InMemoryBackend::new("lt");
        let log = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(TestBackend::with_inner(
                lt_inner.clone(),
                FailOnFirstCompleteMultipartAttempt::new(),
            )),
            Box::new(log.clone()),
        );

        let id = make_id("mp-retry");
        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered_id.revision,
        };

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let etag = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload.clone()),
            )
            .await
            .unwrap();

        // The first `complete_multipart` call fails.
        let result = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag: etag.clone(),
                }],
            )
            .await;
        assert!(result.is_err());
        storage.join().await;

        // The second `complete_multipart` call succeeds.
        let result = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await;
        assert!(result.is_ok());
        storage.join().await;

        // The object is there.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        // Simulate the passage of time and run recovery.
        log.expire_all();
        let manager = ChangeManager::new(
            Box::new(hv.clone()),
            Box::new(lt_inner.clone()),
            Box::new(log.clone()),
        );
        manager.recover().await.unwrap();

        // The LT blob has not been cleaned up, as the write eventually went through.
        lt_inner.get(&physical).expect_object();
        // The tombstone still points to the blob.
        let tombstone = hv.get(&id).expect_tombstone();
        assert_eq!(tombstone.target, physical);
        // The change has been removed from the log.
        let remaining = log.scan().await.unwrap();
        assert!(remaining.is_empty());

        // The object is still there after recovery.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);
    }

    /// Fails the first `get_metadata` call with a simulated timeout, then forwards every later
    /// call to the inner backend.
    #[derive(Debug)]
    struct FailOnFirstGetMetadataAttempt {
        attempt: Mutex<u32>,
    }

    impl FailOnFirstGetMetadataAttempt {
        fn new() -> Self {
            Self {
                attempt: Mutex::new(0),
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for FailOnFirstGetMetadataAttempt {
        async fn get_metadata(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
        ) -> Result<MetadataResponse> {
            let mut attempt = self.attempt.lock().await;
            *attempt += 1;
            if *attempt == 1 {
                Err(Error::Io(std::io::Error::new(
                    std::io::ErrorKind::TimedOut,
                    "simulated network error",
                )))
            } else {
                inner.get_metadata(id).await
            }
        }
    }

    /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent
    /// `get_metadata` call fails with a network error, causing the overall `complete_multipart`
    /// to fail. The second call retries and succeeds (the LT object already exists from the first
    /// attempt).
    /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went
    /// through before the cleanup deadline.
    #[tokio::test]
    async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent()
    {
        let hv = InMemoryBackend::new("hv");
        let lt_inner = InMemoryBackend::new("lt");
        let log = InMemoryChangeLog::default();
        let storage = TieredStorage::new(
            Box::new(hv.clone()),
            Box::new(TestBackend::with_inner(
                lt_inner.clone(),
                FailOnFirstGetMetadataAttempt::new(),
            )),
            Box::new(log.clone()),
        );

        let id = make_id("mp-retry-meta");
        let upload_id = storage
            .initiate_multipart(&id, &Default::default())
            .await
            .unwrap();

        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
        let physical = ObjectId {
            context: id.context.clone(),
            key: tiered_id.revision,
        };

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let etag = storage
            .upload_part(
                &id,
                &upload_id,
                NonZeroU32::new(1).unwrap(),
                payload.len() as u64,
                None,
                stream::single(payload.clone()),
            )
            .await
            .unwrap();

        // The first `complete_multipart` call fails (get_metadata network error), even though it
        // internally creates the LT blob.
        let result = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag: etag.clone(),
                }],
            )
            .await;
        assert!(result.is_err());
        storage.join().await;

        // The second `complete_multipart` call succeeds.
        let result = storage
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: NonZeroU32::new(1).unwrap(),
                    etag,
                }],
            )
            .await;
        assert!(result.is_ok());
        storage.join().await;

        // The object is there.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);

        // Simulate the passage of time and run recovery.
        log.expire_all();
        let manager = ChangeManager::new(
            Box::new(hv.clone()),
            Box::new(lt_inner.clone()),
            Box::new(log.clone()),
        );
        manager.recover().await.unwrap();

        // The LT blob has not been cleaned up, as the write eventually went through.
        lt_inner.get(&physical).expect_object();
        // The tombstone still points to the blob.
        let tombstone = hv.get(&id).expect_tombstone();
        assert_eq!(tombstone.target, physical);
        // The change has been removed from the log.
        let remaining = log.scan().await.unwrap();
        assert!(remaining.is_empty());

        // The object is there after recovery.
        let (_, s) = storage.get_object(&id).await.unwrap().unwrap();
        let body = stream::read_to_vec(s).await.unwrap();
        assert_eq!(body, payload);
    }
}