objectstore_service/backend/
bigtable.rs

1//! BigTable backend for high-volume, low-latency storage of small objects.
2//!
3//! # Row Format
4//!
5//! Each row key is the object's storage path. A row contains either an **object** or a
6//! **tombstone** — never both. The two layouts are mutually exclusive and distinguished by
7//! column presence:
8//!
9//! | Column | Family    | Content                     | Present when       |
10//! |--------|-----------|-----------------------------|--------------------|
11//! | `p`    | `fg`/`fm` | Compressed payload bytes    | Object row only    |
12//! | `m`    | `fg`/`fm` | [`Metadata`] JSON           | Object row only    |
13//! | `r`    | `fg`/`fm` | Redirect path to LT storage | Tombstone row only |
14//! | `t`    | `fg`/`fm` | [`Tombstone`] metadata JSON | Tombstone row only |
15//!
16//! The `r` column signals a tombstone row: its **value** is the long-term `ObjectId`
17//! serialized via `as_storage_path()`. Callers can resolve the LT object directly from the
18//! `r` value without reconstructing it from the row key.
19//!
20//! `p`/`m` and `r`/`t` are mutually exclusive. Every write begins with a `DeleteFromRow`
21//! mutation that clears all columns before writing the new cells, so mixed rows cannot exist.
22//!
23//! ## Legacy Tombstone Format
24//!
25//! Tombstones written before the `r`/`t` column layout used the object-row format with an
26//! empty `p` column and `"is_redirect_tombstone": true` in the `m` JSON. Both formats are
27//! supported for reading. A `bigtable.legacy_tombstone_read` metric is emitted on each legacy
28//! read. Legacy tombstones expire naturally by TTL/GC; TTI bumps transparently upgrade them
29//! to the new format.
30
31use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use serde::{Deserialize, Serialize};
42use tonic::Code;
43
44use crate::backend::common::{
45    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
46    TieredGet, TieredMetadata, TieredWrite, Tombstone,
47};
48use crate::error::{Error, Result};
49use crate::gcp_auth::PrefetchingTokenProvider;
50use crate::id::ObjectId;
51use crate::stream::{ChunkedBytes, ClientStream};
52
/// Configuration for [`BigTableBackend`].
///
/// Stores objects in [Google Cloud Bigtable], a NoSQL wide-column database optimized for
/// high-throughput, low-latency workloads with small objects. Authentication uses Application
/// Default Credentials (ADC).
///
/// **Note**: The table must be pre-created with the following column families:
/// - `fg`: timestamp-based garbage collection (`maxage=1s`)
/// - `fm`: manual garbage collection (`no GC policy`)
///
/// [Google Cloud Bigtable]: https://cloud.google.com/bigtable
///
/// # Example
///
/// ```yaml
/// storage:
///   type: bigtable
///   project_id: my-project
///   instance_name: objectstore
///   table_name: objectstore
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BigTableConfig {
    /// Optional custom Bigtable endpoint.
    ///
    /// Useful for testing with emulators. If `None`, uses the default Bigtable endpoint.
    /// When set, the backend connects via [`BigTableConnection::new_with_emulator`]
    /// (no authentication).
    ///
    /// # Default
    ///
    /// `None` (uses default Bigtable endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=bigtable`
    /// - `OS__STORAGE__ENDPOINT=localhost:8086` (optional)
    pub endpoint: Option<String>,

    /// GCP project ID.
    ///
    /// The Google project ID (not project number) containing the Bigtable instance.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__PROJECT_ID=my-project`
    pub project_id: String,

    /// Bigtable instance name.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__INSTANCE_NAME=my-instance`
    pub instance_name: String,

    /// Bigtable table name.
    ///
    /// The table must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TABLE_NAME=objectstore`
    pub table_name: String,

    /// Optional number of connections to maintain to Bigtable.
    ///
    /// This is the gRPC channel pool size passed to the Bigtable client
    /// (see [`BigTableBackend::new`]). Ignored when `endpoint` selects the emulator path.
    ///
    /// # Default
    ///
    /// `None` (defaults to 1)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__CONNECTIONS=16` (optional)
    pub connections: Option<usize>,
}
126
/// Connection timeout used for the initial connection to Bigtable.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum age for connections (GRPC channels) to Bigtable, after which they will be swapped with
/// new ones in the background.
/// This is intended to avoid latency spikes that could occur every hour or so, when the server
/// closes long standing connections ([source](https://web.archive.org/web/20260211140930/https://docs.cloud.google.com/bigtable/docs/performance#cold-starts:~:text=return%20an%20error.-,Cold%20start,-at%20client%20initialization)).
/// `tonic` already handles reconnections transparently, but lazily, meaning that the first requests
/// that attempt to use a certain channel after the server has closed it will pay the cost of the
/// reconnection, resulting in increased latency for those requests.
// NB: spelled as `from_secs(50 * 60)` rather than `Duration::from_mins(50)`: `from_mins` is
// gated behind an unstable feature on older toolchains, and this file already spells durations
// this way (see `TTI_DEBOUNCE` below).
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_secs(50 * 60)); // 50 minutes
/// Time to debounce bumping an object with configured TTI.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day
/// Permission scopes required for accessing the BigTable data API.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// How often to retry failed requests.
const REQUEST_RETRY_COUNT: usize = 2;
/// How many times to retry a CAS mutation before giving up and returning an error.
const CAS_RETRY_COUNT: usize = 3;

/// Column that stores the raw payload (compressed).
const COLUMN_PAYLOAD: &[u8] = b"p";
/// Column that stores metadata in JSON.
const COLUMN_METADATA: &[u8] = b"m";
/// Column that stores the redirect path for tombstone rows.
const COLUMN_REDIRECT: &[u8] = b"r";
/// Column that stores [`TombstoneMeta`] JSON for tombstone rows.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";
/// Regex to match all non-payload columns (`m`, `r`, `t`) for metadata-only reads.
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family that uses timestamp-based garbage collection.
///
/// We require a GC rule on this family to automatically delete rows.
/// See: <https://cloud.google.com/bigtable/docs/gc-cell-level>
const FAMILY_GC: &str = "fg";
/// Column family that uses manual garbage collection.
const FAMILY_MANUAL: &str = "fm";
165
/// BigTable storage backend for high-volume, low-latency object storage.
pub struct BigTableBackend {
    /// Pooled gRPC connection to Bigtable; `client()` hands out per-request clients.
    bigtable: BigTableConnection,

    /// Fully qualified instance path (`projects/{project}/instances/{instance}`).
    /// Built in [`Self::new`]; NOTE(review): only used for diagnostics in the visible code —
    /// confirm other consumers before removing.
    instance_path: String,
    /// Fully qualified table path as returned by `get_full_table_name`; sent on every request.
    table_path: String,
    /// Bare table name from the configuration.
    table_name: String,
}
174
175impl fmt::Debug for BigTableBackend {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        f.debug_struct("BigTableBackend")
178            .field("instance_path", &self.instance_path)
179            .field("table_path", &self.table_path)
180            .field("table_name", &self.table_name)
181            .finish_non_exhaustive()
182    }
183}
184
185/// Creates a row filter that matches a single column by exact qualifier.
186fn column_filter(column: &[u8]) -> v2::RowFilter {
187    v2::RowFilter {
188        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
189            [b"^", column, b"$"].concat(),
190        )),
191    }
192}
193
194/// Creates a row filter matching the legacy tombstone format: `m` column JSON starts with
195/// `{"is_redirect_tombstone":true`.
196///
197/// After legacy tombstones expire naturally this filter becomes dead code in both callers.
198fn legacy_tombstone_filter() -> v2::RowFilter {
199    v2::RowFilter {
200        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
201            filters: vec![
202                column_filter(COLUMN_METADATA),
203                v2::RowFilter {
204                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(
205                        b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
206                    )),
207                },
208            ],
209        })),
210    }
211}
212
213/// Builds a raw row filter that matches any tombstone row, new- or legacy-format.
214///
215/// New format: presence of the `r` column.
216/// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON.
217///
218/// After legacy tombstones expire naturally this simplifies to just
219/// `column_filter(COLUMN_REDIRECT)`.
220fn tombstone_filter() -> v2::RowFilter {
221    v2::RowFilter {
222        filter: Some(v2::row_filter::Filter::Interleave(
223            v2::row_filter::Interleave {
224                filters: vec![
225                    // Current: redirect column is present.
226                    column_filter(COLUMN_REDIRECT),
227                    // Legacy: metadata column JSON format.
228                    legacy_tombstone_filter(),
229                ],
230            },
231        )),
232    }
233}
234
235/// Returns a [`MutatePredicate`] that matches any tombstone row.
236///
237/// Mutations run only when no tombstone is present (`predicate_matched == false`).
238/// Used by [`BigTableBackend::put_non_tombstone`], [`BigTableBackend::delete_non_tombstone`],
239/// and [`BigTableBackend::compare_and_write`] as the `CheckAndMutateRow` predicate.
240fn tombstone_predicate() -> MutatePredicate {
241    MutatePredicate::Exclude(tombstone_filter())
242}
243
244/// Builds an anchored regex pattern (`^…$`) that matches `value` literally.
245///
246/// Uses [`regex::escape`] so that metacharacters in storage paths (`.`, `/`, etc.)
247/// are treated as literal bytes.
248fn exact_value_regex(value: &str) -> Vec<u8> {
249    format!("^{}$", regex::escape(value)).into_bytes()
250}
251
252/// Matches tombstones whose redirect resolves to `target`.
253///
254/// ## Predicate Matches
255///
256/// Must be used with `true_mutations` and `predicate_matched == true`.
257///
258/// ## Details
259///
260/// Always includes an exact match on the `r` (redirect) column:
261/// - Chain: `r` column present AND value == `target` storage path
262///
263/// When `target == own_id` (the caller expects a legacy identity redirect), the
264/// exact match is wrapped in an Interleave with two additional fallbacks:
265/// - Chain: `r` column present AND value == `b""` (empty-sentinel written before the redirect
266///   column stored the path)
267/// - Chain: `m` column present AND value matches `{"is_redirect_tombstone":true...}` regex
268///   (legacy metadata format predating the dedicated `r` column)
269fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
270    let target_path = exact_value_regex(&target.as_storage_path().to_string());
271
272    let exact_match = v2::RowFilter {
273        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
274            filters: vec![
275                column_filter(COLUMN_REDIRECT),
276                v2::RowFilter {
277                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
278                },
279            ],
280        })),
281    };
282
283    if target != own_id {
284        return exact_match;
285    }
286
287    let empty_redirect_match = v2::RowFilter {
288        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
289            filters: vec![
290                column_filter(COLUMN_REDIRECT),
291                v2::RowFilter {
292                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
293                },
294            ],
295        })),
296    };
297
298    // Also match legacy tombstones that resolve to the HV id:
299    // - empty `r` value (written before the redirect column stored the path)
300    // - legacy `m` column format (`is_redirect_tombstone: true`)
301    v2::RowFilter {
302        filter: Some(v2::row_filter::Filter::Interleave(
303            v2::row_filter::Interleave {
304                filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
305            },
306        )),
307    }
308}
309
310/// Returns a [`MutatePredicate`] that matches tombstones whose redirect resolves to either `old` or `new`.
311///
312/// Mutations run only when the predicate matches (`predicate_matched == true`):
313/// equivalent to `t == old || t == new`. Built as an Interleave of two
314/// [`redirect_target_filter`] calls — yields cells iff at least one branch matches.
315/// An absent row or non-tombstone row yields 0 cells, so `predicate_matched = false` (conflict).
316fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
317    MutatePredicate::Include(v2::RowFilter {
318        filter: Some(v2::row_filter::Filter::Interleave(
319            v2::row_filter::Interleave {
320                filters: vec![
321                    redirect_target_filter(old, own_id),
322                    redirect_target_filter(new, own_id),
323                ],
324            },
325        )),
326    })
327}
328
329/// Returns a [`MutatePredicate`] that matches rows where no conflicting tombstone exists.
330///
331/// Mutations run only when the row is conflict-free (`predicate_matched == false`):
332/// no tombstone is present, or the tombstone's redirect already points to `target`.
333///
334/// Built as an inverted `Condition` filter:
335/// - Predicate: [`redirect_target_filter`]`(target)` — tombstone already points to `target`?
336/// - True branch: `BlockAllFilter` → 0 cells (already at target, safe state).
337/// - False branch: [`tombstone_filter`] → 0 cells when no tombstone exists.
338///
339/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases.
340fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
341    MutatePredicate::Exclude(v2::RowFilter {
342        filter: Some(v2::row_filter::Filter::Condition(Box::new(
343            v2::row_filter::Condition {
344                predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
345                true_filter: Some(Box::new(v2::RowFilter {
346                    filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
347                })),
348                false_filter: Some(Box::new(tombstone_filter())),
349            },
350        ))),
351    })
352}
353
/// The condition under which a [`BigTableBackend::check_and_mutate`] write proceeds.
///
/// Each variant pairs a row filter with the state that makes the write safe:
/// `Include` writes when the row matches; `Exclude` writes when it does not.
/// The variant determines both which mutation slot (`true_mutations` vs `false_mutations`)
/// is populated in the `CheckAndMutateRow` request and how `predicate_matched` is interpreted.
#[derive(Clone, Debug)]
enum MutatePredicate {
    /// Write proceeds when the filter matches the row.
    ///
    /// Mutations run in `true_mutations`; succeeds when `predicate_matched == true`.
    Include(v2::RowFilter),
    /// Write proceeds when the filter does not match the row.
    ///
    /// Mutations run in `false_mutations`; succeeds when `predicate_matched == false`.
    Exclude(v2::RowFilter),
}
369
370/// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`).
371///
372/// Used by metadata-only reads to avoid fetching the (potentially large) payload column
373/// while still being able to detect both new- and legacy-format tombstones.
374fn metadata_filter() -> v2::RowFilter {
375    v2::RowFilter {
376        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
377            FILTER_META.to_owned(),
378        )),
379    }
380}
381
382fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
383    v2::Mutation {
384        mutation: Some(mutation),
385    }
386}
387
388/// Creates a `DeleteFromRow` mutation wrapped in the outer [`v2::Mutation`] envelope.
389fn delete_row_mutation() -> v2::Mutation {
390    mutation(mutation::Mutation::DeleteFromRow(
391        mutation::DeleteFromRow {},
392    ))
393}
394
395/// Builds the three mutations that write an object row: clear existing data,
396/// then set the payload and metadata cells.
397///
398/// Used by both [`BigTableBackend::put_row`] (unconditional write) and
399/// [`BigTableBackend::put_non_tombstone`] (conditional write).
400fn object_mutations(
401    metadata: &Metadata,
402    payload: Vec<u8>,
403    now: SystemTime,
404) -> Result<[v2::Mutation; 3]> {
405    let (family, timestamp_micros) = match metadata.expiration_policy {
406        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
407        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
408        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
409    };
410
411    let metadata_bytes = serde_json::to_vec(metadata)
412        .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
413
414    Ok([
415        // NB: We explicitly delete the row to clear metadata on overwrite.
416        delete_row_mutation(),
417        mutation(mutation::Mutation::SetCell(mutation::SetCell {
418            family_name: family.to_owned(),
419            column_qualifier: COLUMN_PAYLOAD.to_owned(),
420            timestamp_micros,
421            value: payload,
422        })),
423        mutation(mutation::Mutation::SetCell(mutation::SetCell {
424            family_name: family.to_owned(),
425            column_qualifier: COLUMN_METADATA.to_owned(),
426            timestamp_micros,
427            value: metadata_bytes,
428        })),
429    ])
430}
431
/// Metadata carried by tombstone rows in the `t` (tombstone-meta) column.
///
/// Tombstone-specific metadata evolves independently of object [`Metadata`]. Only fields
/// that are meaningful on tombstones are included here. Serialized as JSON
/// (see [`tombstone_mutations`]) and parsed back in [`RowData::from_cells`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TombstoneMeta {
    /// Expiration policy for this tombstone.
    ///
    /// Skipped during serialization when set to [`ExpirationPolicy::Manual`];
    /// `#[serde(default)]` restores `Manual` when the field is absent on read.
    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
    expiration_policy: ExpirationPolicy,
}
444
445/// Builds the three mutations that write a tombstone row: clear existing data,
446/// then set the redirect sentinel and tombstone-meta cells.
447///
448/// Used by both [`BigTableBackend::put_tombstone_row`] (unconditional write) and the
449/// TTI bump path in tiered reads.
450fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
451    let (family, timestamp_micros) = match tombstone.expiration_policy {
452        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
453        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
454        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
455    };
456
457    let tombstone_meta = TombstoneMeta {
458        expiration_policy: tombstone.expiration_policy,
459    };
460
461    Ok([
462        delete_row_mutation(),
463        mutation(mutation::Mutation::SetCell(mutation::SetCell {
464            family_name: family.to_owned(),
465            column_qualifier: COLUMN_REDIRECT.to_owned(),
466            timestamp_micros,
467            value: tombstone.target.as_storage_path().to_string().into_bytes(),
468        })),
469        mutation(mutation::Mutation::SetCell(mutation::SetCell {
470            family_name: family.to_owned(),
471            column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
472            timestamp_micros,
473            value: serde_json::to_vec(&tombstone_meta)
474                .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
475        })),
476    ])
477}
478
/// Subset of [`Metadata`] that indicates a row is a tombstone instead of a real object.
///
/// Used to construct [`RowData`]. Deserialize-only: new writes never produce this shape
/// (see the module docs on the legacy tombstone format).
#[derive(Debug, Deserialize)]
struct LegacyTombstoneMeta {
    /// Internal redirect tombstone marker.
    ///
    /// When `true`, this object is a legacy tombstone. This implies:
    ///  - the payload is empty
    ///  - metadata other than the expiration policy is not meaningful
    ///  - the `r` and `t` columns are not present
    #[serde(default)]
    is_redirect_tombstone: bool,

    /// Expiration policy for this tombstone.
    ///
    /// Defaults via `#[serde(default)]` when absent from the legacy JSON.
    #[serde(default)]
    expiration_policy: ExpirationPolicy,
}
497
/// Parsed data from a BigTable row's cells.
enum RowData {
    /// A regular object row with payload and metadata.
    Object {
        // Object metadata from the `m` column; `time_expires` is filled in from the
        // cell timestamp during parsing.
        metadata: Metadata,
        // Compressed payload bytes from the `p` column; may be empty if the read
        // filtered the payload column out.
        payload: Vec<u8>,
    },
    /// A tombstone row indicating the real payload lives on the long-term backend.
    Tombstone {
        // Raw bytes of the `r` column: the LT storage path, or empty for legacy
        // tombstones (see `parse_redirect_target`).
        target: Vec<u8>,
        // Parsed `t` column (or synthesized from legacy `m` metadata).
        meta: TombstoneMeta,
        // Expiration derived from the cell timestamp, if the row carries one.
        time_expires: Option<SystemTime>,
    },
}
512
impl RowData {
    /// Parses a set of row cells into a [`RowData`].
    ///
    /// New-format tombstones are identified by the presence of the `r` column.
    /// Legacy tombstones (written before the column migration) are identified by
    /// `is_redirect_tombstone: true` in the `m` column JSON; a
    /// `bigtable.legacy_tombstone_read` metric is emitted on each such read.
    ///
    /// # Errors
    ///
    /// Fails when the `t` column or a non-legacy `m` column contains invalid JSON.
    fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
        let mut metadata_opt: Option<Metadata> = None;
        let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
        let mut redirect_detected = false;
        let mut redirect_target = Vec::new();
        let mut expire_at = None;
        let mut payload = Vec::new();

        for cell in cells {
            // NB: All cells are written with the same timestamp; last write is safe.
            expire_at = micros_to_time(cell.timestamp_micros);

            match cell.qualifier.as_slice() {
                COLUMN_REDIRECT => {
                    redirect_detected = true;
                    redirect_target = cell.value;
                }
                COLUMN_PAYLOAD => {
                    payload = cell.value;
                }
                COLUMN_TOMBSTONE_META => {
                    tombstone_meta_opt =
                        Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                            Error::serde("failed to deserialize tombstone meta", cause)
                        })?);
                }
                COLUMN_METADATA => {
                    // Legacy tombstones reuse the `m` column: detect the marker first,
                    // otherwise fall through to regular metadata parsing.
                    if let Ok(legacy_meta) =
                        serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
                        && legacy_meta.is_redirect_tombstone
                    {
                        redirect_detected = true;
                        objectstore_metrics::count!("bigtable.legacy_tombstone_read");
                        // Synthesize new-format tombstone meta from the legacy fields.
                        tombstone_meta_opt = Some(TombstoneMeta {
                            expiration_policy: legacy_meta.expiration_policy,
                        });
                    } else {
                        metadata_opt =
                            Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                                Error::serde("failed to deserialize metadata", cause)
                            })?);
                    }
                }
                // Unknown qualifiers are ignored for forward compatibility.
                _ => {}
            }
        }

        Ok(if redirect_detected {
            RowData::Tombstone {
                // May be empty for legacy tombstones; resolved by `parse_redirect_target`.
                target: redirect_target,
                meta: tombstone_meta_opt.unwrap_or_default(),
                time_expires: expire_at,
            }
        } else {
            // Metadata may have been skipped during read - payload-only read for TTI bump.
            let mut metadata = metadata_opt.unwrap_or_default();
            metadata.time_expires = expire_at;
            RowData::Object { metadata, payload }
        })
    }

    /// Returns the expiration policy for this row, regardless of variant.
    fn expiration_policy(&self) -> ExpirationPolicy {
        match self {
            RowData::Object { metadata, .. } => metadata.expiration_policy,
            RowData::Tombstone { meta, .. } => meta.expiration_policy,
        }
    }

    /// Returns the resolved expiration timestamp for this row, regardless of variant.
    fn time_expires(&self) -> Option<SystemTime> {
        match self {
            RowData::Object { metadata, .. } => metadata.time_expires,
            RowData::Tombstone { time_expires, .. } => *time_expires,
        }
    }

    /// Returns `true` if this row is expired as of the given `time`.
    ///
    /// Only applies to rows with an expiration policy set.
    fn expires_before(&self, time: SystemTime) -> bool {
        self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
    }

    /// Returns `true` if this row's TTI deadline should be bumped.
    ///
    /// A fresh bump would move the expiry to roughly `now + tti`; the row only qualifies
    /// when that gains more than `TTI_DEBOUNCE`, avoiding a rewrite on every read.
    fn needs_tti_bump(&self) -> bool {
        matches!(
            self.expiration_policy(),
            ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
        )
    }
}
612
613/// Parses the raw `r` column bytes into a redirect target [`ObjectId`].
614///
615/// For tombstones with an empty `r` value, falls back to the ID of the tombstone
616/// itself and emits a `bigtable.empty_redirect_read` metric so deployments can
617/// track when it is safe to remove the legacy empty-value code path.
618fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
619    if redirect_path.is_empty() {
620        objectstore_metrics::count!("bigtable.empty_redirect_read");
621        Ok(tombstone_id.clone())
622    } else {
623        let redirect_str = std::str::from_utf8(redirect_path)
624            .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
625        ObjectId::from_storage_path(redirect_str)
626            .ok_or_else(|| Error::generic("corrupt redirect path"))
627    }
628}
629
impl BigTableBackend {
    /// Creates a new [`BigTableBackend`] from the given `config`.
    ///
    /// Pass an `endpoint` in the config to connect to a local emulator; omit it to use real GCP
    /// credentials. `connections` controls the gRPC connection pool size (defaults to 1).
    pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
        let BigTableConfig {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } = config;

        let bigtable = if let Some(ref endpoint) = endpoint {
            // Emulator path: no authentication, direct endpoint.
            BigTableConnection::new_with_emulator(
                endpoint,
                &project_id,
                &instance_name,
                false, // is_read_only
                Some(CONNECT_TIMEOUT),
            )?
        } else {
            // Production path: ADC-backed token provider with background prefetch.
            let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
            BigTableConnection::new_with_managed_transport(
                &project_id,
                &instance_name,
                false, // is_read_only
                Some(CONNECT_TIMEOUT),
                Arc::new(token_provider),
                connections.unwrap_or(1),
                true, // prime_channels
                None, // app_profile_id
                MAX_CHANNEL_AGE,
            )
            .await?
        };

        let client = bigtable.client();

        Ok(Self {
            bigtable,
            instance_path: format!("projects/{project_id}/instances/{instance_name}"),
            table_path: client.get_full_table_name(&table_name),
            table_name,
        })
    }

    /// Reads a single row by key, returning parsed row data.
    ///
    /// Returns `None` if the row is absent or has expired.
    ///
    /// `filter` optionally restricts which columns are fetched (e.g. [`metadata_filter`]);
    /// `action` labels the retry loop for observability.
    async fn read_row(
        &self,
        path: &[u8],
        filter: Option<v2::RowFilter>,
        action: &'static str,
    ) -> Result<Option<RowData>> {
        let request = v2::ReadRowsRequest {
            table_name: self.table_path.clone(),
            rows: Some(v2::RowSet {
                row_keys: vec![path.to_owned()],
                row_ranges: vec![],
            }),
            filter,
            rows_limit: 1,
            ..Default::default()
        };

        let response = retry(action, || async {
            self.bigtable.client().read_rows(request.clone()).await
        })
        .await?;
        debug_assert!(response.len() <= 1, "Expected at most one row");

        let Some((_, cells)) = response.into_iter().next() else {
            objectstore_log::debug!("Object not found");
            return Ok(None);
        };

        // Rows past their GC deadline may still be returned before the server
        // collects them, so expiration is re-checked client-side.
        let row = RowData::from_cells(cells)?;
        Ok(if row.expires_before(SystemTime::now()) {
            None
        } else {
            Some(row)
        })
    }

    /// Sends an unconditional `MutateRow` request for `path`, with retries.
    async fn mutate(
        &self,
        path: Vec<u8>,
        mutations: impl Into<Vec<v2::Mutation>>,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let request = v2::MutateRowRequest {
            table_name: self.table_path.clone(),
            row_key: path,
            mutations: mutations.into(),
            ..Default::default()
        };

        let response = retry(action, || async {
            self.bigtable.client().mutate_row(request.clone()).await
        })
        .await?;

        Ok(response.into_inner())
    }

    /// Unconditionally writes an object row (clear + payload + metadata cells).
    async fn put_row(
        &self,
        path: Vec<u8>,
        metadata: &Metadata,
        payload: Vec<u8>,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let mutations = object_mutations(metadata, payload, SystemTime::now())?;
        self.mutate(path, mutations, action).await
    }

    /// Unconditionally writes a tombstone row (clear + redirect + tombstone-meta cells).
    async fn put_tombstone_row(
        &self,
        path: Vec<u8>,
        tombstone: &Tombstone,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
        self.mutate(path, mutations, action).await
    }

    /// Best-effort TTI bump for a row.
    ///
    /// If the payload isn't loaded, it will be fetched. Failures are ignored silently.
    ///
    /// The bump rewrites the row with fresh cell timestamps, which pushes back the
    /// GC deadline; legacy tombstones are upgraded to the new `r`/`t` format as a
    /// side effect of rewriting via [`tombstone_mutations`].
    async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
        let expiration_policy = row.expiration_policy();

        match row {
            RowData::Tombstone { target, .. } => {
                let target = match parse_redirect_target(target, hv_id) {
                    Ok(target) => target,
                    Err(e) => {
                        objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
                        return;
                    }
                };

                let tombstone = Tombstone {
                    target,
                    expiration_policy,
                };
                // Best-effort: errors are deliberately discarded.
                let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
            }
            RowData::Object { metadata, payload } if loaded => {
                let _ = self
                    .put_row(path, metadata, payload.clone(), "tti-bump")
                    .await;
            }
            RowData::Object { metadata, .. } => {
                // Payload was filtered out of the original read (metadata-only path);
                // fetch just the payload column before rewriting the row.
                let payload_read = self
                    .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
                    .await;

                if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
                    let _ = self.put_row(path, metadata, payload, "tti-bump").await;
                }
            }
        }
    }

    /// Executes a `CheckAndMutateRow` request.
    ///
    /// Maps the [`MutatePredicate`] variant onto the request's mutation slots:
    /// `Include` → `true_mutations`, success when the predicate matched;
    /// `Exclude` → `false_mutations`, success when it did not.
    /// Returns `Ok(true)` when the mutations were applied, `Ok(false)` on a
    /// predicate conflict.
    async fn check_and_mutate(
        &self,
        row_key: Vec<u8>,
        predicate: MutatePredicate,
        mutations: impl Into<Vec<v2::Mutation>>,
        context: &'static str,
    ) -> Result<bool> {
        let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
            MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
            MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
        };

        let request = v2::CheckAndMutateRowRequest {
            table_name: self.table_path.clone(),
            row_key,
            predicate_filter: Some(filter),
            true_mutations,
            false_mutations,
            ..Default::default()
        };

        let future = retry(context, || async {
            self.bigtable
                .client()
                .check_and_mutate_row(request.clone())
                .await
        });

        Ok(future.await?.predicate_matched == success_on_match)
    }
}
830
831#[async_trait::async_trait]
832impl Backend for BigTableBackend {
833    fn name(&self) -> &'static str {
834        "bigtable"
835    }
836
837    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
838    async fn put_object(
839        &self,
840        id: &ObjectId,
841        metadata: &Metadata,
842        mut stream: ClientStream,
843    ) -> Result<PutResponse> {
844        objectstore_log::debug!("Writing to Bigtable backend");
845        let path = id.as_storage_path().to_string().into_bytes();
846
847        let mut payload = ChunkedBytes::new(0);
848        while let Some(chunk) = stream.try_next().await? {
849            payload.push(chunk);
850        }
851
852        self.put_row(path, metadata, payload.into_bytes().into(), "put")
853            .await?;
854        Ok(())
855    }
856
857    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
858    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
859        match self.get_tiered_object(id).await? {
860            TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))),
861            TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
862            TieredGet::NotFound => Ok(None),
863        }
864    }
865
866    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
867    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
868        match self.get_tiered_metadata(id).await? {
869            TieredMetadata::Object(metadata) => Ok(Some(metadata)),
870            TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
871            TieredMetadata::NotFound => Ok(None),
872        }
873    }
874
875    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
876    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
877        objectstore_log::debug!("Deleting from Bigtable backend");
878
879        let path = id.as_storage_path().to_string().into_bytes();
880        self.mutate(path, [delete_row_mutation()], "delete").await?;
881
882        Ok(())
883    }
884}
885
886#[async_trait::async_trait]
887impl HighVolumeBackend for BigTableBackend {
888    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
889    async fn put_non_tombstone(
890        &self,
891        id: &ObjectId,
892        metadata: &Metadata,
893        payload: Bytes,
894    ) -> Result<Option<Tombstone>> {
895        objectstore_log::debug!("Conditional put to Bigtable backend");
896
897        let path = id.as_storage_path().to_string().into_bytes();
898        let mutations = object_mutations(metadata, payload.to_vec(), SystemTime::now())?;
899
900        for _ in 0..CAS_RETRY_COUNT {
901            let write_succeeded = self
902                .check_and_mutate(
903                    path.clone(),
904                    tombstone_predicate(),
905                    mutations.clone(),
906                    "put_non_tombstone",
907                )
908                .await?;
909
910            if write_succeeded {
911                return Ok(None);
912            }
913
914            // A tombstone was present: read its data for the caller.
915            let row = self
916                .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
917                .await?;
918
919            match row {
920                Some(RowData::Tombstone { target, meta, .. }) => {
921                    return Ok(Some(Tombstone {
922                        target: parse_redirect_target(&target, id)?,
923                        expiration_policy: meta.expiration_policy,
924                    }));
925                }
926                // Race: Tombstone was replaced by an object, retry to overwrite
927                Some(RowData::Object { .. }) => continue,
928                // Race: Tombstone was deleted, retry to write.
929                None => continue,
930            }
931        }
932
933        Err(Error::generic("BigTable: race loop in put_non_tombstone"))
934    }
935
936    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
937    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
938        objectstore_log::debug!("Reading from Bigtable backend");
939        let path = id.as_storage_path().to_string().into_bytes();
940
941        let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
942            return Ok(TieredGet::NotFound);
943        };
944
945        if row.needs_tti_bump() {
946            self.bump_tti(path.clone(), &row, true, id).await;
947        }
948
949        Ok(match row {
950            RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
951                target: parse_redirect_target(&target, id)?,
952                expiration_policy: meta.expiration_policy,
953            }),
954            RowData::Object { metadata, payload } => {
955                let mut metadata = metadata;
956                metadata.size = Some(payload.len());
957                TieredGet::Object(metadata, crate::stream::single(payload))
958            }
959        })
960    }
961
962    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
963    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
964        objectstore_log::debug!("Reading metadata from Bigtable backend");
965        let path = id.as_storage_path().to_string().into_bytes();
966
967        // Read metadata and tombstone columns — skip the (potentially large) payload.
968        // NB: `metadata.size` will not be populated since the payload is not fetched.
969        let row_opt = self
970            .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
971            .await?;
972        let Some(row) = row_opt else {
973            return Ok(TieredMetadata::NotFound);
974        };
975
976        if row.needs_tti_bump() {
977            self.bump_tti(path.clone(), &row, false, id).await;
978        }
979
980        Ok(match row {
981            RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
982                target: parse_redirect_target(&target, id)?,
983                expiration_policy: meta.expiration_policy,
984            }),
985            RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
986        })
987    }
988
989    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
990    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
991        objectstore_log::debug!("Conditional delete from Bigtable backend");
992
993        let path = id.as_storage_path().to_string().into_bytes();
994
995        for _ in 0..CAS_RETRY_COUNT {
996            let write_succeeded = self
997                .check_and_mutate(
998                    path.clone(),
999                    tombstone_predicate(),
1000                    [delete_row_mutation()],
1001                    "delete_non_tombstone",
1002                )
1003                .await?;
1004
1005            if write_succeeded {
1006                return Ok(None);
1007            }
1008
1009            // A tombstone was present: read its data for the caller.
1010            let row = self
1011                .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
1012                .await?;
1013
1014            match row {
1015                Some(RowData::Tombstone { target, meta, .. }) => {
1016                    return Ok(Some(Tombstone {
1017                        target: parse_redirect_target(&target, id)?,
1018                        expiration_policy: meta.expiration_policy,
1019                    }));
1020                }
1021                // Race: An object replaced the tombstone, delete the new object now.
1022                Some(RowData::Object { .. }) => continue,
1023                // Race: Entry was deleted in the meanwhile, nothing left to do.
1024                None => return Ok(None),
1025            }
1026        }
1027
1028        Err(Error::generic(
1029            "BigTable: race loop in delete_non_tombstone",
1030        ))
1031    }
1032
1033    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1034    async fn compare_and_write(
1035        &self,
1036        id: &ObjectId,
1037        current: Option<&ObjectId>,
1038        write: TieredWrite,
1039    ) -> Result<bool> {
1040        objectstore_log::debug!("CAS put to Bigtable backend");
1041
1042        let path = id.as_storage_path().to_string().into_bytes();
1043        let now = SystemTime::now();
1044
1045        let predicate = match (current, write.target()) {
1046            (Some(old), Some(new)) => update_predicate(old, new, id),
1047            (Some(target), None) => optional_target_predicate(target, id),
1048            (None, Some(target)) => optional_target_predicate(target, id),
1049            (None, None) => tombstone_predicate(),
1050        };
1051
1052        let mutations = match write {
1053            TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
1054            TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
1055            TieredWrite::Delete => vec![delete_row_mutation()],
1056        };
1057
1058        self.check_and_mutate(path, predicate, mutations, "compare_and_write")
1059            .await
1060    }
1061}
1062
1063/// Converts the given TTL duration to a microsecond-precision unix timestamp.
1064///
1065/// The TTL is anchored at the provided `from` timestamp, which defaults to `SystemTime::now()`. As
1066/// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at
1067/// 0.
1068fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1069    let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1070        context: format!(
1071            "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1072            humantime::format_rfc3339_seconds(from),
1073            ttl.as_secs()
1074        ),
1075        cause: None,
1076    })?;
1077    let millis = deadline
1078        .duration_since(SystemTime::UNIX_EPOCH)
1079        .map_err(|e| Error::Generic {
1080            context: format!(
1081                "unable to get duration since UNIX_EPOCH for SystemTime {}",
1082                humantime::format_rfc3339_seconds(deadline)
1083            ),
1084            cause: Some(Box::new(e)),
1085        })?
1086        .as_millis();
1087    (millis * 1000).try_into().map_err(|e| Error::Generic {
1088        context: format!("failed to convert {}ms to i64 microseconds", millis),
1089        cause: Some(Box::new(e)),
1090    })
1091}
1092
/// Converts a microsecond-precision unix timestamp to a `SystemTime`.
///
/// Returns `None` for negative inputs or offsets past the representable `SystemTime` range.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    let offset = u64::try_from(micros).ok().map(Duration::from_micros)?;
    SystemTime::UNIX_EPOCH.checked_add(offset)
}
1099
1100/// Retries a BigTable RPC on transient errors.
1101async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
1102where
1103    F: Future<Output = Result<T, BigTableError>> + Send,
1104{
1105    let mut retry_count = 0usize;
1106
1107    loop {
1108        match f().await {
1109            Ok(res) => return Ok(res),
1110            Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
1111                objectstore_metrics::count!("bigtable.failures", action = context);
1112                return Err(Error::Generic {
1113                    context: format!("Bigtable: `{context}` failed"),
1114                    cause: Some(Box::new(e)),
1115                });
1116            }
1117            Err(e) => {
1118                retry_count += 1;
1119                objectstore_metrics::count!("bigtable.retries", action = context);
1120                objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
1121            }
1122        }
1123    }
1124}
1125
1126fn is_retryable(error: &BigTableError) -> bool {
1127    match error {
1128        // Transient errors on auth token refresh
1129        BigTableError::GCPAuthError(_) => true,
1130        // Transient GRPC network failures
1131        BigTableError::TransportError(_) => true,
1132        // These could also indicate transient network failures
1133        BigTableError::IoError(_) => true,
1134        BigTableError::TimeoutError(_) => true,
1135
1136        // See https://docs.cloud.google.com/bigtable/docs/status-codes
1137        BigTableError::RpcError(status) => match status.code() {
1138            // Generic retriable status
1139            Code::Unavailable => true,
1140            // Timeouts
1141            Code::Cancelled => true,
1142            Code::DeadlineExceeded => true,
1143            // Token might have refreshed too late
1144            Code::Unauthenticated => true,
1145            // Unspecified, attempt to retry anyways
1146            Code::Aborted => true,
1147            Code::Internal => true,
1148            Code::FailedPrecondition => true,
1149            Code::Unknown => true,
1150            _ => false,
1151        },
1152        _ => false,
1153    }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158    use std::collections::BTreeMap;
1159
1160    use anyhow::Result;
1161    use objectstore_types::scope::{Scope, Scopes};
1162
1163    use super::*;
1164    use crate::id::ObjectContext;
1165    use crate::stream;
1166
1167    // NB: Most of these tests require a BigTable emulator running. This is done
1168    // automatically in CI.
1169    //
1170    // Refer to the readme for how to set up the emulator.
1171
1172    async fn create_test_backend() -> Result<BigTableBackend> {
1173        BigTableBackend::new(BigTableConfig {
1174            endpoint: Some("localhost:8086".into()),
1175            project_id: "testing".into(),
1176            instance_name: "objectstore".into(),
1177            table_name: "objectstore".into(),
1178            connections: None,
1179        })
1180        .await
1181    }
1182
1183    fn make_id() -> ObjectId {
1184        ObjectId::random(ObjectContext {
1185            usecase: "testing".into(),
1186            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1187        })
1188    }
1189
1190    async fn create_object(
1191        backend: &BigTableBackend,
1192        id: &ObjectId,
1193        metadata: &Metadata,
1194        payload: &[u8],
1195        now: SystemTime,
1196    ) -> Result<()> {
1197        let path = id.as_storage_path().to_string().into_bytes();
1198        let mutations = object_mutations(metadata, payload.to_vec(), now)?;
1199        backend.mutate(path, mutations, "test-setup").await?;
1200        Ok(())
1201    }
1202
1203    async fn create_tombstone(
1204        backend: &BigTableBackend,
1205        id: &ObjectId,
1206        tombstone: &Tombstone,
1207        now: SystemTime,
1208    ) -> Result<()> {
1209        let path = id.as_storage_path().to_string().into_bytes();
1210        let mutations = tombstone_mutations(tombstone, now)?;
1211        backend.mutate(path, mutations, "test-setup").await?;
1212        Ok(())
1213    }
1214
1215    /// Writes a legacy-format tombstone row directly into Bigtable.
1216    async fn write_legacy_tombstone(
1217        backend: &BigTableBackend,
1218        id: &ObjectId,
1219        expiration_policy: ExpirationPolicy,
1220        time_expires: Option<SystemTime>,
1221    ) -> Result<()> {
1222        let meta = if expiration_policy.is_manual() {
1223            r#"{"is_redirect_tombstone":true}"#.to_owned()
1224        } else {
1225            let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1226            format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1227        };
1228
1229        let (family, timestamp_micros) = if expiration_policy.is_manual() {
1230            (FAMILY_MANUAL, -1)
1231        } else {
1232            let t =
1233                time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1234            let timestamp = t
1235                .duration_since(SystemTime::UNIX_EPOCH)
1236                .unwrap()
1237                .as_millis();
1238            (FAMILY_GC, timestamp as i64 * 1000)
1239        };
1240
1241        let path = id.as_storage_path().to_string().into_bytes();
1242        let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1243            family_name: family.to_owned(),
1244            column_qualifier: COLUMN_METADATA.to_owned(),
1245            timestamp_micros,
1246            value: meta.into_bytes(),
1247        }))];
1248
1249        backend.mutate(path, mutations, "test-setup").await?;
1250
1251        Ok(())
1252    }
1253
1254    /// Writes a new-format tombstone row with an empty `r` value directly,
1255    /// simulating rows written by code before this change.
1256    async fn write_empty_redirect_tombstone(
1257        backend: &BigTableBackend,
1258        id: &ObjectId,
1259    ) -> Result<()> {
1260        let path = id.as_storage_path().to_string().into_bytes();
1261        let mutations = [
1262            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1263                family_name: FAMILY_MANUAL.to_owned(),
1264                column_qualifier: COLUMN_REDIRECT.to_owned(),
1265                timestamp_micros: -1,
1266                value: b"".to_vec(), // empty — legacy format
1267            })),
1268            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1269                family_name: FAMILY_MANUAL.to_owned(),
1270                column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1271                timestamp_micros: -1,
1272                value: b"{}".to_vec(),
1273            })),
1274        ];
1275
1276        backend.mutate(path, mutations, "test-setup").await?;
1277
1278        Ok(())
1279    }
1280
1281    // --- Section 1: Object Operations ---
1282
1283    /// Verifies the full roundtrip: put → get_object (payload + metadata) → get_metadata (metadata).
1284    #[tokio::test]
1285    async fn test_roundtrip() -> Result<()> {
1286        let backend = create_test_backend().await?;
1287
1288        let id = make_id();
1289        let metadata = Metadata {
1290            content_type: "text/plain".into(),
1291            time_created: Some(SystemTime::now()),
1292            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1293            ..Default::default()
1294        };
1295
1296        backend
1297            .put_object(&id, &metadata, stream::single("hello, world"))
1298            .await?;
1299
1300        let (obj_meta, stream) = backend.get_object(&id).await?.unwrap();
1301        let payload = stream::read_to_vec(stream).await?;
1302        assert_eq!(payload, b"hello, world");
1303        assert_eq!(obj_meta.content_type, metadata.content_type);
1304        assert_eq!(obj_meta.custom, metadata.custom);
1305
1306        let head_meta = backend.get_metadata(&id).await?.unwrap();
1307        assert_eq!(head_meta.content_type, metadata.content_type);
1308        assert_eq!(head_meta.custom, metadata.custom);
1309
1310        Ok(())
1311    }
1312
1313    /// Verifies that absent rows return None or succeed silently for all read/delete operations.
1314    #[tokio::test]
1315    async fn test_nonexistent() -> Result<()> {
1316        let backend = create_test_backend().await?;
1317
1318        let id = make_id();
1319        assert!(backend.get_object(&id).await?.is_none());
1320        assert!(backend.get_metadata(&id).await?.is_none());
1321        backend.delete_object(&id).await?;
1322
1323        Ok(())
1324    }
1325
1326    #[tokio::test]
1327    async fn test_overwrite() -> Result<()> {
1328        let backend = create_test_backend().await?;
1329
1330        let id = make_id();
1331        let first_metadata = Metadata {
1332            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1333            ..Default::default()
1334        };
1335        create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1336
1337        let second_metadata = Metadata {
1338            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1339            ..Default::default()
1340        };
1341        backend
1342            .put_object(&id, &second_metadata, stream::single("world"))
1343            .await?;
1344
1345        let (meta, stream) = backend.get_object(&id).await?.unwrap();
1346        let payload = stream::read_to_vec(stream).await?;
1347        assert_eq!(payload, b"world");
1348        assert_eq!(meta.custom, second_metadata.custom);
1349
1350        Ok(())
1351    }
1352
1353    #[tokio::test]
1354    async fn test_read_after_delete() -> Result<()> {
1355        let backend = create_test_backend().await?;
1356
1357        let id = make_id();
1358        let metadata = Metadata::default();
1359        create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1360        backend.delete_object(&id).await?;
1361
1362        assert!(backend.get_object(&id).await?.is_none());
1363
1364        Ok(())
1365    }
1366
1367    /// Verifies TTI bump via both `get_object` (loaded=true path) and `get_metadata` (loaded=false path).
1368    ///
1369    /// The bump condition is: `expire_at < now + tti - TTI_DEBOUNCE`. We write a stale
1370    /// timestamp just inside the bump window (still in the future, so the row is not GC'd)
1371    /// and confirm that a subsequent read returns a later expiry.
1372    #[tokio::test]
1373    async fn test_tti_bump() -> Result<()> {
1374        let backend = create_test_backend().await?;
1375        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1376        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
1377        let metadata = Metadata {
1378            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1379            ..Default::default()
1380        };
1381
1382        // Pass a backdated `now` so the written expiry is inside the bump window:
1383        // expire_at = past_now + tti = now - TTI_DEBOUNCE - 60s (stale but not yet expired).
1384        let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_secs(60);
1385
1386        // Sub-sequence 1: get_object triggers bump (loaded=true path).
1387        let id1 = make_id();
1388        create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1389
1390        // get_object reads the stale row, triggers bump, and returns the pre-bump metadata.
1391        let (pre_obj_meta, _) = backend.get_object(&id1).await?.unwrap();
1392        let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1393
1394        // A second get_metadata reads the freshly bumped row.
1395        let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1396        let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1397        assert!(
1398            post_obj_expiry > pre_obj_expiry,
1399            "bump should extend expiry"
1400        );
1401
1402        // Sub-sequence 2: get_metadata triggers bump (loaded=false path).
1403        let id2 = make_id();
1404        create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1405
1406        // First get_metadata sees the stale row and triggers a bump.
1407        let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1408        let pre_expiry = pre_meta.time_expires.unwrap();
1409
1410        // Second get_metadata reads the freshly bumped row.
1411        let post_meta = backend.get_metadata(&id2).await?.unwrap();
1412        let post_expiry = post_meta.time_expires.unwrap();
1413        assert!(post_expiry > pre_expiry, "bump should extend expiry");
1414
1415        // Payload must be intact after the loaded=false bump (which re-fetches the payload).
1416        let (_, stream) = backend.get_object(&id2).await?.unwrap();
1417        let payload = stream::read_to_vec(stream).await?;
1418        assert_eq!(payload, b"hello, world");
1419
1420        Ok(())
1421    }
1422
1423    #[tokio::test]
1424    async fn test_tti_no_bump_when_fresh() -> Result<()> {
1425        let backend = create_test_backend().await?;
1426
1427        let id = make_id();
1428        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1429        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
1430        let metadata = Metadata {
1431            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1432            ..Default::default()
1433        };
1434        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1435
1436        // A freshly written object has time_expires ≈ now + 2d, well outside the bump
1437        // window (now + 2d - 1d = now + 1d). No bump should occur.
1438        let first = backend.get_metadata(&id).await?.unwrap();
1439        let second = backend.get_metadata(&id).await?.unwrap();
1440
1441        assert_eq!(
1442            first.time_expires.unwrap(),
1443            second.time_expires.unwrap(),
1444            "fresh TTI object must not be bumped"
1445        );
1446
1447        Ok(())
1448    }
1449
1450    // --- Section 2: Expiration ---
1451
1452    #[tokio::test]
1453    async fn test_ttl_immediate() -> Result<()> {
1454        // NB: We create a TTL that immediately expires in this test. This might be optimized away
1455        // in a future implementation, so we will have to update this test accordingly.
1456
1457        let backend = create_test_backend().await?;
1458
1459        let id = make_id();
1460        let metadata = Metadata {
1461            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1462            ..Default::default()
1463        };
1464        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1465
1466        assert!(backend.get_object(&id).await?.is_none());
1467
1468        Ok(())
1469    }
1470
1471    #[tokio::test]
1472    async fn test_tti_immediate() -> Result<()> {
1473        // NB: We create a TTI that immediately expires in this test. This might be optimized away
1474        // in a future implementation, so we will have to update this test accordingly.
1475
1476        let backend = create_test_backend().await?;
1477
1478        let id = make_id();
1479        let metadata = Metadata {
1480            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1481            ..Default::default()
1482        };
1483        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1484
1485        assert!(backend.get_object(&id).await?.is_none());
1486
1487        Ok(())
1488    }
1489
1490    // --- Section 3: Tiered Operations ---
1491
1492    /// Covers all three row states for `get_tiered_object` and `get_tiered_metadata`.
1493    ///
1494    /// - **empty**: both return NotFound.
1495    /// - **object**: put_object, both return the Object variant with correct payload/metadata.
1496    /// - **tombstone**: CAS-write with a distinct `lt_id`, both return the Tombstone variant
1497    ///   with `target == lt_id`.
1498    #[tokio::test]
1499    async fn test_tiered_get() -> Result<()> {
1500        let backend = create_test_backend().await?;
1501
1502        // empty
1503        let id = make_id();
1504        assert!(matches!(
1505            backend.get_tiered_object(&id).await?,
1506            TieredGet::NotFound
1507        ));
1508        assert!(matches!(
1509            backend.get_tiered_metadata(&id).await?,
1510            TieredMetadata::NotFound
1511        ));
1512
1513        // object
1514        let id = make_id();
1515        let put_meta = Metadata {
1516            content_type: "text/plain".into(),
1517            custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1518            ..Default::default()
1519        };
1520        create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1521
1522        let TieredGet::Object(obj_meta, obj_stream) = backend.get_tiered_object(&id).await? else {
1523            panic!("expected TieredGet::Object");
1524        };
1525        let obj_payload = stream::read_to_vec(obj_stream).await?;
1526        assert_eq!(obj_payload, b"payload");
1527        assert_eq!(obj_meta.content_type, put_meta.content_type);
1528        assert_eq!(obj_meta.custom, put_meta.custom);
1529
1530        let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1531            panic!("expected TieredMetadata::Object");
1532        };
1533        assert_eq!(head_meta.content_type, put_meta.content_type);
1534        assert_eq!(head_meta.custom, put_meta.custom);
1535
1536        // tombstone
1537        let hv_id = make_id();
1538        let lt_id = ObjectId::random(hv_id.context().clone());
1539        let tombstone = Tombstone {
1540            target: lt_id.clone(),
1541            expiration_policy: ExpirationPolicy::Manual,
1542        };
1543        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1544
1545        match backend.get_tiered_object(&hv_id).await? {
1546            TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1547            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1548        }
1549        match backend.get_tiered_metadata(&hv_id).await? {
1550            TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1551            other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1552        }
1553
1554        Ok(())
1555    }
1556
1557    /// Covers all three row states for `put_non_tombstone`.
1558    ///
1559    /// - **empty**: returns None, object is readable.
1560    /// - **object**: overwrites with new payload, returns None.
1561    /// - **tombstone**: returns Some(Tombstone) with the correct target; tombstone still intact.
1562    #[tokio::test]
1563    async fn test_put_non_tombstone() -> Result<()> {
1564        let backend = create_test_backend().await?;
1565
1566        // empty: put_non_tombstone on absent row succeeds and makes object readable.
1567        let id = make_id();
1568        let metadata = Metadata::default();
1569        let result = backend
1570            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1571            .await?;
1572        assert_eq!(result, None, "expected None on empty row");
1573        let (_, stream) = backend.get_object(&id).await?.unwrap();
1574        assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1575
1576        // object: put_non_tombstone on existing object replaces payload, returns None.
1577        let id = make_id();
1578        create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1579        let result = backend
1580            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1581            .await?;
1582        assert_eq!(result, None, "expected None when overwriting object");
1583        let (_, stream) = backend.get_object(&id).await?.unwrap();
1584        assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1585
1586        // tombstone: put_non_tombstone returns Some(Tombstone) and leaves tombstone intact.
1587        let hv_id = make_id();
1588        let lt_id = ObjectId::random(hv_id.context().clone());
1589        let tombstone = Tombstone {
1590            target: lt_id.clone(),
1591            expiration_policy: ExpirationPolicy::Manual,
1592        };
1593        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1594        let result = backend
1595            .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1596            .await?;
1597        let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1598        assert_eq!(returned.target, lt_id);
1599        assert!(
1600            matches!(
1601                backend.get_tiered_metadata(&hv_id).await?,
1602                TieredMetadata::Tombstone(_)
1603            ),
1604            "tombstone must still exist after put_non_tombstone"
1605        );
1606
1607        Ok(())
1608    }
1609
1610    /// Covers all three row states for `delete_non_tombstone`.
1611    ///
1612    /// - **empty**: returns None.
1613    /// - **object**: returns None, row gone.
1614    /// - **tombstone**: returns Some(Tombstone) with correct target; tombstone still intact.
1615    ///
1616    /// Verifies that the `r` column is correctly detected by both the `ReadRows` column
1617    /// filter and the `CheckAndMutate` `tombstone_predicate`.
1618    #[tokio::test]
1619    async fn test_delete_non_tombstone() -> Result<()> {
1620        let backend = create_test_backend().await?;
1621
1622        // empty
1623        let id = make_id();
1624        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1625
1626        // object
1627        let id = make_id();
1628        let metadata = Metadata::default();
1629        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1630        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1631        assert!(backend.get_object(&id).await?.is_none());
1632
1633        // tombstone
1634        let id = make_id();
1635        let tombstone = Tombstone {
1636            target: id.clone(),
1637            expiration_policy: ExpirationPolicy::Manual,
1638        };
1639        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1640        let tombstone = backend
1641            .delete_non_tombstone(&id)
1642            .await?
1643            .expect("expected Some(tombstone)");
1644        assert_eq!(tombstone.target, id, "tombstone target must be returned");
1645        assert!(
1646            matches!(
1647                backend.get_tiered_metadata(&id).await?,
1648                TieredMetadata::Tombstone(_)
1649            ),
1650            "tombstone must still exist after delete_non_tombstone"
1651        );
1652
1653        Ok(())
1654    }
1655
1656    // --- Section 4: Compare-and-Write ---
1657
1658    /// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds.
1659    ///
1660    /// After creation, both tiered and legacy APIs reflect the tombstone.
1661    #[tokio::test]
1662    async fn test_cas_create_tombstone() -> Result<()> {
1663        let backend = create_test_backend().await?;
1664
1665        let hv_id = make_id();
1666        let lt_id = ObjectId::random(hv_id.context().clone());
1667        let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600));
1668        let tombstone = Tombstone {
1669            target: lt_id.clone(),
1670            expiration_policy,
1671        };
1672
1673        // First create succeeds.
1674        let committed = backend
1675            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1676            .await?;
1677        assert!(committed, "expected CAS success on empty row");
1678
1679        // Tiered reads must see the tombstone with correct target and policy.
1680        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1681            panic!("expected TieredMetadata::Tombstone");
1682        };
1683        assert_eq!(t.target, lt_id, "target must round-trip via r column");
1684        assert_eq!(t.expiration_policy, expiration_policy);
1685        match backend.get_tiered_object(&hv_id).await? {
1686            TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1687            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1688        }
1689
1690        // Legacy reads must error rather than leak tombstone data.
1691        assert!(matches!(
1692            backend.get_object(&hv_id).await,
1693            Err(Error::UnexpectedTombstone)
1694        ));
1695        assert!(matches!(
1696            backend.get_metadata(&hv_id).await,
1697            Err(Error::UnexpectedTombstone)
1698        ));
1699
1700        // Idempotent retry: retry with the same target succeeds
1701        let second = backend
1702            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1703            .await?;
1704        assert!(second, "idempotent retry");
1705
1706        Ok(())
1707    }
1708
1709    /// Swapping a tombstone target: wrong expected → false, correct expected → true.
1710    #[tokio::test]
1711    async fn test_cas_swap_tombstone() -> Result<()> {
1712        let backend = create_test_backend().await?;
1713
1714        let hv_id = make_id();
1715        let old_lt_id = ObjectId::random(hv_id.context().clone());
1716        let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1717        let new_lt_id = ObjectId::random(hv_id.context().clone());
1718
1719        let tombstone = Tombstone {
1720            target: old_lt_id.clone(),
1721            expiration_policy: ExpirationPolicy::Manual,
1722        };
1723        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1724
1725        // Wrong target: CAS fails, tombstone unchanged.
1726        let write = TieredWrite::Tombstone(Tombstone {
1727            target: new_lt_id.clone(),
1728            expiration_policy: ExpirationPolicy::Manual,
1729        });
1730        let swapped = backend
1731            .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1732            .await?;
1733        assert!(!swapped, "expected CAS failure due to wrong target");
1734        match backend.get_tiered_metadata(&hv_id).await? {
1735            TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1736            other => panic!("expected tombstone, got {other:?}"),
1737        }
1738
1739        // Correct target: CAS succeeds, target updated.
1740        let swapped = backend
1741            .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1742            .await?;
1743        assert!(swapped, "expected CAS success with correct target");
1744        match backend.get_tiered_metadata(&hv_id).await? {
1745            TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1746            other => panic!("expected tombstone, got {other:?}"),
1747        }
1748
1749        // Idempotent retry: same A→B swap returns true.
1750        let retry = backend
1751            .compare_and_write(&hv_id, Some(&old_lt_id), write)
1752            .await?;
1753        assert!(retry, "idempotent retry");
1754
1755        Ok(())
1756    }
1757
1758    /// Swapping a tombstone for inline object data: wrong expected → false, correct → true.
1759    #[tokio::test]
1760    async fn test_cas_swap_inline() -> Result<()> {
1761        let backend = create_test_backend().await?;
1762
1763        let id = make_id();
1764        let lt_id = ObjectId::random(id.context().clone());
1765        let wrong_id = ObjectId::random(id.context().clone());
1766
1767        let tombstone = Tombstone {
1768            target: lt_id.clone(),
1769            expiration_policy: ExpirationPolicy::Manual,
1770        };
1771        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1772
1773        // Wrong target: CAS fails, tombstone intact.
1774        let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1775        let swapped = backend
1776            .compare_and_write(&id, Some(&wrong_id), write)
1777            .await?;
1778        assert!(!swapped, "expected CAS failure with wrong target");
1779        assert!(matches!(
1780            backend.get_tiered_metadata(&id).await?,
1781            TieredMetadata::Tombstone(_)
1782        ));
1783
1784        // Correct target: CAS succeeds, row becomes an inline object.
1785        let payload = Bytes::from_static(b"hello inline");
1786        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1787        let swapped = backend
1788            .compare_and_write(&id, Some(&lt_id), write.clone())
1789            .await?;
1790        assert!(swapped, "expected CAS success with correct target");
1791        let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1792            panic!("expected inline object after swap");
1793        };
1794        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1795
1796        // Idempotent retry: row is already inline (no tombstone), same CAS returns true.
1797        let retry = backend.compare_and_write(&id, Some(&lt_id), write).await?;
1798        assert!(retry, "idempotent retry");
1799
1800        Ok(())
1801    }
1802
1803    /// CAS-write an object onto an empty row (expected=None, write=Object) succeeds.
1804    #[tokio::test]
1805    async fn test_cas_create_object_on_empty_row() -> Result<()> {
1806        let backend = create_test_backend().await?;
1807
1808        let id = make_id();
1809        let payload = Bytes::from_static(b"cas object");
1810        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1811        let committed = backend.compare_and_write(&id, None, write).await?;
1812        assert!(committed, "expected CAS success on empty row");
1813
1814        let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1815            panic!("expected Object after CAS-create");
1816        };
1817        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1818
1819        Ok(())
1820    }
1821
1822    /// CAS-delete: wrong expected → false; correct expected → true, row gone.
1823    #[tokio::test]
1824    async fn test_cas_delete() -> Result<()> {
1825        let backend = create_test_backend().await?;
1826
1827        let id = make_id();
1828        let lt_id = ObjectId::random(id.context().clone());
1829        let wrong_id = ObjectId::random(id.context().clone());
1830
1831        let tombstone = Tombstone {
1832            target: lt_id.clone(),
1833            expiration_policy: ExpirationPolicy::Manual,
1834        };
1835        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1836
1837        // Wrong target: fails, row preserved.
1838        let deleted = backend
1839            .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1840            .await?;
1841        assert!(!deleted, "expected CAS failure with wrong target");
1842        assert!(matches!(
1843            backend.get_tiered_metadata(&id).await?,
1844            TieredMetadata::Tombstone(_)
1845        ));
1846
1847        // Correct target: succeeds, row gone.
1848        let deleted = backend
1849            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1850            .await?;
1851        assert!(deleted, "expected CAS delete success");
1852        assert!(matches!(
1853            backend.get_tiered_metadata(&id).await?,
1854            TieredMetadata::NotFound
1855        ));
1856
1857        // Idempotent retry: row is already absent (no tombstone), same delete returns true.
1858        let retry = backend
1859            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1860            .await?;
1861        assert!(retry, "idempotent retry");
1862
1863        // Inline object replaced tombstone: Safe to delete since it is an idempotent operation.
1864        let id2 = make_id();
1865        let fake_lt_id = ObjectId::random(id2.context().clone());
1866        let metadata = Metadata::default();
1867        create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1868        let deleted = backend
1869            .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1870            .await?;
1871        assert!(deleted, "expected idempotent deletion");
1872
1873        Ok(())
1874    }
1875
1876    // --- Section 5: Legacy Tombstone Compatibility ---
1877
1878    /// Legacy Manual and TTL tombstones are correctly read via the tiered APIs.
1879    ///
1880    /// Uses `Manual` expiration so `timestamp_micros = -1` (server-assigned ≈ write time)
1881    /// does not trigger immediate expiry.
1882    #[tokio::test]
1883    async fn test_legacy_tombstone_reads() -> Result<()> {
1884        let backend = create_test_backend().await?;
1885
1886        // Manual policy: get_tiered_metadata returns Tombstone(Manual), get_tiered_object returns Tombstone.
1887        let id = make_id();
1888        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1889
1890        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1891            panic!("expected tombstone");
1892        };
1893        assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1894        assert!(matches!(
1895            backend.get_tiered_object(&id).await?,
1896            TieredGet::Tombstone(_)
1897        ));
1898
1899        // TTL policy: get_tiered_metadata returns Tombstone with the correct TTL policy.
1900        //
1901        // A future cell timestamp (now + TTL) is required so `expires_before` does not
1902        // immediately filter the row.
1903        let id = make_id();
1904        let ttl = Duration::from_secs(2 * 24 * 3600);
1905        write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1906
1907        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1908            panic!("expected TieredMetadata::Tombstone");
1909        };
1910        assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
1911
1912        Ok(())
1913    }
1914
1915    /// A legacy tombstone with TTI policy is upgraded to the new `r`/`t` column format on read.
1916    ///
1917    /// The bump path calls `put_tombstone_row`, which rewrites the row with `r` + `t` columns.
1918    /// The upgraded row has a fresh cell timestamp (≈ now + TTI), so `time_expires` increases.
1919    #[tokio::test]
1920    async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
1921        let backend = create_test_backend().await?;
1922        let id = make_id();
1923        let path = id.as_storage_path().to_string().into_bytes();
1924
1925        let tti = Duration::from_secs(2 * 24 * 3600); // must exceed TTI_DEBOUNCE (1 day)
1926
1927        // Place time_expires just inside the bump window: past `now + tti - TTI_DEBOUNCE`
1928        // but still in the future so `expires_before(now)` does not filter the row.
1929        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1930        write_legacy_tombstone(
1931            &backend,
1932            &id,
1933            ExpirationPolicy::TimeToIdle(tti),
1934            Some(old_deadline),
1935        )
1936        .await?;
1937
1938        // First read detects the stale TTI and triggers `put_tombstone_row`.
1939        let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
1940            panic!("expected tombstone");
1941        };
1942
1943        // After the bump, the row is rewritten with a fresh timestamp (≈ now + TTI).
1944        let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
1945            Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
1946            _ => panic!("expected tombstone row after bump"),
1947        };
1948
1949        assert!(
1950            new_deadline > old_deadline,
1951            "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
1952        );
1953
1954        Ok(())
1955    }
1956
1957    /// Legacy tombstones are handled correctly by all conditional write operations.
1958    ///
1959    /// Covers: `put_non_tombstone`, `delete_non_tombstone`, CAS-delete for both the
1960    /// legacy-metadata format and the empty-redirect format.
1961    #[tokio::test]
1962    async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
1963        let backend = create_test_backend().await?;
1964
1965        // put_non_tombstone returns Some(target == id) for a legacy tombstone.
1966        let id = make_id();
1967        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1968        let t_opt = backend
1969            .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
1970            .await?;
1971        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
1972
1973        // delete_non_tombstone returns Some(target == id) for a legacy tombstone.
1974        let id = make_id();
1975        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1976        let t_opt = backend.delete_non_tombstone(&id).await?;
1977        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
1978
1979        // CAS-delete succeeds on a legacy-metadata tombstone (target resolves to hv_id).
1980        let id = make_id();
1981        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1982        let deleted = backend
1983            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
1984            .await?;
1985        assert!(
1986            deleted,
1987            "CAS-delete must succeed on legacy-metadata tombstone"
1988        );
1989        assert!(matches!(
1990            backend.get_tiered_metadata(&id).await?,
1991            TieredMetadata::NotFound
1992        ));
1993
1994        // CAS-delete succeeds on an empty-redirect tombstone (target resolves to hv_id).
1995        let id = make_id();
1996        write_empty_redirect_tombstone(&backend, &id).await?;
1997        let deleted = backend
1998            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
1999            .await?;
2000        assert!(
2001            deleted,
2002            "CAS-delete must succeed on empty-redirect tombstone"
2003        );
2004        assert!(matches!(
2005            backend.get_tiered_metadata(&id).await?,
2006            TieredMetadata::NotFound
2007        ));
2008
2009        Ok(())
2010    }
2011
2012    /// An empty `r` value falls back to the HV id when resolving the tombstone target.
2013    #[tokio::test]
2014    async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2015        let backend = create_test_backend().await?;
2016        let id = make_id();
2017
2018        write_empty_redirect_tombstone(&backend, &id).await?;
2019        match backend.get_tiered_metadata(&id).await? {
2020            TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2021            other => panic!("expected tombstone, got {other:?}"),
2022        }
2023
2024        Ok(())
2025    }
2026}