Skip to main content

objectstore_service/backend/
tiered.rs

1//! Two-tier storage backend with size-based routing and redirect tombstones.
2//!
3//! [`TieredStorage`] routes objects to a high-volume or long-term backend based
4//! on size and maintains redirect tombstones so that reads never need to probe
5//! both backends. See the [crate-level documentation](crate) for the high-level
6//! motivation, and the [`TieredStorage`] struct docs for routing and tombstone
7//! semantics.
8//!
9//! # Cross-Tier Consistency
10//!
11//! A single logical object may span both backends: a tombstone in HV pointing
12//! to a payload in LT. Mutations keep the two in sync through compare-and-swap
13//! on the high-volume backend (see [`HighVolumeBackend::compare_and_write`]).
14//! Each operation reads the current HV revision, performs its work, then
15//! atomically swaps the HV entry only if the revision is still current —
16//! rolling back on conflict.
17//!
18//! ## Revision Keys
19//!
20//! Every large-object write stores its payload at a **revision key** in the
21//! long-term backend: `{original_key}/{uuid}`. The UUID suffix is random (no
22//! monotonicity is guaranteed), so each write targets a distinct LT path
23//! regardless of whether another write to the same logical key is in progress.
24//! The tombstone in HV then points to this specific revision. Because each
25//! writer owns its own LT blob, the compare-and-swap on the tombstone becomes
26//! an atomic pointer swap: the winner's revision is committed and the loser
27//! can safely delete its own blob without affecting the winner.
28//!
29//! See `new_long_term_revision` for the key construction.
30//!
31//! ## Compare-and-Swap
32//!
33//! All mutating operations follow a common pattern of reading the current
34//! revision, performing the upload, atomically swapping the revision (commit
35//! point), and cleaning up the now-unreferenced LT blob in the background:
36//!
37//! ### Large-Object Write (> 1 MiB)
38//!
39//! 1. **Read HV** to capture the current revision (existing tombstone target,
40//!    or absent).
41//! 2. **Write payload to LT** at a unique revision key.
42//! 3. **Compare-and-swap in HV**: write a tombstone pointing to the new
43//!    revision, only if the current revision still matches step 1.
44//!    - **OK** — schedule background deletion of the old LT blob, if any.
45//!    - **Conflict** — another writer won the race; schedule background deletion
46//!      of our new LT blob.
47//!    - **Error** — reload the tombstone and delete the unreferenced blob or
48//!      blobs.
49//!
50//! ### Small-Object Write (≤ 1 MiB)
51//!
52//! 1. **Write inline to HV**, skipping the write if a tombstone is present.
53//!    - **OK** — done; the object is stored entirely in HV.
54//!    - **Tombstone present** — a large object already occupies this key;
55//!      continue:
56//! 2. **Compare-and-swap in HV**: replace the tombstone with inline data, only
57//!    if the tombstone's revision still matches.
58//!    - **OK** — schedule background deletion of the old LT blob.
59//!    - **Conflict** — another writer won the race; they will clean up the
60//!      LT blob and we have no new LT blob to clean up.
61//!    - **Error** — reload the tombstone and delete the unreferenced blob if
62//!      the write went through.
63//!
64//! ### Delete
65//!
66//! 1. **Delete from HV** if the entry is not a tombstone.
67//!    - **OK** — done; there is no LT data to clean up.
68//!    - **Tombstone present** — a large object is stored here; continue:
69//! 2. **Compare-and-swap in HV**: remove the tombstone, only if its revision
70//!    still matches.
71//!    - **OK** — schedule background deletion of the LT blob.
72//!    - **Conflict** — another writer won the race; they will clean up.
73//!    - **Error** — reload the tombstone and delete the unreferenced blob if
74//!      the write went through.
75//!
76//! Tombstone removal is the commit point for deletes. If the subsequent LT
77//! cleanup fails, an orphan blob remains but the object is already unreachable
78//! through the normal read path.
79//!
80//! ## Last-Writer-Wins
81//!
82//! Concurrent mutations on the same key are inherently a race. Even a write
83//! that returns `Ok` may be immediately overwritten by another caller — there
84//! is no ordering guarantee and objectstore cannot provide a read-your-writes
85//! promise.
86//!
87//! CAS conflicts are therefore **not errors**: the losing writer's data is
88//! cleaned up and `Ok` is returned, because the result is indistinguishable
89//! from having succeeded a moment earlier and then been overwritten.
90//!
91//! ### Idempotency
92//!
93//! `compare_and_write` is idempotent: if the row is already in the target state, it
94//! returns `true` without re-applying the mutation. This is critical for retry
95//! safety. If the server commits a write but the response is lost, a retry sees the
96//! already-mutated state and still returns `true` — so callers do not mistakenly
97//! treat a successful commit as a lost race and clean up data that was actually
98//! persisted.
99
100use std::sync::Arc;
101use std::sync::atomic::{AtomicU64, Ordering};
102use std::time::{Duration, SystemTime};
103
104use base64::Engine as _;
105use bytes::Bytes;
106use futures_util::{Stream, StreamExt};
107use objectstore_types::metadata::Metadata;
108use objectstore_types::range::ByteRange;
109use serde::{Deserialize, Serialize};
110
111use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase};
112use crate::backend::common::{
113    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse,
114    MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone,
115};
116use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
117use crate::error::{Error, Result};
118use crate::id::ObjectId;
119use crate::multipart::{
120    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
121    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
122};
123use crate::stream::{ClientStream, SizedPeek};
124
125/// The threshold up until which we will go to the "high volume" backend.
126const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB
127
128/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling`
129/// state before becoming eligible for cleanup by the `ChangeLog` recovery process.
130/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long,
131/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob.
132const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24);
133
134/// Creates a new [`ObjectId`] with the same context but a unique revision key.
135///
136/// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct
137/// storage path for each large-object write. [`ObjectId::from_storage_path`] parses
138/// the result back correctly because the key portion may contain `/`.
139fn new_long_term_revision(id: &ObjectId) -> ObjectId {
140    ObjectId {
141        context: id.context.clone(),
142        key: format!("{}/{}", id.key, uuid::Uuid::now_v7()),
143    }
144}
145
146/// Configuration for [`TieredStorage`].
147///
148/// Composes two backends into a tiered routing setup: `high_volume` for small
149/// objects and `long_term` for large objects. Nesting [`super::StorageConfig::Tiered`]
150/// inside another tiered config is not supported.
151///
152/// # Example
153///
154/// ```yaml
155/// storage:
156///   type: tiered
157///   high_volume:
158///     type: bigtable
159///     project_id: my-project
160///     instance_name: objectstore
161///     table_name: objectstore
162///   long_term:
163///     type: gcs
164///     bucket: my-objectstore-bucket
165/// ```
166#[derive(Debug, Clone, Deserialize, Serialize)]
167pub struct TieredStorageConfig {
168    /// Backend for high-volume, small objects.
169    ///
170    /// Must be a backend that implements [`HighVolumeBackend`] (currently
171    /// only BigTable).
172    pub high_volume: HighVolumeStorageConfig,
173    /// Backend for large, long-term objects.
174    ///
175    /// Must be a backend that implements [`MultipartUploadBackend`].
176    pub long_term: MultipartUploadStorageConfig,
177}
178
179/// Two-tier storage backend that routes objects by size.
180///
181/// `TieredStorage` implements [`Backend`] and is intended to be used inside a
182/// [`StorageService`](crate::StorageService), which wraps it with task spawning and panic
183/// isolation.
184///
185/// # Size-Based Routing
186///
187/// Objects are routed at write time based on their size relative to a **1 MiB threshold**:
188///
189/// - Objects **≤ 1 MiB** go to the `high_volume` backend — optimized for low-latency reads
190///   and writes of small objects (e.g. BigTable).
191/// - Objects **> 1 MiB** go to the `long_term` backend — optimized for cost-efficient
192///   storage of large objects (e.g. GCS).
193///
194/// # Redirect Tombstones
195///
196/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object
197/// without knowing which backend stores it. A naive approach would check the long-term
198/// backend on every read miss in the high-volume backend — but that is slow and expensive.
199///
200/// Instead, when an object is stored in the long-term backend, a **redirect tombstone** is
201/// written in the high-volume backend. It acts as a signpost: "the real data lives in the
202/// other backend at this target." On reads, a single high-volume lookup either returns the
203/// object directly or follows the tombstone to long-term storage, without probing both
204/// backends.
205///
206/// How tombstones are physically stored is determined by the [`HighVolumeBackend`]
207/// implementation — refer to the backend's own documentation for storage format details.
208///
209/// # Consistency
210///
211/// Consistency across the two backends is maintained through compare-and-swap
212/// operations on the high-volume backend (see
213/// [`HighVolumeBackend::compare_and_write`]), not distributed locks. Each
214/// mutating operation reads the current high-volume revision, performs its
215/// work, and then atomically swaps the high-volume entry only if the revision
216/// is still current — rolling back on conflict. Cleanup of unreferenced LT
217/// blobs runs in background tasks so the caller returns as soon as the commit
218/// point is reached. Call [`Backend::join`] during shutdown to wait for
219/// outstanding cleanup.
220///
221/// See the [module-level documentation](self) for per-operation diagrams.
222///
223/// # Usage
224///
225/// `TieredStorage` handles only the routing and consistency logic. Wrap it in a
226/// [`StorageService`](crate::service::StorageService) to add task spawning, panic isolation,
227/// and concurrency limiting.
228#[derive(Debug)]
229pub struct TieredStorage {
230    inner: Arc<ChangeManager>,
231}
232
233impl TieredStorage {
234    /// Creates a new `TieredStorage` with the given backends and change log.
235    pub fn new(
236        high_volume: Box<dyn HighVolumeBackend>,
237        long_term: Box<dyn MultipartUploadBackend>,
238        changelog: Box<dyn ChangeLog>,
239    ) -> Self {
240        let inner = ChangeManager::new(high_volume, long_term, changelog);
241        // Note on cancellation: Our `join` method will wait for all tasks tracked by the spawned
242        // recovery job, so we defer shutdown until recovery is complete or times out.
243        tokio::spawn(inner.clone().recover());
244        Self { inner }
245    }
246
247    /// Records the change to the log and returns a guard that cleans up on drop.
248    async fn record_change(&self, change: Change) -> Result<ChangeGuard> {
249        self.inner.clone().record(change).await
250    }
251
252    /// Records the change to the log in the `Assembling` phase, and returns a guard that does
253    /// nothing on drop unless advanced.
254    async fn record_assembling(&self, change: Change) -> Result<ChangeGuard> {
255        self.inner.clone().record_assembling(change).await
256    }
257
258    /// Returns the name of the backend corresponding to the given routing choice.
259    fn backend_type(&self, choice: &BackendChoice) -> &'static str {
260        match choice {
261            BackendChoice::HighVolume => self.inner.high_volume.name(),
262            BackendChoice::LongTerm => self.inner.long_term.name(),
263        }
264    }
265
266    /// Puts an object into the high-volume backend.
267    ///
268    /// If a tombstone already exists, attempts to swap it for the new object and delete the old
269    /// long-term object.
270    async fn put_high_volume(
271        &self,
272        id: &ObjectId,
273        metadata: &Metadata,
274        payload: Bytes,
275    ) -> Result<()> {
276        let tombstone_opt = self
277            .inner
278            .high_volume
279            .put_non_tombstone(id, metadata, payload.clone())
280            .await?;
281
282        let Some(Tombstone { target, .. }) = tombstone_opt else {
283            // No tombstone exists - write succeeded
284            return Ok(());
285        };
286
287        // Tombstone exists — Swap it for inline data
288        let mut guard = self
289            .record_change(Change {
290                id: id.clone(),
291                new: None,
292                old: Some(target.clone()),
293                cleanup_after: None,
294            })
295            .await?;
296
297        let write = TieredWrite::Object(metadata.clone(), payload);
298        guard.advance(ChangePhase::Written);
299
300        let written = self
301            .inner
302            .high_volume
303            .compare_and_write(id, Some(&target), write)
304            .await?;
305
306        // Update guard and let it schedule cleanup in the background.
307        guard.advance(ChangePhase::compare_and_write(written));
308
309        Ok(())
310    }
311
312    /// Puts an object into the long-term backend with a redirect tombstone in front.
313    ///
314    /// Deletes the previous long-term object if overwriting an existing tombstone. If the tombstone
315    /// write fails, the new long-term object is cleaned up.
316    async fn put_long_term(
317        &self,
318        id: &ObjectId,
319        metadata: &Metadata,
320        stream: ClientStream,
321    ) -> Result<()> {
322        // 1. Read current HV revision to establish the write precondition
323        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
324            TieredMetadata::Tombstone(t) => Some(t.target),
325            _ => None,
326        };
327
328        // 2. Write payload to long-term at a unique revision key.
329        let new = new_long_term_revision(id);
330        let mut guard = self
331            .record_change(Change {
332                id: id.clone(),
333                new: Some(new.clone()),
334                old: current.clone(),
335                cleanup_after: None,
336            })
337            .await?;
338
339        self.inner
340            .long_term
341            .put_object(&new, metadata, stream)
342            .await?;
343        guard.advance(ChangePhase::Written);
344
345        // 3. CAS commit: write tombstone only if HV state matches what we saw.
346        let tombstone = Tombstone {
347            target: new.clone(),
348            expiration_policy: metadata.expiration_policy,
349        };
350        let written = self
351            .inner
352            .high_volume
353            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
354            .await?;
355
356        // Update guard and let it schedule cleanup in the background.
357        guard.advance(ChangePhase::compare_and_write(written));
358
359        Ok(())
360    }
361}
362
363#[async_trait::async_trait]
364impl Backend for TieredStorage {
365    fn name(&self) -> &'static str {
366        "tiered"
367    }
368
369    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
370        Ok(self)
371    }
372
373    async fn put_object(
374        &self,
375        id: &ObjectId,
376        metadata: &Metadata,
377        stream: ClientStream,
378    ) -> Result<PutResponse> {
379        let timer = objectstore_metrics::timer!("put.latency", usecase = id.usecase().to_owned());
380        if metadata.origin.is_none() {
381            objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned());
382        }
383
384        let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?;
385        objectstore_metrics::record!(
386            "put.first_chunk.latency" = timer.elapsed(),
387            usecase = id.usecase().to_owned(),
388            complete = if peeked.is_exhausted() { "yes" } else { "no" },
389        );
390
391        let (backend_choice, stored_size) = if peeked.is_exhausted() {
392            let payload = peeked.into_bytes().await?;
393            let payload_len = payload.len() as u64;
394            self.put_high_volume(id, metadata, payload).await?;
395            (BackendChoice::HighVolume, payload_len)
396        } else {
397            let (stored_size, stream) = counting_stream(peeked.into_stream());
398            self.put_long_term(id, metadata, stream.boxed()).await?;
399            (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire))
400        };
401
402        let backend_ty = self.backend_type(&backend_choice);
403        timer
404            .tag("backend_choice", backend_choice.as_str())
405            .tag("backend_type", backend_ty)
406            .record();
407        objectstore_metrics::record!(
408            "put.size" = stored_size,
409            usecase = id.usecase().to_owned(),
410            backend_choice = backend_choice.as_str(),
411            backend_type = backend_ty,
412            upload_type = "direct",
413        );
414
415        Ok(())
416    }
417
418    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
419        let timer = objectstore_metrics::timer!(
420            "get.latency.pre-response",
421            usecase = id.usecase().to_owned(),
422        );
423
424        let hv_result = self.inner.high_volume.get_tiered_object(id, range).await?;
425        let (result, backend_choice) = match hv_result {
426            TieredGet::NotFound => (None, BackendChoice::HighVolume),
427            TieredGet::Object(metadata, content_range, stream) => (
428                Some((metadata, content_range, stream)),
429                BackendChoice::HighVolume,
430            ),
431            TieredGet::Tombstone(tombstone) => (
432                self.inner
433                    .long_term
434                    .get_object(&tombstone.target, range)
435                    .await?,
436                BackendChoice::LongTerm,
437            ),
438        };
439
440        let backend_type = self.backend_type(&backend_choice);
441        timer
442            .tag("backend_choice", backend_choice.as_str())
443            .tag("backend_type", backend_type)
444            .record();
445
446        if let Some((ref metadata, ref content_range, _)) = result {
447            let size = content_range.map(|cr| cr.len() as usize).or(metadata.size);
448            if let Some(size) = size {
449                objectstore_metrics::record!(
450                    "get.size" = size,
451                    usecase = id.usecase().to_owned(),
452                    backend_choice = backend_choice.as_str(),
453                    backend_type = backend_type,
454                );
455            }
456        }
457
458        Ok(result)
459    }
460
461    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
462        let timer = objectstore_metrics::timer!("head.latency", usecase = id.usecase().to_owned());
463
464        let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?;
465        let (result, backend_choice) = match hv_result {
466            TieredMetadata::NotFound => (None, BackendChoice::HighVolume),
467            TieredMetadata::Object(metadata) => (Some(metadata), BackendChoice::HighVolume),
468            TieredMetadata::Tombstone(tombstone) => (
469                self.inner.long_term.get_metadata(&tombstone.target).await?,
470                BackendChoice::LongTerm,
471            ),
472        };
473
474        timer
475            .tag("backend_choice", backend_choice.as_str())
476            .tag("backend_type", self.backend_type(&backend_choice))
477            .record();
478
479        Ok(result)
480    }
481
482    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
483        let timer =
484            objectstore_metrics::timer!("delete.latency", usecase = id.usecase().to_owned());
485
486        let mut backend_choice = BackendChoice::HighVolume;
487
488        if let Some(tombstone) = self.inner.high_volume.delete_non_tombstone(id).await? {
489            backend_choice = BackendChoice::LongTerm;
490
491            let mut guard = self
492                .record_change(Change {
493                    id: id.clone(),
494                    new: None,
495                    old: Some(tombstone.target.clone()),
496                    cleanup_after: None,
497                })
498                .await?;
499            guard.advance(ChangePhase::Written);
500
501            // Remove the tombstone; the LT blob becomes unreachable at this point.
502            let deleted = self
503                .inner
504                .high_volume
505                .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete)
506                .await?;
507
508            // Update guard and let it schedule cleanup in the background.
509            guard.advance(ChangePhase::compare_and_write(deleted));
510        }
511
512        timer
513            .tag("backend_choice", backend_choice.as_str())
514            .tag("backend_type", self.backend_type(&backend_choice))
515            .record();
516
517        Ok(())
518    }
519
520    async fn join(&self) {
521        self.inner.tracker.close();
522        tokio::join!(
523            self.inner.high_volume.join(),
524            self.inner.long_term.join(),
525            self.inner.tracker.wait()
526        );
527    }
528}
529
530#[derive(Debug)]
531enum BackendChoice {
532    HighVolume,
533    LongTerm,
534}
535
536impl BackendChoice {
537    fn as_str(&self) -> &'static str {
538        match self {
539            BackendChoice::HighVolume => "high-volume",
540            BackendChoice::LongTerm => "long-term",
541        }
542    }
543}
544
545impl std::fmt::Display for BackendChoice {
546    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
547        f.write_str(self.as_str())
548    }
549}
550
551/// Wraps a stream to count the total bytes yielded by successful chunks.
552///
553/// Returns the shared counter and the wrapped stream. The counter is incremented
554/// as the stream is consumed, so read it only after the stream is exhausted.
555fn counting_stream<S, E>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes, E>>)
556where
557    S: Stream<Item = Result<Bytes, E>>,
558{
559    let counter = Arc::new(AtomicU64::new(0));
560
561    (
562        counter.clone(),
563        stream.inspect(move |res| {
564            if let Ok(chunk) = res {
565                counter.fetch_add(chunk.len() as u64, Ordering::Relaxed);
566            }
567        }),
568    )
569}
570
571/// The multipart upload state for TieredStorage.
572#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
573struct TieredUploadId {
574    revision: String,
575    upload_id: UploadId,
576}
577
578impl TryInto<UploadId> for TieredUploadId {
579    type Error = Error;
580
581    fn try_into(self) -> Result<UploadId, Self::Error> {
582        let json =
583            serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?;
584        Ok(UploadId::new(
585            base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json),
586        )?)
587    }
588}
589
590impl TryFrom<&UploadId> for TieredUploadId {
591    type Error = Error;
592
593    fn try_from(value: &UploadId) -> Result<Self, Self::Error> {
594        let json = base64::engine::general_purpose::URL_SAFE_NO_PAD
595            .decode(value.as_bytes())
596            .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?;
597        serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e))
598    }
599}
600
601#[async_trait::async_trait]
602impl MultipartUploadBackend for TieredStorage {
603    async fn initiate_multipart(
604        &self,
605        id: &ObjectId,
606        metadata: &Metadata,
607    ) -> Result<InitiateMultipartResponse> {
608        let timer = objectstore_metrics::timer!(
609            "multipart.initiate.latency",
610            usecase = id.usecase().to_owned(),
611        );
612        let physical = new_long_term_revision(id);
613
614        let upload_id = self
615            .inner
616            .long_term
617            .initiate_multipart(&physical, metadata)
618            .await?;
619
620        let id = TieredUploadId {
621            revision: physical.key,
622            upload_id,
623        };
624        let id = id.try_into()?;
625
626        timer.record();
627        Ok(id)
628    }
629
630    async fn upload_part(
631        &self,
632        id: &ObjectId,
633        upload_id: &UploadId,
634        part_number: PartNumber,
635        content_length: u64,
636        content_md5: Option<&str>,
637        body: ClientStream,
638    ) -> Result<UploadPartResponse> {
639        let timer = objectstore_metrics::timer!(
640            "multipart.upload_part.latency",
641            usecase = id.usecase().to_owned(),
642        );
643        let tiered: TieredUploadId = upload_id.try_into()?;
644
645        let physical = ObjectId {
646            context: id.context.clone(),
647            key: tiered.revision,
648        };
649
650        let etag = self
651            .inner
652            .long_term
653            .upload_part(
654                &physical,
655                &tiered.upload_id,
656                part_number,
657                content_length,
658                content_md5,
659                body,
660            )
661            .await?;
662
663        timer.record();
664        objectstore_metrics::record!(
665            "multipart.upload_part.size" = content_length,
666            usecase = id.usecase().to_owned(),
667        );
668
669        Ok(etag)
670    }
671
672    async fn list_parts(
673        &self,
674        id: &ObjectId,
675        upload_id: &UploadId,
676        max_parts: Option<u32>,
677        part_number_marker: Option<PartNumber>,
678    ) -> Result<ListPartsResponse> {
679        let timer = objectstore_metrics::timer!(
680            "multipart.list_parts.latency",
681            usecase = id.usecase().to_owned(),
682        );
683        let tiered: TieredUploadId = upload_id.try_into()?;
684
685        let physical = ObjectId {
686            context: id.context.clone(),
687            key: tiered.revision,
688        };
689
690        let response = self
691            .inner
692            .long_term
693            .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker)
694            .await?;
695
696        timer.record();
697        Ok(response)
698    }
699
700    async fn abort_multipart(
701        &self,
702        id: &ObjectId,
703        upload_id: &UploadId,
704    ) -> Result<AbortMultipartResponse> {
705        let timer = objectstore_metrics::timer!(
706            "multipart.abort.latency",
707            usecase = id.usecase().to_owned(),
708        );
709        let tiered: TieredUploadId = upload_id.try_into()?;
710
711        let physical = ObjectId {
712            context: id.context.clone(),
713            key: tiered.revision,
714        };
715
716        let () = self
717            .inner
718            .long_term
719            .abort_multipart(&physical, &tiered.upload_id)
720            .await?;
721
722        timer.record();
723        Ok(())
724    }
725
726    async fn complete_multipart(
727        &self,
728        id: &ObjectId,
729        upload_id: &UploadId,
730        parts: Vec<CompletedPart>,
731    ) -> Result<CompleteMultipartResponse> {
732        let timer = objectstore_metrics::timer!(
733            "multipart.complete.latency",
734            usecase = id.usecase().to_owned(),
735        );
736        let part_count = parts.len();
737        let tiered: TieredUploadId = upload_id.try_into()?;
738
739        let physical = ObjectId {
740            context: id.context.clone(),
741            key: tiered.revision,
742        };
743
744        // 1. Read current HV revision to establish the write precondition.
745        let current = match self.inner.high_volume.get_tiered_metadata(id).await? {
746            // Optimization: a previous attempt already finalized this revision and tombstone -- report success.
747            TieredMetadata::Tombstone(t) if t.target == physical => {
748                timer.record();
749                return Ok(None);
750            }
751            TieredMetadata::Tombstone(t) => Some(t.target),
752            _ => None,
753        };
754
755        // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`,
756        // so that the user has the chance to retry finalizing the upload in this timeframe.
757        let mut guard = self
758            .record_assembling(Change {
759                id: id.clone(),
760                new: Some(physical.clone()),
761                old: current.clone(),
762                cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY),
763            })
764            .await?;
765
766        // 2. Complete the upload, creating the object at the given revision key.
767        let maybe_complete_multipart_err = match self
768            .inner
769            .long_term
770            .complete_multipart(&physical, &tiered.upload_id, parts)
771            .await
772        {
773            // The request went through but we got an error in the response body.
774            // Transparently proxy the error to the user.
775            Ok(error) => {
776                if error.is_some() {
777                    return Ok(error);
778                }
779                None
780            }
781            // We got status 4xx/5xx, or a network error.
782            // Either way, `complete_multipart` might have been completed successfully,
783            // either now or in a previous attempt (in that case, that's a 404 and we indeed end up
784            // here).
785            // We cannot know if that's the case yet, so we continue to the next steps.
786            Err(err) => Some(err),
787        };
788
789        // 3. Retrieve the metadata of the object, which was determined at initiation time, to
790        //    get the expiration policy.
791        //
792        //    This also serves as an existence check to understand if the LT revision was actually
793        //    created successfully in this or a previous attempt, in which case we just need to
794        //    finalize the tombstone.
795        let metadata = self.inner.long_term.get_metadata(&physical).await;
796
797        let metadata = match (metadata, maybe_complete_multipart_err) {
798            // The LT revision already exists, so we can continue to finalize the tombstone.
799            (Ok(Some(metadata)), _) => metadata,
800            // The LT revision doesn't exist, cannot proceed.
801            (Ok(None), Some(err)) => return Err(err),
802            // The `complete_multipart` succeeded, creating the object, but the `get_metadata`
803            // immediately after failed to find the object. This should never happen.
804            (Ok(None), None) => {
805                objectstore_log::error!(
806                    id = ?id,
807                    upload_id = ?upload_id,
808                    physical = ?physical,
809                    "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object"
810                );
811                return Err(Error::generic(
812                    "completed multipart object not found in long-term storage",
813                ));
814            }
815            // Failed to `get_metadata`, cannot proceed.
816            (Err(get_metadata_err), maybe_complete_multipart_err) => {
817                // Prefer the `complete_multipart_err`, as it's likely more informative.
818                // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500,
819                // but we would actually want to transparently surface the original status (and message?) instead.
820                return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err));
821            }
822        };
823
824        // 4. CAS commit: write tombstone only if HV state matches what we saw.
825        let tombstone = Tombstone {
826            target: physical.clone(),
827            expiration_policy: metadata.expiration_policy,
828        };
829        let written = self
830            .inner
831            .high_volume
832            .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone))
833            .await?;
834
835        // Update guard and let it schedule cleanup in the background.
836        guard.advance(ChangePhase::compare_and_write(written));
837
838        timer.record();
839        objectstore_metrics::record!(
840            "multipart.complete.part_count" = part_count as u64,
841            usecase = id.usecase().to_owned(),
842        );
843        if let Some(size) = metadata.size {
844            objectstore_metrics::record!(
845                "put.size" = size as u64,
846                usecase = id.usecase().to_owned(),
847                backend_choice = BackendChoice::LongTerm.as_str(),
848                backend_type = self.backend_type(&BackendChoice::LongTerm),
849                upload_type = "multipart",
850            );
851        }
852
853        Ok(None)
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use std::num::NonZeroU32;
860
861    use futures::lock::Mutex;
862    use objectstore_types::metadata::{ExpirationPolicy, Metadata};
863    use objectstore_types::scope::{Scope, Scopes};
864
865    use super::*;
866    use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog};
867    use crate::backend::in_memory::InMemoryBackend;
868    use crate::backend::testing::{Hooks, TestBackend};
869    use crate::error::Error;
870    use crate::id::ObjectContext;
871
872    use crate::stream::{self, ClientStream};
873
874    fn make_context() -> ObjectContext {
875        ObjectContext {
876            usecase: "testing".into(),
877            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
878        }
879    }
880
881    fn make_id(key: &str) -> ObjectId {
882        ObjectId::new(make_context(), key.into())
883    }
884
885    fn make_tiered_storage() -> (
886        TieredStorage,
887        InMemoryBackend,
888        InMemoryBackend,
889        InMemoryChangeLog,
890    ) {
891        let hv = InMemoryBackend::new("in-memory-hv");
892        let lt = InMemoryBackend::new("in-memory-lt");
893        let changelog = InMemoryChangeLog::default();
894        let storage = TieredStorage::new(
895            Box::new(hv.clone()),
896            Box::new(lt.clone()),
897            Box::new(changelog.clone()),
898        );
899        (storage, hv, lt, changelog)
900    }
901
902    // --- new_long_term_revision tests ---
903
904    #[test]
905    fn revision_id_preserves_context() {
906        let id = make_id("my-key");
907        let revised = new_long_term_revision(&id);
908        assert_eq!(revised.context, id.context);
909        assert!(
910            revised.key.starts_with("my-key/"),
911            "revised key should have /<uuid> suffix, got: {}",
912            revised.key
913        );
914    }
915
916    #[test]
917    fn revision_id_roundtrips_storage_path() {
918        let id = make_id("original");
919        let revised = new_long_term_revision(&id);
920        let path = revised.as_storage_path().to_string();
921        let parsed = ObjectId::from_storage_path(&path)
922            .unwrap_or_else(|| panic!("failed to parse '{path}'"));
923        assert_eq!(parsed, revised);
924    }
925
926    #[test]
927    fn revision_id_is_unique() {
928        let id = make_id("base-key");
929        let a = new_long_term_revision(&id);
930        let b = new_long_term_revision(&id);
931        assert_ne!(a.key, b.key, "two calls should produce different keys");
932    }
933
934    // --- Basic behavior ---
935
936    #[tokio::test]
937    async fn get_nonexistent_returns_none() {
938        let (storage, _hv, _lt, _) = make_tiered_storage();
939        let id = make_id("does-not-exist");
940
941        assert!(storage.get_object(&id, None).await.unwrap().is_none());
942        assert!(storage.get_metadata(&id).await.unwrap().is_none());
943    }
944
945    #[tokio::test]
946    async fn delete_nonexistent_succeeds() {
947        let (storage, _hv, _lt, _) = make_tiered_storage();
948        let id = make_id("does-not-exist");
949
950        storage.delete_object(&id).await.unwrap();
951    }
952
953    // --- Put routing ---
954
955    #[tokio::test]
956    async fn put_small_object_stores_inline() {
957        let (storage, hv, lt, _) = make_tiered_storage();
958        let id = make_id("small");
959        let payload = b"small payload".to_vec();
960
961        storage
962            .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
963            .await
964            .unwrap();
965
966        assert!(hv.contains(&id), "expected in high-volume");
967        assert!(!lt.contains(&id), "leaked to long-term");
968
969        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
970        let body = stream::read_to_vec(s).await.unwrap();
971        assert_eq!(body, payload);
972
973        assert!(
974            storage.get_metadata(&id).await.unwrap().is_some(),
975            "get_metadata should return metadata for inline objects"
976        );
977    }
978
979    #[tokio::test]
980    async fn put_large_object_creates_tombstone() {
981        let (storage, hv, lt, _) = make_tiered_storage();
982        let id = make_id("large");
983        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold
984        let metadata_in = Metadata {
985            content_type: "image/png".into(),
986            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
987            origin: Some("10.0.0.1".into()),
988            ..Metadata::default()
989        };
990
991        storage
992            .put_object(&id, &metadata_in, stream::single(payload.clone()))
993            .await
994            .unwrap();
995
996        // Tombstone in HV: correct expiration_policy, target is a revision key.
997        let tombstone = hv.get(&id).expect_tombstone();
998        assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy);
999        let lt_id = tombstone.target;
1000        assert!(
1001            lt_id.key().starts_with(id.key()),
1002            "tombstone target key should be a revision of the HV key, got: {}",
1003            lt_id.key()
1004        );
1005
1006        // LT object at revision key with correct metadata.
1007        let (lt_meta, _) = lt.get(&lt_id).expect_object();
1008        assert_eq!(lt_meta.content_type, "image/png");
1009        assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy);
1010
1011        // get_object follows the tombstone and returns the correct payload.
1012        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1013        let body = stream::read_to_vec(s).await.unwrap();
1014        assert_eq!(body, payload);
1015
1016        // get_metadata follows the tombstone and returns the correct content_type.
1017        let metadata = storage.get_metadata(&id).await.unwrap().unwrap();
1018        assert_eq!(metadata.content_type, "image/png");
1019    }
1020
1021    // --- Put overwrites ---
1022
1023    #[tokio::test]
1024    async fn reinsert_small_over_large_swaps_to_inline() {
1025        let (storage, hv, lt, _) = make_tiered_storage();
1026        let id = make_id("reinsert-key");
1027
1028        // First: insert a large object → creates tombstone in hv, payload in lt at lt_id
1029        let large_payload = vec![0xABu8; 2 * 1024 * 1024];
1030        storage
1031            .put_object(&id, &Metadata::default(), stream::single(large_payload))
1032            .await
1033            .unwrap();
1034
1035        let lt_id = hv.get(&id).expect_tombstone().target;
1036
1037        // Re-insert a SMALL payload with the same key.
1038        // The CAS-swap puts the small object inline in HV and schedules background cleanup.
1039        let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold
1040        storage
1041            .put_object(&id, &Metadata::default(), stream::single(small_payload))
1042            .await
1043            .unwrap();
1044
1045        // The small object is now inline in high-volume.
1046        hv.get(&id).expect_object();
1047
1048        // Drain background cleanup tasks before asserting LT state.
1049        storage.join().await;
1050
1051        // The old long-term blob was cleaned up.
1052        lt.get(&lt_id).expect_not_found();
1053    }
1054
1055    #[tokio::test]
1056    async fn overwrite_large_with_large_replaces_revision() {
1057        let (storage, hv, lt, _) = make_tiered_storage();
1058        let id = make_id("overwrite-large");
1059
1060        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
1061        storage
1062            .put_object(&id, &Metadata::default(), stream::single(payload1))
1063            .await
1064            .unwrap();
1065        let lt_id_1 = hv.get(&id).expect_tombstone().target;
1066
1067        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
1068        storage
1069            .put_object(&id, &Metadata::default(), stream::single(payload2.clone()))
1070            .await
1071            .unwrap();
1072        let lt_id_2 = hv.get(&id).expect_tombstone().target;
1073
1074        assert_ne!(
1075            lt_id_1, lt_id_2,
1076            "second write should create a new revision"
1077        );
1078
1079        // Drain background cleanup tasks before asserting LT state.
1080        storage.join().await;
1081
1082        lt.get(&lt_id_1).expect_not_found();
1083        lt.get(&lt_id_2).expect_object();
1084
1085        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1086        let body = stream::read_to_vec(s).await.unwrap();
1087        assert_eq!(body, payload2);
1088    }
1089
1090    // --- Delete ---
1091
1092    #[tokio::test]
1093    async fn delete_small_object() {
1094        let (storage, hv, _lt, _) = make_tiered_storage();
1095        let id = make_id("delete-small");
1096
1097        storage
1098            .put_object(&id, &Metadata::default(), stream::single("tiny"))
1099            .await
1100            .unwrap();
1101
1102        storage.delete_object(&id).await.unwrap();
1103
1104        hv.get(&id).expect_not_found();
1105        assert!(storage.get_object(&id, None).await.unwrap().is_none());
1106    }
1107
1108    #[tokio::test]
1109    async fn delete_large_object_cleans_up_both_backends() {
1110        let (storage, hv, lt, _) = make_tiered_storage();
1111        let id = make_id("delete-both");
1112        let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB
1113
1114        storage
1115            .put_object(&id, &Metadata::default(), stream::single(payload))
1116            .await
1117            .unwrap();
1118
1119        // Capture lt_id before deleting (it lives at the revision key, not at id).
1120        let lt_id = hv.get(&id).expect_tombstone().target;
1121
1122        storage.delete_object(&id).await.unwrap();
1123
1124        // Drain background cleanup tasks before asserting LT state.
1125        storage.join().await;
1126
1127        assert!(!hv.contains(&id), "tombstone not cleaned up");
1128        assert!(!lt.contains(&lt_id), "long-term object not cleaned up");
1129    }
1130
1131    #[derive(Debug)]
1132    struct FailDelete;
1133
1134    #[async_trait::async_trait]
1135    impl Hooks for FailDelete {
1136        async fn delete_object(
1137            &self,
1138            _inner: &InMemoryBackend,
1139            _id: &ObjectId,
1140        ) -> Result<DeleteResponse> {
1141            Err(Error::Io(std::io::Error::new(
1142                std::io::ErrorKind::ConnectionRefused,
1143                "simulated long-term delete failure",
1144            )))
1145        }
1146    }
1147
1148    /// When the long-term GCS cleanup fails after the tombstone is deleted, the
1149    /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may
1150    /// remain in LT storage, which is accepted.
1151    #[tokio::test]
1152    async fn delete_succeeds_when_gcs_cleanup_fails() {
1153        let hv = InMemoryBackend::new("hv");
1154        let lt = TestBackend::new(FailDelete);
1155        let log = NoopChangeLog;
1156        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt), Box::new(log));
1157
1158        let id = make_id("fail-delete");
1159        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term
1160        storage
1161            .put_object(&id, &Metadata::default(), stream::single(payload))
1162            .await
1163            .unwrap();
1164
1165        // Delete succeeds even though GCS cleanup fails (it is best-effort).
1166        let result = storage.delete_object(&id).await;
1167        assert!(
1168            result.is_ok(),
1169            "delete should succeed despite GCS cleanup failure"
1170        );
1171
1172        // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup).
1173        hv.get(&id).expect_not_found();
1174
1175        // The orphaned GCS blob remains but the object is unreachable through the service.
1176        assert!(
1177            storage.get_object(&id, None).await.unwrap().is_none(),
1178            "object should be unreachable after tombstone is deleted"
1179        );
1180    }
1181
1182    // --- CAS conflicts ---
1183
1184    #[derive(Debug)]
1185    struct CasConflict;
1186
1187    #[async_trait::async_trait]
1188    impl Hooks for CasConflict {
1189        async fn compare_and_write(
1190            &self,
1191            _inner: &InMemoryBackend,
1192            _id: &ObjectId,
1193            _current: Option<&ObjectId>,
1194            _write: TieredWrite,
1195        ) -> Result<bool> {
1196            Ok(false) // always conflict
1197        }
1198    }
1199
1200    /// After a large-object write loses the CAS race, the new LT blob must be
1201    /// cleaned up. The put still returns `Ok(())` — from the caller's view, a
1202    /// concurrent write won.
1203    #[tokio::test]
1204    async fn put_large_cas_conflict_cleans_up_new_blob() {
1205        let hv = TestBackend::new(CasConflict);
1206        let lt = InMemoryBackend::new("lt");
1207        let log = NoopChangeLog;
1208        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
1209
1210        let id = make_id("cas-conflict-large");
1211        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path
1212
1213        storage
1214            .put_object(&id, &Metadata::default(), stream::single(payload))
1215            .await
1216            .unwrap();
1217
1218        // Drain background cleanup tasks before asserting LT state.
1219        storage.join().await;
1220
1221        assert!(
1222            lt.is_empty(),
1223            "LT blob should be cleaned up after CAS conflict"
1224        );
1225    }
1226
1227    /// When swapping a tombstone for inline data, a CAS conflict means another
1228    /// writer won. The put still returns `Ok(())` — no LT blob was written, so
1229    /// there is nothing to clean up.
1230    #[tokio::test]
1231    async fn put_small_over_tombstone_cas_conflict_succeeds() {
1232        let inner = InMemoryBackend::new("hv");
1233        let id = make_id("cas-conflict-small");
1234
1235        // Pre-seed a tombstone directly in the inner backend so put_non_tombstone
1236        // returns it instead of writing inline.
1237        let tombstone = Tombstone {
1238            target: make_id("lt-object"),
1239            expiration_policy: ExpirationPolicy::Manual,
1240        };
1241        inner
1242            .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone))
1243            .await
1244            .unwrap();
1245
1246        let lt = InMemoryBackend::new("lt");
1247        let hv = TestBackend::with_inner(inner, CasConflict);
1248        let log = NoopChangeLog;
1249        let storage = TieredStorage::new(Box::new(hv), Box::new(lt), Box::new(log));
1250
1251        // Writing a small object over a tombstone should succeed even when CAS
1252        // conflicts — the other writer's write is accepted.
1253        storage
1254            .put_object(&id, &Metadata::default(), stream::single("tiny"))
1255            .await
1256            .unwrap();
1257    }
1258
1259    // --- Failure / inconsistency ---
1260
1261    /// Simulates compare_and_write failure. If `true`, it fails after commit.
1262    #[derive(Debug)]
1263    struct FailCas(bool);
1264
1265    #[async_trait::async_trait]
1266    impl Hooks for FailCas {
1267        async fn compare_and_write(
1268            &self,
1269            inner: &InMemoryBackend,
1270            id: &ObjectId,
1271            current: Option<&ObjectId>,
1272            write: TieredWrite,
1273        ) -> Result<bool> {
1274            if self.0 {
1275                // simulate a network error _after_ commit went through
1276                inner.compare_and_write(id, current, write).await?;
1277            }
1278            Err(Error::Io(std::io::Error::new(
1279                std::io::ErrorKind::TimedOut,
1280                "simulated compare_and_write failure",
1281            )))
1282        }
1283    }
1284
1285    /// If the tombstone write to the high-volume backend fails after the long-term
1286    /// write succeeds, the long-term object must be cleaned up so we never leave
1287    /// an unreachable orphan in long-term storage.
1288    #[tokio::test]
1289    async fn no_orphan_when_tombstone_write_fails() {
1290        let lt = InMemoryBackend::new("lt");
1291        let hv = TestBackend::new(FailCas(false));
1292        let log = NoopChangeLog;
1293        let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log));
1294
1295        let id = make_id("orphan-test");
1296        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path
1297        let result = storage
1298            .put_object(&id, &Metadata::default(), stream::single(payload))
1299            .await;
1300
1301        assert!(result.is_err());
1302
1303        // Drain background cleanup tasks before asserting LT state.
1304        storage.join().await;
1305
1306        assert!(lt.is_empty(), "long-term object not cleaned up");
1307    }
1308
1309    /// If a tombstone exists in high-volume but the corresponding object is
1310    /// missing from long-term storage (e.g. due to a race condition or partial
1311    /// cleanup), reads should gracefully return None rather than error.
1312    #[tokio::test]
1313    async fn orphan_tombstone_returns_none() {
1314        let (storage, hv, lt, _) = make_tiered_storage();
1315        let id = make_id("orphan-tombstone");
1316        let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB
1317
1318        storage
1319            .put_object(&id, &Metadata::default(), stream::single(payload))
1320            .await
1321            .unwrap();
1322
1323        // The object is at the revision key in LT, not at id.
1324        let lt_id = hv.get(&id).expect_tombstone().target;
1325
1326        // Remove the long-term object, leaving an orphan tombstone in hv
1327        lt.remove(&lt_id);
1328
1329        assert!(
1330            storage.get_object(&id, None).await.unwrap().is_none(),
1331            "orphan tombstone should resolve to None on get_object"
1332        );
1333        assert!(
1334            storage.get_metadata(&id).await.unwrap().is_none(),
1335            "orphan tombstone should resolve to None on get_metadata"
1336        );
1337    }
1338
1339    // --- Redirect target ---
1340
1341    /// A tombstone carrying an explicit `target` is followed correctly on reads and deletes,
1342    /// including when the target ObjectId differs from the HV ObjectId.
1343    #[tokio::test]
1344    async fn tombstone_target_is_used_for_reads_and_deletes() {
1345        let hv = InMemoryBackend::new("hv");
1346        let lt = InMemoryBackend::new("lt");
1347        let log = NoopChangeLog;
1348        let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone()), Box::new(log));
1349
1350        let hv_id = make_id("hv-key");
1351        let lt_id = make_id("lt-key");
1352        let payload = vec![0xABu8; 100];
1353
1354        // Write the object under the LT id and a tombstone pointing to it from HV.
1355        lt.put_object(
1356            &lt_id,
1357            &Metadata::default(),
1358            stream::single(payload.clone()),
1359        )
1360        .await
1361        .unwrap();
1362        let tombstone = Tombstone {
1363            target: lt_id.clone(),
1364            expiration_policy: ExpirationPolicy::Manual,
1365        };
1366        hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1367            .await
1368            .unwrap();
1369
1370        // get_object must follow the tombstone and find the object via the lt_id target.
1371        let (_, _, s) = storage.get_object(&hv_id, None).await.unwrap().unwrap();
1372        let body = stream::read_to_vec(s).await.unwrap();
1373        assert_eq!(body, payload);
1374
1375        // delete_object must clean up both backends using the target.
1376        storage.delete_object(&hv_id).await.unwrap();
1377        storage.join().await;
1378        assert!(!hv.contains(&hv_id), "tombstone should be removed");
1379        assert!(!lt.contains(&lt_id), "lt object should be removed");
1380    }
1381
1382    // --- Multi-chunk ---
1383
1384    #[tokio::test]
1385    async fn multi_chunk_large_object_chains_buffered_and_remaining() {
1386        let (storage, hv, lt, _) = make_tiered_storage();
1387        let id = make_id("multi-chunk");
1388
1389        // Deliver a 2 MiB payload across multiple chunks that individually
1390        // fit under the threshold but collectively exceed it.
1391        let chunk_size = 512 * 1024; // 512 KiB per chunk
1392        let chunk_count = 4; // 4 × 512 KiB = 2 MiB total
1393        let stream: ClientStream = futures_util::stream::iter(
1394            (0..chunk_count).map(move |i| Ok(Bytes::from(vec![i as u8; chunk_size]))),
1395        )
1396        .boxed();
1397
1398        storage
1399            .put_object(&id, &Metadata::default(), stream)
1400            .await
1401            .unwrap();
1402
1403        // Should have been routed to long-term (over 1 MiB) at the revision key.
1404        let lt_id = hv.get(&id).expect_tombstone().target;
1405        let (_, lt_bytes) = lt.get(&lt_id).expect_object();
1406        assert_eq!(lt_bytes.len(), chunk_size * chunk_count);
1407
1408        // Verify data integrity — each chunk's fill byte should appear in order.
1409        for i in 0..chunk_count {
1410            let offset = i * chunk_size;
1411            assert!(
1412                lt_bytes[offset..offset + chunk_size]
1413                    .iter()
1414                    .all(|&b| b == i as u8),
1415                "data mismatch in chunk {i}"
1416            );
1417        }
1418    }
1419
1420    // --- Written-phase cleanup ---
1421
1422    /// When a large-object overwrite commits in HV but its response is lost, the guard drops in
1423    /// `Written` phase. Cleanup must read HV to determine the CAS outcome, then delete whichever
1424    /// LT blob is no longer referenced — here the old one, since the new tombstone committed.
1425    #[tokio::test]
1426    async fn written_cleanup_after_lost_cas_response() {
1427        let (storage, hv, lt, log) = make_tiered_storage();
1428        let id = make_id("obj");
1429
1430        // First put: establishes tombstone
1431        let payload = vec![0xAAu8; 2 * 1024 * 1024];
1432        storage
1433            .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1434            .await
1435            .unwrap();
1436        let tombstone1 = hv.get(&id).expect_tombstone().target;
1437
1438        // Second put: Updates tombstone but fails immediately after committing
1439        let broken_storage = TieredStorage::new(
1440            Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))),
1441            Box::new(lt.clone()),
1442            Box::new(log.clone()),
1443        );
1444        broken_storage
1445            .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1446            .await
1447            .unwrap_err(); // must fail
1448        let tombstone2 = hv.get(&id).expect_tombstone().target;
1449        assert_ne!(tombstone1, tombstone2);
1450
1451        // The first tombstone's target should be cleaned up, but the second should remain.
1452        broken_storage.join().await;
1453        lt.get(&tombstone1).expect_not_found();
1454        lt.get(&tombstone2).expect_object();
1455
1456        // Now delete the new object with the same tombstone failure
1457        broken_storage.delete_object(&id).await.unwrap_err();
1458        hv.get(&id).expect_not_found();
1459        broken_storage.join().await;
1460        lt.get(&tombstone2).expect_not_found();
1461
1462        // Create a fresh large object
1463        let id = make_id("obj2");
1464        storage
1465            .put_object(&id, &Metadata::default(), stream::single(payload.clone()))
1466            .await
1467            .unwrap();
1468        let tombstone3 = hv.get(&id).expect_tombstone().target;
1469
1470        // Overwrite it with a small object and check again for cleanup
1471        broken_storage
1472            .put_object(&id, &Metadata::default(), stream::single(&b"small"[..]))
1473            .await
1474            .unwrap_err(); // must fail
1475        hv.get(&id).expect_object();
1476        broken_storage.join().await;
1477        lt.get(&tombstone3).expect_not_found();
1478    }
1479
1480    // --- ChangeGuard drop safety tests ---
1481
1482    /// Dropping a guard outside any tokio runtime must not panic.
1483    #[test]
1484    fn guard_dropped_outside_runtime_does_not_panic() {
1485        let manager = ChangeManager::new(
1486            Box::new(InMemoryBackend::new("hv")),
1487            Box::new(InMemoryBackend::new("lt")),
1488            Box::new(NoopChangeLog),
1489        );
1490
1491        let change = Change {
1492            id: make_id("object-key"),
1493            new: Some(make_id("cleanup-target")),
1494            old: None,
1495            cleanup_after: None,
1496        };
1497
1498        // Build the guard inside a temporary runtime, then let the runtime drop
1499        // so that no tokio context is active when the guard drops.
1500        let guard = {
1501            let rt = tokio::runtime::Runtime::new().unwrap();
1502            rt.block_on(manager.record(change)).unwrap()
1503        };
1504
1505        drop(guard); // Must not panic.
1506    }
1507
1508    /// `join` blocks until all in-flight guards have completed cleanup.
1509    ///
1510    /// Time is advanced manually so the test runs at virtual speed. The guard
1511    /// completes after 10 s; `join` must still be waiting at 9 s and done by 11 s.
1512    #[tokio::test(start_paused = true)]
1513    async fn join_waits_for_cleanup_to_complete() {
1514        let (storage, _hv, _lt, _) = make_tiered_storage();
1515        let change = Change {
1516            id: make_id("object-key"),
1517            new: None,
1518            old: None,
1519            cleanup_after: None,
1520        };
1521        let mut guard = storage.record_change(change).await.unwrap();
1522
1523        tokio::spawn(async move {
1524            tokio::time::sleep(Duration::from_secs(10)).await;
1525            guard.advance(ChangePhase::Completed);
1526            drop(guard);
1527        });
1528
1529        let join_future = tokio::spawn(async move { storage.join().await });
1530
1531        tokio::time::sleep(Duration::from_secs(9)).await;
1532        assert!(!join_future.is_finished(), "finished before guard dropped");
1533
1534        tokio::time::sleep(Duration::from_secs(2)).await;
1535        assert!(join_future.is_finished(), "finish after guard drops");
1536    }
1537
1538    // --- Changelog integration tests ---
1539
1540    /// LT backend hook that completes the write, then pauses until resumed.
1541    ///
1542    /// Lets tests cancel the owning future after the blob is committed but
1543    /// before the HV tombstone is set.
1544    #[derive(Clone, Debug)]
1545    struct PauseAfterPut {
1546        paused: Arc<tokio::sync::Notify>,
1547        resume: Arc<tokio::sync::Notify>,
1548    }
1549
1550    #[async_trait::async_trait]
1551    impl Hooks for PauseAfterPut {
1552        async fn put_object(
1553            &self,
1554            inner: &InMemoryBackend,
1555            id: &ObjectId,
1556            metadata: &Metadata,
1557            stream: ClientStream,
1558        ) -> Result<PutResponse> {
1559            inner.put_object(id, metadata, stream).await?;
1560            self.paused.notify_one();
1561            self.resume.notified().await;
1562            Ok(())
1563        }
1564    }
1565
1566    /// When a future is cancelled after the LT write but before the HV tombstone is set,
1567    /// the `ChangeGuard` cleans up the orphaned LT blob and removes the log entry.
1568    #[tokio::test]
1569    async fn dropped_future_triggers_cleanup_and_log_entry_removed() {
1570        let paused = Arc::new(tokio::sync::Notify::new());
1571        let hooks = PauseAfterPut {
1572            paused: Arc::clone(&paused),
1573            resume: Arc::new(tokio::sync::Notify::new()),
1574        };
1575
1576        let lt_inner = InMemoryBackend::new("lt");
1577        let log = InMemoryChangeLog::default();
1578        let storage = TieredStorage::new(
1579            Box::new(InMemoryBackend::new("hv")),
1580            Box::new(TestBackend::with_inner(lt_inner.clone(), hooks)),
1581            Box::new(log.clone()),
1582        );
1583
1584        let id = make_id("drop-test");
1585        let metadata = Metadata::default();
1586        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
1587
1588        // Drive the put until the LT write commits, then cancel before the HV tombstone is set.
1589        tokio::select! {
1590            result = storage.put_object(&id, &metadata, stream::single(payload)) => {
1591                panic!("expected put to pause before completing, got: {result:?}");
1592            }
1593            _ = paused.notified() => {
1594                // LT blob stored; cancelling drops the guard in Recorded phase.
1595            }
1596        }
1597
1598        // ChangeGuard dropped → background cleanup task spawned; wait for it.
1599        storage.join().await;
1600
1601        // The orphaned LT blob must have been deleted.
1602        assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up");
1603
1604        // The log entry must be gone once cleanup completes.
1605        let entries = log.scan().await.unwrap();
1606        assert!(
1607            entries.is_empty(),
1608            "changelog entry not removed after cleanup"
1609        );
1610    }
1611
1612    // --- Multipart upload ---
1613
1614    #[test]
1615    fn multipart_upload_id_roundtrip() {
1616        let id = TieredUploadId {
1617            revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(),
1618            upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(),
1619        };
1620        let encoded: UploadId = id.clone().try_into().unwrap();
1621        let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap();
1622        assert_eq!(decoded, id);
1623    }
1624
1625    #[tokio::test]
1626    async fn multipart_single_part_roundtrip() {
1627        let (storage, hv, lt, _) = make_tiered_storage();
1628        let id = make_id("mp-single");
1629        let metadata = Metadata {
1630            content_type: "application/octet-stream".into(),
1631            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
1632            ..Metadata::default()
1633        };
1634        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB
1635
1636        let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap();
1637
1638        let etag = storage
1639            .upload_part(
1640                &id,
1641                &upload_id,
1642                NonZeroU32::new(1).unwrap(),
1643                payload.len() as u64,
1644                None,
1645                stream::single(payload.clone()),
1646            )
1647            .await
1648            .unwrap();
1649
1650        let error = storage
1651            .complete_multipart(
1652                &id,
1653                &upload_id,
1654                vec![CompletedPart {
1655                    part_number: NonZeroU32::new(1).unwrap(),
1656                    etag,
1657                }],
1658            )
1659            .await
1660            .unwrap();
1661        assert!(
1662            error.is_none(),
1663            "complete_multipart returned error: {error:?}"
1664        );
1665
1666        // get_object should follow the tombstone and return the payload.
1667        let (got_meta, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1668        let body = stream::read_to_vec(s).await.unwrap();
1669        assert_eq!(body, payload);
1670        assert_eq!(got_meta.content_type, "application/octet-stream");
1671
1672        // HV should have a tombstone, LT should have the object at the physical key.
1673        let tombstone = hv.get(&id).expect_tombstone();
1674        assert!(
1675            tombstone.target.key().starts_with(id.key()),
1676            "tombstone target should be a revision key"
1677        );
1678        lt.get(&tombstone.target).expect_object();
1679    }
1680
1681    #[tokio::test]
1682    async fn multipart_upload() {
1683        let (storage, _hv, _lt, _) = make_tiered_storage();
1684        let id = make_id("multipart");
1685
1686        let upload_id = storage
1687            .initiate_multipart(&id, &Metadata::default())
1688            .await
1689            .unwrap();
1690
1691        let part1 = vec![0xAAu8; 512 * 1024];
1692        let part2 = vec![0xBBu8; 512 * 1024];
1693        let part3 = vec![0xCCu8; 512 * 1024];
1694
1695        let etag3 = storage
1696            .upload_part(
1697                &id,
1698                &upload_id,
1699                NonZeroU32::new(3).unwrap(),
1700                part3.len() as u64,
1701                None,
1702                stream::single(part3.clone()),
1703            )
1704            .await
1705            .unwrap();
1706        let etag2 = storage
1707            .upload_part(
1708                &id,
1709                &upload_id,
1710                NonZeroU32::new(2).unwrap(),
1711                part2.len() as u64,
1712                None,
1713                stream::single(part2.clone()),
1714            )
1715            .await
1716            .unwrap();
1717        let etag1 = storage
1718            .upload_part(
1719                &id,
1720                &upload_id,
1721                NonZeroU32::new(1).unwrap(),
1722                part1.len() as u64,
1723                None,
1724                stream::single(part1.clone()),
1725            )
1726            .await
1727            .unwrap();
1728
1729        let error = storage
1730            .complete_multipart(
1731                &id,
1732                &upload_id,
1733                vec![
1734                    CompletedPart {
1735                        part_number: NonZeroU32::new(1).unwrap(),
1736                        etag: etag1,
1737                    },
1738                    CompletedPart {
1739                        part_number: NonZeroU32::new(2).unwrap(),
1740                        etag: etag2,
1741                    },
1742                    CompletedPart {
1743                        part_number: NonZeroU32::new(3).unwrap(),
1744                        etag: etag3,
1745                    },
1746                ],
1747            )
1748            .await
1749            .unwrap();
1750        assert!(error.is_none());
1751
1752        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1753        let body = stream::read_to_vec(s).await.unwrap();
1754
1755        let mut expected = Vec::new();
1756        expected.extend_from_slice(&part1);
1757        expected.extend_from_slice(&part2);
1758        expected.extend_from_slice(&part3);
1759        assert_eq!(body, expected);
1760    }
1761
1762    #[tokio::test]
1763    async fn multipart_abort() {
1764        let (storage, hv, _lt, _) = make_tiered_storage();
1765        let id = make_id("mp-abort");
1766
1767        let upload_id = storage
1768            .initiate_multipart(&id, &Metadata::default())
1769            .await
1770            .unwrap();
1771
1772        // Upload a part then abort.
1773        let payload = vec![0xABu8; 100];
1774        storage
1775            .upload_part(
1776                &id,
1777                &upload_id,
1778                NonZeroU32::new(1).unwrap(),
1779                payload.len() as u64,
1780                None,
1781                stream::single(payload),
1782            )
1783            .await
1784            .unwrap();
1785
1786        storage.abort_multipart(&id, &upload_id).await.unwrap();
1787
1788        // No tombstone should have been written.
1789        hv.get(&id).expect_not_found();
1790
1791        // The object should not be reachable.
1792        assert!(storage.get_object(&id, None).await.unwrap().is_none());
1793    }
1794
1795    #[tokio::test]
1796    async fn multipart_list_parts() {
1797        let (storage, _hv, _lt, _) = make_tiered_storage();
1798        let id = make_id("mp-list");
1799
1800        let upload_id = storage
1801            .initiate_multipart(&id, &Metadata::default())
1802            .await
1803            .unwrap();
1804
1805        let part1 = vec![0xAAu8; 100];
1806        let part2 = vec![0xBBu8; 200];
1807        storage
1808            .upload_part(
1809                &id,
1810                &upload_id,
1811                NonZeroU32::new(1).unwrap(),
1812                part1.len() as u64,
1813                None,
1814                stream::single(part1),
1815            )
1816            .await
1817            .unwrap();
1818        storage
1819            .upload_part(
1820                &id,
1821                &upload_id,
1822                NonZeroU32::new(2).unwrap(),
1823                part2.len() as u64,
1824                None,
1825                stream::single(part2),
1826            )
1827            .await
1828            .unwrap();
1829
1830        let resp = storage
1831            .list_parts(&id, &upload_id, None, None)
1832            .await
1833            .unwrap();
1834        assert_eq!(resp.parts.len(), 2);
1835        assert_eq!(resp.parts[0].part_number.get(), 1);
1836        assert_eq!(resp.parts[0].size, 100);
1837        assert_eq!(resp.parts[1].part_number.get(), 2);
1838        assert_eq!(resp.parts[1].size, 200);
1839    }
1840
1841    #[tokio::test]
1842    async fn multipart_overwrites_existing_tombstone() {
1843        let (storage, hv, lt, _) = make_tiered_storage();
1844        let id = make_id("mp-overwrite");
1845
1846        // Put a large object via the normal path.
1847        let payload1 = vec![0xAAu8; 2 * 1024 * 1024];
1848        storage
1849            .put_object(&id, &Metadata::default(), stream::single(payload1))
1850            .await
1851            .unwrap();
1852        let old_lt_id = hv.get(&id).expect_tombstone().target;
1853
1854        // Overwrite via multipart.
1855        let upload_id = storage
1856            .initiate_multipart(&id, &Metadata::default())
1857            .await
1858            .unwrap();
1859
1860        let payload2 = vec![0xBBu8; 2 * 1024 * 1024];
1861        let etag = storage
1862            .upload_part(
1863                &id,
1864                &upload_id,
1865                NonZeroU32::new(1).unwrap(),
1866                payload2.len() as u64,
1867                None,
1868                stream::single(payload2.clone()),
1869            )
1870            .await
1871            .unwrap();
1872
1873        // The multipart upload is not finalized, so the tombstone still points to the old
1874        // revision.
1875        let lt_id = hv.get(&id).expect_tombstone().target;
1876        assert_eq!(old_lt_id, lt_id);
1877
1878        let error = storage
1879            .complete_multipart(
1880                &id,
1881                &upload_id,
1882                vec![CompletedPart {
1883                    part_number: NonZeroU32::new(1).unwrap(),
1884                    etag,
1885                }],
1886            )
1887            .await
1888            .unwrap();
1889        assert!(error.is_none());
1890
1891        // Now the upload has been finalized, so the new tombstone points to the new revision.
1892        let new_lt_id = hv.get(&id).expect_tombstone().target;
1893        assert_ne!(old_lt_id, new_lt_id);
1894
1895        // Wait for background cleanup.
1896        storage.join().await;
1897
1898        // Old revision should be cleaned up.
1899        lt.get(&old_lt_id).expect_not_found();
1900        lt.get(&new_lt_id).expect_object();
1901
1902        // Assert the contents of the new revision.
1903        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
1904        let body = stream::read_to_vec(s).await.unwrap();
1905        assert_eq!(body, payload2);
1906    }
1907
1908    // --- Multipart completion failure handling (consistency, retries, delayed cleanup) ---
1909
1910    /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network
1911    /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot
1912    /// recover by detecting the already-assembled blob.
1913    #[derive(Debug)]
1914    struct CompleteMultipartButReturnError;
1915
1916    #[async_trait::async_trait]
1917    impl Hooks for CompleteMultipartButReturnError {
1918        async fn complete_multipart(
1919            &self,
1920            inner: &InMemoryBackend,
1921            id: &ObjectId,
1922            upload_id: &UploadId,
1923            parts: Vec<CompletedPart>,
1924        ) -> Result<CompleteMultipartResponse> {
1925            inner
1926                .complete_multipart(id, upload_id, parts)
1927                .await
1928                .unwrap();
1929            Err(Error::Io(std::io::Error::new(
1930                std::io::ErrorKind::TimedOut,
1931                "simulated network error on complete_multipart",
1932            )))
1933        }
1934
1935        async fn get_metadata(
1936            &self,
1937            _inner: &InMemoryBackend,
1938            _id: &ObjectId,
1939        ) -> Result<MetadataResponse> {
1940            Err(Error::Io(std::io::Error::new(
1941                std::io::ErrorKind::TimedOut,
1942                "simulated network error on get_metadata",
1943            )))
1944        }
1945    }
1946
1947    /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both
1948    /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize.
1949    /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob.
1950    #[tokio::test]
1951    async fn cleans_up_orphan_after_failed_multipart_complete() {
1952        let hv = InMemoryBackend::new("hv");
1953        let lt_inner = InMemoryBackend::new("lt");
1954        let log = InMemoryChangeLog::default();
1955        let storage = TieredStorage::new(
1956            Box::new(hv.clone()),
1957            Box::new(TestBackend::with_inner(
1958                lt_inner.clone(),
1959                CompleteMultipartButReturnError {},
1960            )),
1961            Box::new(log.clone()),
1962        );
1963
1964        let id = make_id("mp-orphan");
1965        let upload_id = storage
1966            .initiate_multipart(&id, &Metadata::default())
1967            .await
1968            .unwrap();
1969
1970        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
1971        let physical = ObjectId {
1972            context: id.context.clone(),
1973            key: tiered_id.revision,
1974        };
1975
1976        let payload = vec![0xABu8; 2 * 1024 * 1024];
1977        let etag = storage
1978            .upload_part(
1979                &id,
1980                &upload_id,
1981                NonZeroU32::new(1).unwrap(),
1982                payload.len() as u64,
1983                None,
1984                stream::single(payload),
1985            )
1986            .await
1987            .unwrap();
1988
1989        let result = storage
1990            .complete_multipart(
1991                &id,
1992                &upload_id,
1993                vec![CompletedPart {
1994                    part_number: NonZeroU32::new(1).unwrap(),
1995                    etag,
1996                }],
1997            )
1998            .await;
1999        assert!(result.is_err());
2000        storage.join().await;
2001
2002        // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being
2003        // dropped while in the `Assembling` state.
2004        lt_inner.get(&physical).expect_object();
2005        hv.get(&id).expect_not_found();
2006
2007        // Simulate the passage of time and run recovery.
2008        log.expire_all();
2009        let manager = ChangeManager::new(
2010            Box::new(hv.clone()),
2011            Box::new(lt_inner.clone()),
2012            Box::new(log.clone()),
2013        );
2014        manager.recover().await.unwrap();
2015
2016        // The orphaned LT blob has been cleaned up.
2017        lt_inner.get(&physical).expect_not_found();
2018        // The change has been removed from the log.
2019        let remaining = log.scan().await.unwrap();
2020        assert!(remaining.is_empty());
2021    }
2022
2023    #[derive(Debug)]
2024    struct FailOnFirstCompleteMultipartAttempt {
2025        attempt: Mutex<u32>,
2026    }
2027
2028    impl FailOnFirstCompleteMultipartAttempt {
2029        fn new() -> Self {
2030            Self {
2031                attempt: Mutex::new(0),
2032            }
2033        }
2034    }
2035
2036    #[async_trait::async_trait]
2037    impl Hooks for FailOnFirstCompleteMultipartAttempt {
2038        async fn complete_multipart(
2039            &self,
2040            inner: &InMemoryBackend,
2041            id: &ObjectId,
2042            upload_id: &UploadId,
2043            parts: Vec<CompletedPart>,
2044        ) -> Result<CompleteMultipartResponse> {
2045            let mut attempt = self.attempt.lock().await;
2046            *attempt += 1;
2047            if *attempt == 1 {
2048                Err(Error::Io(std::io::Error::new(
2049                    std::io::ErrorKind::TimedOut,
2050                    "simulated network error",
2051                )))
2052            } else {
2053                Ok(inner
2054                    .complete_multipart(id, upload_id, parts)
2055                    .await
2056                    .unwrap())
2057            }
2058        }
2059    }
2060
2061    /// The first attempt to `complete_multipart` fails, which generates a `Change` entry.
2062    /// The second call succeeds.
2063    /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went
2064    /// through before the cleanup deadline.
2065    #[tokio::test]
2066    async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() {
2067        let hv = InMemoryBackend::new("hv");
2068        let lt_inner = InMemoryBackend::new("lt");
2069        let log = InMemoryChangeLog::default();
2070        let storage = TieredStorage::new(
2071            Box::new(hv.clone()),
2072            Box::new(TestBackend::with_inner(
2073                lt_inner.clone(),
2074                FailOnFirstCompleteMultipartAttempt::new(),
2075            )),
2076            Box::new(log.clone()),
2077        );
2078
2079        let id = make_id("mp-retry");
2080        let upload_id = storage
2081            .initiate_multipart(&id, &Metadata::default())
2082            .await
2083            .unwrap();
2084
2085        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2086        let physical = ObjectId {
2087            context: id.context.clone(),
2088            key: tiered_id.revision,
2089        };
2090
2091        let payload = vec![0xABu8; 2 * 1024 * 1024];
2092        let etag = storage
2093            .upload_part(
2094                &id,
2095                &upload_id,
2096                NonZeroU32::new(1).unwrap(),
2097                payload.len() as u64,
2098                None,
2099                stream::single(payload.clone()),
2100            )
2101            .await
2102            .unwrap();
2103
2104        // The first `complete_multipart` call fails.
2105        let result = storage
2106            .complete_multipart(
2107                &id,
2108                &upload_id,
2109                vec![CompletedPart {
2110                    part_number: NonZeroU32::new(1).unwrap(),
2111                    etag: etag.clone(),
2112                }],
2113            )
2114            .await;
2115        assert!(result.is_err());
2116        storage.join().await;
2117
2118        // The second `complete_multipart` call succeeds.
2119        let result = storage
2120            .complete_multipart(
2121                &id,
2122                &upload_id,
2123                vec![CompletedPart {
2124                    part_number: NonZeroU32::new(1).unwrap(),
2125                    etag,
2126                }],
2127            )
2128            .await;
2129        assert!(result.is_ok());
2130        storage.join().await;
2131
2132        // The object is there.
2133        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2134        let body = stream::read_to_vec(s).await.unwrap();
2135        assert_eq!(body, payload);
2136
2137        // Simulate the passage of time and run recovery.
2138        log.expire_all();
2139        let manager = ChangeManager::new(
2140            Box::new(hv.clone()),
2141            Box::new(lt_inner.clone()),
2142            Box::new(log.clone()),
2143        );
2144        manager.recover().await.unwrap();
2145
2146        // The LT blob has not been cleaned up, as the write eventually went through.
2147        lt_inner.get(&physical).expect_object();
2148        // The tombstone still points to the blob.
2149        let tombstone = hv.get(&id).expect_tombstone();
2150        assert_eq!(tombstone.target, physical);
2151        // The change has been removed from the log.
2152        let remaining = log.scan().await.unwrap();
2153        assert!(remaining.is_empty());
2154
2155        // The object is still there after recovery.
2156        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2157        let body = stream::read_to_vec(s).await.unwrap();
2158        assert_eq!(body, payload);
2159    }
2160
2161    #[derive(Debug)]
2162    struct FailOnFirstGetMetadataAttempt {
2163        attempt: Mutex<u32>,
2164    }
2165
2166    impl FailOnFirstGetMetadataAttempt {
2167        fn new() -> Self {
2168            Self {
2169                attempt: Mutex::new(0),
2170            }
2171        }
2172    }
2173
2174    #[async_trait::async_trait]
2175    impl Hooks for FailOnFirstGetMetadataAttempt {
2176        async fn get_metadata(
2177            &self,
2178            inner: &InMemoryBackend,
2179            id: &ObjectId,
2180        ) -> Result<MetadataResponse> {
2181            let mut attempt = self.attempt.lock().await;
2182            *attempt += 1;
2183            if *attempt == 1 {
2184                Err(Error::Io(std::io::Error::new(
2185                    std::io::ErrorKind::TimedOut,
2186                    "simulated network error",
2187                )))
2188            } else {
2189                inner.get_metadata(id).await
2190            }
2191        }
2192    }
2193
2194    /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent
2195    /// `get_metadata` call fails with a network error, causing the overall `complete_multipart`
2196    /// to fail. The second call retries and succeeds (the LT object already exists from the first
2197    /// attempt).
2198    /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went
2199    /// through before the cleanup deadline.
2200    #[tokio::test]
2201    async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent()
2202    {
2203        let hv = InMemoryBackend::new("hv");
2204        let lt_inner = InMemoryBackend::new("lt");
2205        let log = InMemoryChangeLog::default();
2206        let storage = TieredStorage::new(
2207            Box::new(hv.clone()),
2208            Box::new(TestBackend::with_inner(
2209                lt_inner.clone(),
2210                FailOnFirstGetMetadataAttempt::new(),
2211            )),
2212            Box::new(log.clone()),
2213        );
2214
2215        let id = make_id("mp-retry-meta");
2216        let upload_id = storage
2217            .initiate_multipart(&id, &Metadata::default())
2218            .await
2219            .unwrap();
2220
2221        let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap();
2222        let physical = ObjectId {
2223            context: id.context.clone(),
2224            key: tiered_id.revision,
2225        };
2226
2227        let payload = vec![0xABu8; 2 * 1024 * 1024];
2228        let etag = storage
2229            .upload_part(
2230                &id,
2231                &upload_id,
2232                NonZeroU32::new(1).unwrap(),
2233                payload.len() as u64,
2234                None,
2235                stream::single(payload.clone()),
2236            )
2237            .await
2238            .unwrap();
2239
2240        // The first `complete_multipart` call fails (get_metadata network error), even though it
2241        // internally creates the LT blob.
2242        let result = storage
2243            .complete_multipart(
2244                &id,
2245                &upload_id,
2246                vec![CompletedPart {
2247                    part_number: NonZeroU32::new(1).unwrap(),
2248                    etag: etag.clone(),
2249                }],
2250            )
2251            .await;
2252        assert!(result.is_err());
2253        storage.join().await;
2254
2255        // The second `complete_multipart` call succeeds.
2256        let result = storage
2257            .complete_multipart(
2258                &id,
2259                &upload_id,
2260                vec![CompletedPart {
2261                    part_number: NonZeroU32::new(1).unwrap(),
2262                    etag,
2263                }],
2264            )
2265            .await;
2266        assert!(result.is_ok());
2267        storage.join().await;
2268
2269        // The object is there.
2270        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2271        let body = stream::read_to_vec(s).await.unwrap();
2272        assert_eq!(body, payload);
2273
2274        // Simulate the passage of time and run recovery.
2275        log.expire_all();
2276        let manager = ChangeManager::new(
2277            Box::new(hv.clone()),
2278            Box::new(lt_inner.clone()),
2279            Box::new(log.clone()),
2280        );
2281        manager.recover().await.unwrap();
2282
2283        // The LT blob has not been cleaned up, as the write eventually went through.
2284        lt_inner.get(&physical).expect_object();
2285        // The tombstone still points to the blob.
2286        let tombstone = hv.get(&id).expect_tombstone();
2287        assert_eq!(tombstone.target, physical);
2288        // The change has been removed from the log.
2289        let remaining = log.scan().await.unwrap();
2290        assert!(remaining.is_empty());
2291
2292        // The object is there after recovery.
2293        let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap();
2294        let body = stream::read_to_vec(s).await.unwrap();
2295        assert_eq!(body, payload);
2296    }
2297}