Skip to main content

objectstore_service/backend/
bigtable.rs

1//! BigTable backend for high-volume, low-latency storage of small objects.
2//!
3//! # Row Format
4//!
5//! Each row key is the object's storage path. A row contains either an **object** or a
6//! **tombstone** — never both. The two layouts are mutually exclusive and distinguished by
7//! column presence:
8//!
9//! | Column | Family    | Content                     | Present when       |
10//! |--------|-----------|-----------------------------|--------------------|
11//! | `p`    | `fg`/`fm` | Compressed payload bytes    | Object row only    |
12//! | `m`    | `fg`/`fm` | [`Metadata`] JSON           | Object row only    |
13//! | `r`    | `fg`/`fm` | Redirect path to LT storage | Tombstone row only |
14//! | `t`    | `fg`/`fm` | [`Tombstone`] metadata JSON | Tombstone row only |
15//!
//! The `r` column signals a tombstone row: its **value** is the long-term `ObjectId`
//! serialized via `as_storage_path()`. Callers can resolve the LT object directly from the
//! `r` value without reconstructing it from the row key. Older tombstones may carry an empty
//! `r` value instead; it resolves to the tombstone's own `ObjectId`.
19//!
20//! `p`/`m` and `r`/`t` are mutually exclusive. Every write begins with a `DeleteFromRow`
21//! mutation that clears all columns before writing the new cells, so mixed rows cannot exist.
22//!
23//! ## Legacy Tombstone Format
24//!
25//! Tombstones written before the `r`/`t` column layout used the object-row format with an
26//! empty `p` column and `"is_redirect_tombstone": true` in the `m` JSON. Both formats are
27//! supported for reading. A `bigtable.legacy_tombstone_read` metric is emitted on each legacy
28//! read. Legacy tombstones expire naturally by TTL/GC; TTI bumps transparently upgrade them
29//! to the new format.
30
31use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use objectstore_types::range::{ByteRange, ContentRange};
42use serde::{Deserialize, Serialize};
43use tonic::Code;
44
45use crate::backend::common::{
46    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
47    TieredGet, TieredMetadata, TieredWrite, Tombstone,
48};
49use crate::error::{Error, Result};
50use crate::gcp_auth::PrefetchingTokenProvider;
51use crate::id::ObjectId;
52use crate::stream::{ChunkedBytes, ClientStream};
53
/// Configuration for [`BigTableBackend`].
///
/// Stores objects in [Google Cloud Bigtable], a NoSQL wide-column database optimized for
/// high-throughput, low-latency workloads with small objects. When no `endpoint` is set,
/// authentication uses Application Default Credentials (ADC). Emulator connections (with
/// `endpoint` set) do not fetch GCP credentials.
///
/// **Note**: The table must be pre-created with the following column families:
/// - `fg`: timestamp-based garbage collection (`maxage=1s`)
/// - `fm`: manual garbage collection (`no GC policy`)
///
/// [Google Cloud Bigtable]: https://cloud.google.com/bigtable
///
/// # Example
///
/// ```yaml
/// storage:
///   type: bigtable
///   project_id: my-project
///   instance_name: objectstore
///   table_name: objectstore
/// ```
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BigTableConfig {
    /// Optional custom Bigtable endpoint.
    ///
    /// Useful for testing with emulators. When set, the backend connects to this endpoint as
    /// an emulator. If `None`, it uses the default Bigtable endpoint with ADC.
    ///
    /// # Default
    ///
    /// `None` (uses default Bigtable endpoint)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TYPE=bigtable`
    /// - `OS__STORAGE__ENDPOINT=localhost:8086` (optional)
    pub endpoint: Option<String>,

    /// GCP project ID.
    ///
    /// The Google project ID (not project number) containing the Bigtable instance.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__PROJECT_ID=my-project`
    pub project_id: String,

    /// Bigtable instance name.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__INSTANCE_NAME=my-instance`
    pub instance_name: String,

    /// Bigtable table name.
    ///
    /// The table must exist before starting the server.
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__TABLE_NAME=objectstore`
    pub table_name: String,

    /// Optional number of connections (gRPC channels) to maintain to Bigtable.
    ///
    /// Only used when connecting to the real service. It is ignored when `endpoint` is set,
    /// because the emulator connection path does not take a pool size.
    ///
    /// # Default
    ///
    /// `None` (defaults to 1)
    ///
    /// # Environment Variables
    ///
    /// - `OS__STORAGE__CONNECTIONS=16` (optional)
    pub connections: Option<usize>,
}
127
/// How long to wait for the initial connection to Bigtable to be established.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum lifetime of a connection (gRPC channel) to Bigtable. Once exceeded, the channel is
/// replaced with a fresh one in the background.
///
/// The server closes long-standing connections every hour or so, which would otherwise cause
/// latency spikes ([source](https://web.archive.org/web/20260211140930/https://docs.cloud.google.com/bigtable/docs/performance#cold-starts:~:text=return%20an%20error.-,Cold%20start,-at%20client%20initialization)).
/// `tonic` reconnects transparently but lazily: the first requests to use a channel the server
/// has already closed pay the reconnection cost. Rotating channels ahead of the server avoids
/// that extra latency.
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_secs(50 * 60));
/// Debounce window for bumping the deadline of an object that has a TTI policy.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 60 * 60);
/// OAuth scopes needed to call the Bigtable data API.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// How many times a failed request is retried.
const REQUEST_RETRY_COUNT: usize = 2;
/// How many times a CAS mutation is retried before an error is returned.
const CAS_RETRY_COUNT: usize = 3;

/// Column qualifier for the (compressed) raw payload.
const COLUMN_PAYLOAD: &[u8] = b"p";
/// Column qualifier for the JSON-encoded object metadata.
const COLUMN_METADATA: &[u8] = b"m";
/// Column qualifier for the redirect path of a tombstone row.
const COLUMN_REDIRECT: &[u8] = b"r";
/// Column qualifier for the [`TombstoneMeta`] JSON of a tombstone row.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";
/// Qualifier regex selecting every non-payload column (`m`, `r`, `t`) for metadata-only reads.
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family whose cells are garbage collected based on their timestamp.
///
/// This family needs a GC rule so that rows are deleted automatically.
/// See: <https://cloud.google.com/bigtable/docs/gc-cell-level>
const FAMILY_GC: &str = "fg";
/// Column family without automatic garbage collection.
const FAMILY_MANUAL: &str = "fm";
166
167/// BigTable storage backend for high-volume, low-latency object storage.
168pub struct BigTableBackend {
169    bigtable: BigTableConnection,
170
171    instance_path: String,
172    table_path: String,
173    table_name: String,
174}
175
176impl fmt::Debug for BigTableBackend {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        f.debug_struct("BigTableBackend")
179            .field("instance_path", &self.instance_path)
180            .field("table_path", &self.table_path)
181            .field("table_name", &self.table_name)
182            .finish_non_exhaustive()
183    }
184}
185
186/// Creates a row filter that matches a single column by exact qualifier.
187fn column_filter(column: &[u8]) -> v2::RowFilter {
188    v2::RowFilter {
189        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
190            [b"^", column, b"$"].concat(),
191        )),
192    }
193}
194
195/// Creates a row filter matching the legacy tombstone format: `m` column JSON starts with
196/// `{"is_redirect_tombstone":true`.
197///
198/// After legacy tombstones expire naturally this filter becomes dead code in both callers.
199fn legacy_tombstone_filter() -> v2::RowFilter {
200    v2::RowFilter {
201        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
202            filters: vec![
203                column_filter(COLUMN_METADATA),
204                v2::RowFilter {
205                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(
206                        b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
207                    )),
208                },
209            ],
210        })),
211    }
212}
213
214/// Wraps `inner` so that it only matches live (non-expired) cells.
215fn live_row_filter(inner: v2::RowFilter) -> v2::RowFilter {
216    let now_micros = SystemTime::now()
217        .duration_since(SystemTime::UNIX_EPOCH)
218        .unwrap_or_default()
219        .as_millis() as i64
220        * 1000;
221
222    v2::RowFilter {
223        filter: Some(v2::row_filter::Filter::Interleave(
224            v2::row_filter::Interleave {
225                filters: vec![
226                    // Manual family: never expires.
227                    v2::RowFilter {
228                        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
229                            filters: vec![
230                                v2::RowFilter {
231                                    filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
232                                        format!("^{FAMILY_MANUAL}$"),
233                                    )),
234                                },
235                                inner.clone(),
236                            ],
237                        })),
238                    },
239                    // GC family: only match non-expired cells.
240                    v2::RowFilter {
241                        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
242                            filters: vec![
243                                v2::RowFilter {
244                                    filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
245                                        format!("^{FAMILY_GC}$"),
246                                    )),
247                                },
248                                v2::RowFilter {
249                                    filter: Some(v2::row_filter::Filter::TimestampRangeFilter(
250                                        v2::TimestampRange {
251                                            start_timestamp_micros: now_micros,
252                                            end_timestamp_micros: 0,
253                                        },
254                                    )),
255                                },
256                                inner,
257                            ],
258                        })),
259                    },
260                ],
261            },
262        )),
263    }
264}
265
266/// Builds a raw row filter that matches any tombstone row, new- or legacy-format.
267///
268/// New format: presence of the `r` column.
269/// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON.
270///
271/// After legacy tombstones expire naturally this simplifies to just
272/// `column_filter(COLUMN_REDIRECT)`.
273fn tombstone_filter() -> v2::RowFilter {
274    let filter = v2::RowFilter {
275        filter: Some(v2::row_filter::Filter::Interleave(
276            v2::row_filter::Interleave {
277                filters: vec![column_filter(COLUMN_REDIRECT), legacy_tombstone_filter()],
278            },
279        )),
280    };
281    live_row_filter(filter)
282}
283
284/// Returns a [`MutatePredicate`] that matches any tombstone row.
285///
286/// Mutations run only when no tombstone is present (`predicate_matched == false`).
287/// Used by [`BigTableBackend::put_non_tombstone`], [`BigTableBackend::delete_non_tombstone`],
288/// and [`BigTableBackend::compare_and_write`] as the `CheckAndMutateRow` predicate.
289fn tombstone_predicate() -> MutatePredicate {
290    MutatePredicate::Exclude(tombstone_filter())
291}
292
293/// Builds an anchored regex pattern (`^…$`) that matches `value` literally.
294///
295/// Uses [`regex::escape`] so that metacharacters in storage paths (`.`, `/`, etc.)
296/// are treated as literal bytes.
297fn exact_value_regex(value: &str) -> Vec<u8> {
298    format!("^{}$", regex::escape(value)).into_bytes()
299}
300
301/// Matches tombstones whose redirect resolves to `target`.
302///
303/// ## Predicate Matches
304///
305/// Must be used with `true_mutations` and `predicate_matched == true`.
306///
307/// ## Details
308///
309/// Always includes an exact match on the `r` (redirect) column:
310/// - Chain: `r` column present AND value == `target` storage path
311///
312/// When `target == own_id` (the caller expects a legacy identity redirect), the
313/// exact match is wrapped in an Interleave with two additional fallbacks:
314/// - Chain: `r` column present AND value == `b""` (empty-sentinel written before the redirect
315///   column stored the path)
316/// - Chain: `m` column present AND value matches `{"is_redirect_tombstone":true...}` regex
317///   (legacy metadata format predating the dedicated `r` column)
318fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
319    let target_path = exact_value_regex(&target.as_storage_path().to_string());
320
321    let exact_match = v2::RowFilter {
322        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
323            filters: vec![
324                column_filter(COLUMN_REDIRECT),
325                v2::RowFilter {
326                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
327                },
328            ],
329        })),
330    };
331
332    if target != own_id {
333        return live_row_filter(exact_match);
334    }
335
336    let empty_redirect_match = v2::RowFilter {
337        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
338            filters: vec![
339                column_filter(COLUMN_REDIRECT),
340                v2::RowFilter {
341                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
342                },
343            ],
344        })),
345    };
346
347    // Also match legacy tombstones that resolve to the HV id:
348    // - empty `r` value (written before the redirect column stored the path)
349    // - legacy `m` column format (`is_redirect_tombstone: true`)
350    let filter = v2::RowFilter {
351        filter: Some(v2::row_filter::Filter::Interleave(
352            v2::row_filter::Interleave {
353                filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
354            },
355        )),
356    };
357    live_row_filter(filter)
358}
359
360/// Returns a [`MutatePredicate`] that matches tombstones whose redirect resolves to either `old` or `new`.
361///
362/// Mutations run only when the predicate matches (`predicate_matched == true`):
363/// equivalent to `t == old || t == new`. Built as an Interleave of two
364/// [`redirect_target_filter`] calls — yields cells iff at least one branch matches.
365/// An absent row or non-tombstone row yields 0 cells, so `predicate_matched = false` (conflict).
366fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
367    MutatePredicate::Include(v2::RowFilter {
368        filter: Some(v2::row_filter::Filter::Interleave(
369            v2::row_filter::Interleave {
370                filters: vec![
371                    redirect_target_filter(old, own_id),
372                    redirect_target_filter(new, own_id),
373                ],
374            },
375        )),
376    })
377}
378
379/// Returns a [`MutatePredicate`] that matches rows where no conflicting tombstone exists.
380///
381/// Mutations run only when the row is conflict-free (`predicate_matched == false`):
382/// no tombstone is present, or the tombstone's redirect already points to `target`.
383///
384/// Built as an inverted `Condition` filter:
385/// - Predicate: [`redirect_target_filter`]`(target)` — tombstone already points to `target`?
386/// - True branch: `BlockAllFilter` → 0 cells (already at target, safe state).
387/// - False branch: [`tombstone_filter`] → 0 cells when no tombstone exists.
388///
389/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases.
390fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
391    MutatePredicate::Exclude(v2::RowFilter {
392        filter: Some(v2::row_filter::Filter::Condition(Box::new(
393            v2::row_filter::Condition {
394                predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
395                true_filter: Some(Box::new(v2::RowFilter {
396                    filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
397                })),
398                false_filter: Some(Box::new(tombstone_filter())),
399            },
400        ))),
401    })
402}
403
/// The condition under which a [`BigTableBackend::check_and_mutate`] write proceeds.
///
/// Each variant pairs a row filter with the state that makes the write safe:
/// `Include` writes when the row matches; `Exclude` writes when it does not.
/// A filter "matches" when it yields at least one cell for the row. An absent row yields no
/// cells and therefore never matches.
#[derive(Clone, Debug)]
enum MutatePredicate {
    /// Write proceeds when the filter matches the row.
    ///
    /// Mutations run in `true_mutations`; succeeds when `predicate_matched == true`.
    Include(v2::RowFilter),
    /// Write proceeds when the filter does not match the row.
    ///
    /// Mutations run in `false_mutations`; succeeds when `predicate_matched == false`.
    Exclude(v2::RowFilter),
}
419
420/// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`).
421///
422/// Used by metadata-only reads to avoid fetching the (potentially large) payload column
423/// while still being able to detect both new- and legacy-format tombstones.
424fn metadata_filter() -> v2::RowFilter {
425    v2::RowFilter {
426        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
427            FILTER_META.to_owned(),
428        )),
429    }
430}
431
432fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
433    v2::Mutation {
434        mutation: Some(mutation),
435    }
436}
437
438/// Creates a `DeleteFromRow` mutation wrapped in the outer [`v2::Mutation`] envelope.
439fn delete_row_mutation() -> v2::Mutation {
440    mutation(mutation::Mutation::DeleteFromRow(
441        mutation::DeleteFromRow {},
442    ))
443}
444
445/// Builds the three mutations that write an object row: clear existing data,
446/// then set the payload and metadata cells.
447///
448/// Used by both [`BigTableBackend::put_row`] (unconditional write) and
449/// [`BigTableBackend::put_non_tombstone`] (conditional write).
450fn object_mutations(
451    mut metadata: Metadata,
452    payload: Vec<u8>,
453    now: SystemTime,
454) -> Result<[v2::Mutation; 3]> {
455    let (family, timestamp_micros) = match metadata.expiration_policy {
456        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
457        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
458        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
459    };
460
461    // Record the payload size in the metadata before persisting it.
462    metadata.size = Some(payload.len());
463
464    let metadata_bytes = serde_json::to_vec(&metadata)
465        .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
466
467    Ok([
468        // NB: We explicitly delete the row to clear metadata on overwrite.
469        delete_row_mutation(),
470        mutation(mutation::Mutation::SetCell(mutation::SetCell {
471            family_name: family.to_owned(),
472            column_qualifier: COLUMN_PAYLOAD.to_owned(),
473            timestamp_micros,
474            value: payload,
475        })),
476        mutation(mutation::Mutation::SetCell(mutation::SetCell {
477            family_name: family.to_owned(),
478            column_qualifier: COLUMN_METADATA.to_owned(),
479            timestamp_micros,
480            value: metadata_bytes,
481        })),
482    ])
483}
484
/// Metadata carried by tombstone rows in the `t` (tombstone-meta) column.
///
/// Tombstone-specific metadata evolves independently of object [`Metadata`]. Only fields
/// that are meaningful on tombstones are included here.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TombstoneMeta {
    /// Expiration policy for this tombstone.
    ///
    /// Skipped during serialization when set to [`ExpirationPolicy::Manual`]. When the field
    /// is absent on read, `ExpirationPolicy::default()` is used. Presumably that default is
    /// `Manual` so manual tombstones round-trip; NOTE(review): confirm.
    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
    expiration_policy: ExpirationPolicy,
}
497
498/// Builds the three mutations that write a tombstone row: clear existing data,
499/// then set the redirect sentinel and tombstone-meta cells.
500///
501/// Used by both [`BigTableBackend::put_tombstone_row`] (unconditional write) and the
502/// TTI bump path in tiered reads.
503fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
504    let (family, timestamp_micros) = match tombstone.expiration_policy {
505        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
506        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
507        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
508    };
509
510    let tombstone_meta = TombstoneMeta {
511        expiration_policy: tombstone.expiration_policy,
512    };
513
514    Ok([
515        delete_row_mutation(),
516        mutation(mutation::Mutation::SetCell(mutation::SetCell {
517            family_name: family.to_owned(),
518            column_qualifier: COLUMN_REDIRECT.to_owned(),
519            timestamp_micros,
520            value: tombstone.target.as_storage_path().to_string().into_bytes(),
521        })),
522        mutation(mutation::Mutation::SetCell(mutation::SetCell {
523            family_name: family.to_owned(),
524            column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
525            timestamp_micros,
526            value: serde_json::to_vec(&tombstone_meta)
527                .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
528        })),
529    ])
530}
531
/// Subset of [`Metadata`] that indicates a row is a tombstone instead of a real object.
///
/// [`RowData::from_cells`] parses every `m` column into this type first, to detect
/// legacy-format tombstones. Any other `Metadata` fields in the JSON are ignored here.
#[derive(Debug, Deserialize)]
struct LegacyTombstoneMeta {
    /// Internal redirect tombstone marker.
    ///
    /// When `true`, this object is a legacy tombstone. This implies:
    ///  - the payload is empty
    ///  - metadata other than the expiration policy is not meaningful
    ///  - the `r` and `t` columns are not present
    ///
    /// Defaults to `false` when absent, so regular object metadata parses as "not a tombstone".
    #[serde(default)]
    is_redirect_tombstone: bool,

    /// Expiration policy for this tombstone.
    #[serde(default)]
    expiration_policy: ExpirationPolicy,
}
550
/// Parsed data from a BigTable row's cells.
enum RowData {
    /// A regular object row with payload and metadata.
    Object {
        /// Object metadata. `time_expires` is filled from GC-family cell timestamps.
        metadata: Metadata,
        /// Payload bytes (compressed, as stored). Empty when the `p` column was not read,
        /// e.g. for metadata-only reads.
        payload: Vec<u8>,
    },
    /// A tombstone row indicating the real payload lives on the long-term backend.
    Tombstone {
        /// Raw `r` column value. It is empty for legacy tombstones, whose target is the
        /// tombstone's own ID (see `parse_redirect_target`).
        target: Vec<u8>,
        /// Tombstone metadata from the `t` column, or from the legacy `m` column.
        meta: TombstoneMeta,
        /// Expiration deadline taken from GC-family cell timestamps; `None` for manual rows.
        time_expires: Option<SystemTime>,
    },
}
565
566impl RowData {
567    /// Parses a set of row cells into a [`RowData`].
568    ///
569    /// New-format tombstones are identified by the presence of the `r` column.
570    /// Legacy tombstones (written before the column migration) are identified by
571    /// `is_redirect_tombstone: true` in the `m` column JSON; a
572    /// `bigtable.legacy_tombstone_read` metric is emitted on each such read.
573    fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
574        let mut metadata_opt: Option<Metadata> = None;
575        let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
576        let mut redirect_detected = false;
577        let mut redirect_target = Vec::new();
578        let mut expire_at = None;
579        let mut payload = Vec::new();
580
581        for cell in cells {
582            // NB: All cells are written with the same timestamp; last write is safe.
583
584            // Only derive expiration from GC-family cells — manual-family cells
585            // use server-assigned timestamps that don't represent expiration.
586            if cell.family_name == FAMILY_GC {
587                expire_at = micros_to_time(cell.timestamp_micros);
588            }
589
590            match cell.qualifier.as_slice() {
591                COLUMN_REDIRECT => {
592                    redirect_detected = true;
593                    redirect_target = cell.value;
594                }
595                COLUMN_PAYLOAD => {
596                    payload = cell.value;
597                }
598                COLUMN_TOMBSTONE_META => {
599                    tombstone_meta_opt =
600                        Some(serde_json::from_slice(&cell.value).map_err(|cause| {
601                            Error::serde("failed to deserialize tombstone meta", cause)
602                        })?);
603                }
604                COLUMN_METADATA => {
605                    if let Ok(legacy_meta) =
606                        serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
607                        && legacy_meta.is_redirect_tombstone
608                    {
609                        redirect_detected = true;
610                        objectstore_metrics::count!("bigtable.legacy_tombstone_read");
611                        tombstone_meta_opt = Some(TombstoneMeta {
612                            expiration_policy: legacy_meta.expiration_policy,
613                        });
614                    } else {
615                        metadata_opt =
616                            Some(serde_json::from_slice(&cell.value).map_err(|cause| {
617                                Error::serde("failed to deserialize metadata", cause)
618                            })?);
619                    }
620                }
621                _ => {}
622            }
623        }
624
625        Ok(if redirect_detected {
626            RowData::Tombstone {
627                target: redirect_target,
628                meta: tombstone_meta_opt.unwrap_or_default(),
629                time_expires: expire_at,
630            }
631        } else {
632            // Metadata may have been skipped during read - payload-only read for TTI bump.
633            let mut metadata = metadata_opt.unwrap_or_default();
634            metadata.time_expires = expire_at;
635            RowData::Object { metadata, payload }
636        })
637    }
638
639    /// Returns the expiration policy for this row, regardless of variant.
640    fn expiration_policy(&self) -> ExpirationPolicy {
641        match self {
642            RowData::Object { metadata, .. } => metadata.expiration_policy,
643            RowData::Tombstone { meta, .. } => meta.expiration_policy,
644        }
645    }
646
647    /// Returns the resolved expiration timestamp for this row, regardless of variant.
648    fn time_expires(&self) -> Option<SystemTime> {
649        match self {
650            RowData::Object { metadata, .. } => metadata.time_expires,
651            RowData::Tombstone { time_expires, .. } => *time_expires,
652        }
653    }
654
655    /// Returns `true` if this row is expired as of the given `time`.
656    ///
657    /// Only applies to rows with an expiration policy set.
658    fn expires_before(&self, time: SystemTime) -> bool {
659        self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
660    }
661
662    /// Returns `true` if this row's TTI deadline should be bumped.
663    fn needs_tti_bump(&self) -> bool {
664        matches!(
665            self.expiration_policy(),
666            ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
667        )
668    }
669}
670
671/// Parses the raw `r` column bytes into a redirect target [`ObjectId`].
672///
673/// For tombstones with an empty `r` value, falls back to the ID of the tombstone
674/// itself and emits a `bigtable.empty_redirect_read` metric so deployments can
675/// track when it is safe to remove the legacy empty-value code path.
676fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
677    if redirect_path.is_empty() {
678        objectstore_metrics::count!("bigtable.empty_redirect_read");
679        Ok(tombstone_id.clone())
680    } else {
681        let redirect_str = std::str::from_utf8(redirect_path)
682            .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
683        ObjectId::from_storage_path(redirect_str)
684            .ok_or_else(|| Error::generic("corrupt redirect path"))
685    }
686}
687
688impl BigTableBackend {
689    /// Creates a new [`BigTableBackend`] from the given `config`.
690    ///
691    /// Pass an `endpoint` in the config to connect to a local emulator; omit it to use real GCP
692    /// credentials. `connections` controls the gRPC connection pool size (defaults to 1).
693    pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
694        let BigTableConfig {
695            endpoint,
696            project_id,
697            instance_name,
698            table_name,
699            connections,
700        } = config;
701
702        let bigtable = if let Some(ref endpoint) = endpoint {
703            BigTableConnection::new_with_emulator(
704                endpoint,
705                &project_id,
706                &instance_name,
707                false, // is_read_only
708                Some(CONNECT_TIMEOUT),
709            )?
710        } else {
711            let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
712            BigTableConnection::new_with_managed_transport(
713                &project_id,
714                &instance_name,
715                false, // is_read_only
716                Some(CONNECT_TIMEOUT),
717                Arc::new(token_provider),
718                connections.unwrap_or(1),
719                true, // prime_channels
720                None, // app_profile_id
721                MAX_CHANNEL_AGE,
722            )
723            .await?
724        };
725
726        let client = bigtable.client();
727
728        Ok(Self {
729            bigtable,
730            instance_path: format!("projects/{project_id}/instances/{instance_name}"),
731            table_path: client.get_full_table_name(&table_name),
732            table_name,
733        })
734    }
735
736    /// Reads a single row by key, returning parsed row data.
737    ///
738    /// Returns `None` if the row is absent or has expired.
739    async fn read_row(
740        &self,
741        path: &[u8],
742        filter: Option<v2::RowFilter>,
743        action: &'static str,
744    ) -> Result<Option<RowData>> {
745        let request = v2::ReadRowsRequest {
746            table_name: self.table_path.clone(),
747            rows: Some(v2::RowSet {
748                row_keys: vec![path.to_owned()],
749                row_ranges: vec![],
750            }),
751            filter,
752            rows_limit: 1,
753            ..Default::default()
754        };
755
756        let response = retry(action, || async {
757            self.bigtable.client().read_rows(request.clone()).await
758        })
759        .await?;
760        debug_assert!(response.len() <= 1, "Expected at most one row");
761
762        let Some((_, cells)) = response.into_iter().next() else {
763            objectstore_log::debug!("Object not found");
764            return Ok(None);
765        };
766
767        let row = RowData::from_cells(cells)?;
768        Ok(if row.expires_before(SystemTime::now()) {
769            None
770        } else {
771            Some(row)
772        })
773    }
774
775    async fn mutate(
776        &self,
777        path: Vec<u8>,
778        mutations: impl Into<Vec<v2::Mutation>>,
779        action: &'static str,
780    ) -> Result<v2::MutateRowResponse> {
781        let request = v2::MutateRowRequest {
782            table_name: self.table_path.clone(),
783            row_key: path,
784            mutations: mutations.into(),
785            ..Default::default()
786        };
787
788        let response = retry(action, || async {
789            self.bigtable.client().mutate_row(request.clone()).await
790        })
791        .await?;
792
793        Ok(response.into_inner())
794    }
795
796    async fn put_row(
797        &self,
798        path: Vec<u8>,
799        metadata: &Metadata,
800        payload: Vec<u8>,
801        action: &'static str,
802    ) -> Result<v2::MutateRowResponse> {
803        let mutations = object_mutations(metadata.clone(), payload, SystemTime::now())?;
804        self.mutate(path, mutations, action).await
805    }
806
807    async fn put_tombstone_row(
808        &self,
809        path: Vec<u8>,
810        tombstone: &Tombstone,
811        action: &'static str,
812    ) -> Result<v2::MutateRowResponse> {
813        let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
814        self.mutate(path, mutations, action).await
815    }
816
817    /// Best-effort TTI bump for a row.
818    ///
819    /// If the payload isn't loaded, it will be fetched. Failures are ignored silently.
820    async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
821        let expiration_policy = row.expiration_policy();
822
823        match row {
824            RowData::Tombstone { target, .. } => {
825                let target = match parse_redirect_target(target, hv_id) {
826                    Ok(target) => target,
827                    Err(e) => {
828                        objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
829                        return;
830                    }
831                };
832
833                let tombstone = Tombstone {
834                    target,
835                    expiration_policy,
836                };
837                let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
838            }
839            RowData::Object { metadata, payload } if loaded => {
840                let _ = self
841                    .put_row(path, metadata, payload.clone(), "tti-bump")
842                    .await;
843            }
844            RowData::Object { metadata, .. } => {
845                let payload_read = self
846                    .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
847                    .await;
848
849                if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
850                    let _ = self.put_row(path, metadata, payload, "tti-bump").await;
851                }
852            }
853        }
854    }
855
856    /// Executes a `CheckAndMutateRow` request.
857    async fn check_and_mutate(
858        &self,
859        row_key: Vec<u8>,
860        predicate: MutatePredicate,
861        mutations: impl Into<Vec<v2::Mutation>>,
862        context: &'static str,
863    ) -> Result<bool> {
864        let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
865            MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
866            MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
867        };
868
869        let request = v2::CheckAndMutateRowRequest {
870            table_name: self.table_path.clone(),
871            row_key,
872            predicate_filter: Some(filter),
873            true_mutations,
874            false_mutations,
875            ..Default::default()
876        };
877
878        let future = retry(context, || async {
879            self.bigtable
880                .client()
881                .check_and_mutate_row(request.clone())
882                .await
883        });
884
885        Ok(future.await?.predicate_matched == success_on_match)
886    }
887}
888
889#[async_trait::async_trait]
890impl Backend for BigTableBackend {
891    fn name(&self) -> &'static str {
892        "bigtable"
893    }
894
895    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
896    async fn put_object(
897        &self,
898        id: &ObjectId,
899        metadata: &Metadata,
900        mut stream: ClientStream,
901    ) -> Result<PutResponse> {
902        objectstore_log::debug!("Writing to Bigtable backend");
903        let path = id.as_storage_path().to_string().into_bytes();
904
905        let mut payload = ChunkedBytes::new(0);
906        while let Some(chunk) = stream.try_next().await? {
907            payload.push(chunk);
908        }
909
910        self.put_row(path, metadata, payload.into_bytes().into(), "put")
911            .await?;
912        Ok(())
913    }
914
915    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
916    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
917        match self.get_tiered_object(id, range).await? {
918            TieredGet::Object(metadata, content_range, payload) => {
919                Ok(Some((metadata, content_range, payload)))
920            }
921            TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
922            TieredGet::NotFound => Ok(None),
923        }
924    }
925
926    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
927    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
928        match self.get_tiered_metadata(id).await? {
929            TieredMetadata::Object(metadata) => Ok(Some(metadata)),
930            TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
931            TieredMetadata::NotFound => Ok(None),
932        }
933    }
934
935    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
936    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
937        objectstore_log::debug!("Deleting from Bigtable backend");
938
939        let path = id.as_storage_path().to_string().into_bytes();
940        self.mutate(path, [delete_row_mutation()], "delete").await?;
941
942        Ok(())
943    }
944}
945
946#[async_trait::async_trait]
947impl HighVolumeBackend for BigTableBackend {
948    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
949    async fn put_non_tombstone(
950        &self,
951        id: &ObjectId,
952        metadata: &Metadata,
953        payload: Bytes,
954    ) -> Result<Option<Tombstone>> {
955        objectstore_log::debug!("Conditional put to Bigtable backend");
956
957        let path = id.as_storage_path().to_string().into_bytes();
958        let mutations = object_mutations(metadata.clone(), payload.to_vec(), SystemTime::now())?;
959
960        for _ in 0..CAS_RETRY_COUNT {
961            let write_succeeded = self
962                .check_and_mutate(
963                    path.clone(),
964                    tombstone_predicate(),
965                    mutations.clone(),
966                    "put_non_tombstone",
967                )
968                .await?;
969
970            if write_succeeded {
971                return Ok(None);
972            }
973
974            // A tombstone was present: read its data for the caller.
975            let row = self
976                .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
977                .await?;
978
979            match row {
980                Some(RowData::Tombstone { target, meta, .. }) => {
981                    return Ok(Some(Tombstone {
982                        target: parse_redirect_target(&target, id)?,
983                        expiration_policy: meta.expiration_policy,
984                    }));
985                }
986                // Race: Tombstone was replaced by an object, retry to overwrite
987                Some(RowData::Object { .. }) => continue,
988                // Race: Tombstone was deleted, retry to write.
989                None => continue,
990            }
991        }
992
993        Err(Error::generic("BigTable: race loop in put_non_tombstone"))
994    }
995
996    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
997    async fn get_tiered_object(
998        &self,
999        id: &ObjectId,
1000        range: Option<ByteRange>,
1001    ) -> Result<TieredGet> {
1002        objectstore_log::debug!("Reading from Bigtable backend");
1003        let path = id.as_storage_path().to_string().into_bytes();
1004
1005        let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
1006            return Ok(TieredGet::NotFound);
1007        };
1008
1009        if row.needs_tti_bump() {
1010            self.bump_tti(path.clone(), &row, true, id).await;
1011        }
1012
1013        Ok(match row {
1014            RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
1015                target: parse_redirect_target(&target, id)?,
1016                expiration_policy: meta.expiration_policy,
1017            }),
1018            RowData::Object { metadata, payload } => {
1019                let mut metadata = metadata;
1020                let payload = Bytes::from(payload);
1021                if metadata.size.is_none() {
1022                    // If object size wasn't written into the metadata, re-compute it now
1023                    metadata.size = Some(payload.len());
1024                }
1025
1026                let (content_range, payload) = apply_range(payload, range)?;
1027                TieredGet::Object(metadata, content_range, crate::stream::single(payload))
1028            }
1029        })
1030    }
1031
1032    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1033    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
1034        objectstore_log::debug!("Reading metadata from Bigtable backend");
1035        let path = id.as_storage_path().to_string().into_bytes();
1036
1037        // Read metadata and tombstone columns — skip the (potentially large) payload.
1038        // NB: `metadata.size` will only be populated if the size was added to the metadata before
1039        // writing to Bigtable.
1040        let row_opt = self
1041            .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
1042            .await?;
1043        let Some(row) = row_opt else {
1044            return Ok(TieredMetadata::NotFound);
1045        };
1046
1047        if row.needs_tti_bump() {
1048            self.bump_tti(path.clone(), &row, false, id).await;
1049        }
1050
1051        Ok(match row {
1052            RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
1053                target: parse_redirect_target(&target, id)?,
1054                expiration_policy: meta.expiration_policy,
1055            }),
1056            RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
1057        })
1058    }
1059
1060    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1061    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
1062        objectstore_log::debug!("Conditional delete from Bigtable backend");
1063
1064        let path = id.as_storage_path().to_string().into_bytes();
1065
1066        for _ in 0..CAS_RETRY_COUNT {
1067            let write_succeeded = self
1068                .check_and_mutate(
1069                    path.clone(),
1070                    tombstone_predicate(),
1071                    [delete_row_mutation()],
1072                    "delete_non_tombstone",
1073                )
1074                .await?;
1075
1076            if write_succeeded {
1077                return Ok(None);
1078            }
1079
1080            // A tombstone was present: read its data for the caller.
1081            let row = self
1082                .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
1083                .await?;
1084
1085            match row {
1086                Some(RowData::Tombstone { target, meta, .. }) => {
1087                    return Ok(Some(Tombstone {
1088                        target: parse_redirect_target(&target, id)?,
1089                        expiration_policy: meta.expiration_policy,
1090                    }));
1091                }
1092                // Race: An object replaced the tombstone, delete the new object now.
1093                Some(RowData::Object { .. }) => continue,
1094                // Race: Entry was deleted in the meanwhile, nothing left to do.
1095                None => return Ok(None),
1096            }
1097        }
1098
1099        Err(Error::generic(
1100            "BigTable: race loop in delete_non_tombstone",
1101        ))
1102    }
1103
1104    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1105    async fn compare_and_write(
1106        &self,
1107        id: &ObjectId,
1108        current: Option<&ObjectId>,
1109        write: TieredWrite,
1110    ) -> Result<bool> {
1111        objectstore_log::debug!("CAS put to Bigtable backend");
1112
1113        let path = id.as_storage_path().to_string().into_bytes();
1114        let now = SystemTime::now();
1115
1116        let predicate = match (current, write.target()) {
1117            (Some(old), Some(new)) => update_predicate(old, new, id),
1118            (Some(target), None) => optional_target_predicate(target, id),
1119            (None, Some(target)) => optional_target_predicate(target, id),
1120            (None, None) => tombstone_predicate(),
1121        };
1122
1123        let mutations = match write {
1124            TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
1125            TieredWrite::Object(m, p) => object_mutations(m, p.to_vec(), now)?.into(),
1126            TieredWrite::Delete => vec![delete_row_mutation()],
1127        };
1128
1129        self.check_and_mutate(path, predicate, mutations, "compare_and_write")
1130            .await
1131    }
1132}
1133
1134/// Converts the given TTL duration to a microsecond-precision unix timestamp.
1135///
1136/// The TTL is anchored at the provided `from` timestamp, which defaults to `SystemTime::now()`. As
1137/// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at
1138/// 0.
1139fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1140    let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1141        context: format!(
1142            "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1143            humantime::format_rfc3339_seconds(from),
1144            ttl.as_secs()
1145        ),
1146        cause: None,
1147    })?;
1148    let millis = deadline
1149        .duration_since(SystemTime::UNIX_EPOCH)
1150        .map_err(|e| Error::Generic {
1151            context: format!(
1152                "unable to get duration since UNIX_EPOCH for SystemTime {}",
1153                humantime::format_rfc3339_seconds(deadline)
1154            ),
1155            cause: Some(Box::new(e)),
1156        })?
1157        .as_millis();
1158    (millis * 1000).try_into().map_err(|e| Error::Generic {
1159        context: format!("failed to convert {millis}ms to i64 microseconds"),
1160        cause: Some(Box::new(e)),
1161    })
1162}
1163
/// Interprets `micros` as microseconds since the Unix epoch and returns the matching instant.
///
/// Yields `None` for negative inputs, or when the result is not representable as a
/// `SystemTime`.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    u64::try_from(micros)
        .ok()
        .map(Duration::from_micros)
        .and_then(|since_epoch| SystemTime::UNIX_EPOCH.checked_add(since_epoch))
}
1170
1171/// Retries a BigTable RPC on transient errors.
1172async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
1173where
1174    F: Future<Output = Result<T, BigTableError>> + Send,
1175{
1176    let mut retry_count = 0usize;
1177
1178    loop {
1179        match f().await {
1180            Ok(res) => return Ok(res),
1181            Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
1182                objectstore_metrics::count!("bigtable.failures", action = context);
1183                return Err(Error::Generic {
1184                    context: format!("Bigtable: `{context}` failed"),
1185                    cause: Some(Box::new(e)),
1186                });
1187            }
1188            Err(e) => {
1189                retry_count += 1;
1190                objectstore_metrics::count!("bigtable.retries", action = context);
1191                objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
1192            }
1193        }
1194    }
1195}
1196
1197fn is_retryable(error: &BigTableError) -> bool {
1198    match error {
1199        // Transient errors on auth token refresh
1200        BigTableError::GCPAuthError(_) => true,
1201        // Transient GRPC network failures
1202        BigTableError::TransportError(_) => true,
1203        // These could also indicate transient network failures
1204        BigTableError::IoError(_) => true,
1205        BigTableError::TimeoutError(_) => true,
1206
1207        // See https://docs.cloud.google.com/bigtable/docs/status-codes
1208        BigTableError::RpcError(status) => match status.code() {
1209            // Generic retriable status
1210            Code::Unavailable => true,
1211            // Timeouts
1212            Code::Cancelled => true,
1213            Code::DeadlineExceeded => true,
1214            // Token might have refreshed too late
1215            Code::Unauthenticated => true,
1216            // Unspecified, attempt to retry anyways
1217            Code::Aborted => true,
1218            Code::Internal => true,
1219            Code::FailedPrecondition => true,
1220            Code::Unknown => true,
1221            _ => false,
1222        },
1223        _ => false,
1224    }
1225}
1226
1227/// Resolves an optional byte range against a payload buffer, returning the
1228/// applicable content range and the (potentially narrowed) payload.
1229///
1230/// When `range` is `None`, returns the full payload unchanged. Uses
1231/// `Bytes::slice` to avoid copying data.
1232fn apply_range(payload: Bytes, range: Option<ByteRange>) -> Result<(Option<ContentRange>, Bytes)> {
1233    let Some(byte_range) = range else {
1234        return Ok((None, payload));
1235    };
1236
1237    let total = payload.len() as u64;
1238    let content_range = byte_range
1239        .resolve(total)
1240        .ok_or(Error::RangeNotSatisfiable { total })?;
1241
1242    let sliced = payload.slice(content_range.start as usize..content_range.end as usize + 1);
1243    Ok((Some(content_range), sliced))
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248    use std::collections::BTreeMap;
1249
1250    use anyhow::Result;
1251    use objectstore_types::scope::{Scope, Scopes};
1252
1253    use super::*;
1254    use crate::id::ObjectContext;
1255    use crate::stream;
1256
1257    // NB: Most of these tests require a BigTable emulator running. This is done
1258    // automatically in CI.
1259    //
1260    // Refer to the readme for how to set up the emulator.
1261
1262    async fn create_test_backend() -> Result<BigTableBackend> {
1263        BigTableBackend::new(BigTableConfig {
1264            endpoint: Some("localhost:8086".into()),
1265            project_id: "testing".into(),
1266            instance_name: "objectstore".into(),
1267            table_name: "objectstore".into(),
1268            connections: None,
1269        })
1270        .await
1271    }
1272
1273    fn make_id() -> ObjectId {
1274        ObjectId::random(ObjectContext {
1275            usecase: "testing".into(),
1276            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1277        })
1278    }
1279
1280    async fn create_object(
1281        backend: &BigTableBackend,
1282        id: &ObjectId,
1283        metadata: &Metadata,
1284        payload: &[u8],
1285        now: SystemTime,
1286    ) -> Result<()> {
1287        let path = id.as_storage_path().to_string().into_bytes();
1288        let mutations = object_mutations(metadata.clone(), payload.to_vec(), now)?;
1289        backend.mutate(path, mutations, "test-setup").await?;
1290        Ok(())
1291    }
1292
1293    async fn create_tombstone(
1294        backend: &BigTableBackend,
1295        id: &ObjectId,
1296        tombstone: &Tombstone,
1297        now: SystemTime,
1298    ) -> Result<()> {
1299        let path = id.as_storage_path().to_string().into_bytes();
1300        let mutations = tombstone_mutations(tombstone, now)?;
1301        backend.mutate(path, mutations, "test-setup").await?;
1302        Ok(())
1303    }
1304
1305    /// Writes a legacy-format tombstone row directly into Bigtable.
1306    async fn write_legacy_tombstone(
1307        backend: &BigTableBackend,
1308        id: &ObjectId,
1309        expiration_policy: ExpirationPolicy,
1310        time_expires: Option<SystemTime>,
1311    ) -> Result<()> {
1312        let meta = if expiration_policy.is_manual() {
1313            r#"{"is_redirect_tombstone":true}"#.to_owned()
1314        } else {
1315            let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1316            format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1317        };
1318
1319        let (family, timestamp_micros) = if expiration_policy.is_manual() {
1320            (FAMILY_MANUAL, -1)
1321        } else {
1322            let t =
1323                time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1324            let timestamp = t
1325                .duration_since(SystemTime::UNIX_EPOCH)
1326                .unwrap()
1327                .as_millis();
1328            (FAMILY_GC, timestamp as i64 * 1000)
1329        };
1330
1331        let path = id.as_storage_path().to_string().into_bytes();
1332        let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1333            family_name: family.to_owned(),
1334            column_qualifier: COLUMN_METADATA.to_owned(),
1335            timestamp_micros,
1336            value: meta.into_bytes(),
1337        }))];
1338
1339        backend.mutate(path, mutations, "test-setup").await?;
1340
1341        Ok(())
1342    }
1343
1344    /// Writes a new-format tombstone row with an empty `r` value directly,
1345    /// simulating rows written by code before this change.
1346    async fn write_empty_redirect_tombstone(
1347        backend: &BigTableBackend,
1348        id: &ObjectId,
1349    ) -> Result<()> {
1350        let path = id.as_storage_path().to_string().into_bytes();
1351        let mutations = [
1352            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1353                family_name: FAMILY_MANUAL.to_owned(),
1354                column_qualifier: COLUMN_REDIRECT.to_owned(),
1355                timestamp_micros: -1,
1356                value: b"".to_vec(), // empty — legacy format
1357            })),
1358            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1359                family_name: FAMILY_MANUAL.to_owned(),
1360                column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1361                timestamp_micros: -1,
1362                value: b"{}".to_vec(),
1363            })),
1364        ];
1365
1366        backend.mutate(path, mutations, "test-setup").await?;
1367
1368        Ok(())
1369    }
1370
1371    // --- Section 1: Object Operations ---
1372
1373    /// Verifies the full roundtrip: put → get_object (payload + metadata) → get_metadata (metadata).
1374    #[tokio::test]
1375    async fn test_roundtrip() -> Result<()> {
1376        let backend = create_test_backend().await?;
1377
1378        let id = make_id();
1379        let metadata = Metadata {
1380            content_type: "text/plain".into(),
1381            time_created: Some(SystemTime::now()),
1382            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1383            ..Default::default()
1384        };
1385
1386        backend
1387            .put_object(&id, &metadata, stream::single("hello, world"))
1388            .await?;
1389
1390        let (obj_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1391        let payload = stream::read_to_vec(stream).await?;
1392        assert_eq!(payload, b"hello, world");
1393        assert_eq!(obj_meta.content_type, metadata.content_type);
1394        assert_eq!(obj_meta.custom, metadata.custom);
1395
1396        let head_meta = backend.get_metadata(&id).await?.unwrap();
1397        assert_eq!(head_meta.content_type, metadata.content_type);
1398        assert_eq!(head_meta.custom, metadata.custom);
1399
1400        Ok(())
1401    }
1402
1403    /// Verifies that absent rows return None or succeed silently for all read/delete operations.
1404    #[tokio::test]
1405    async fn test_nonexistent() -> Result<()> {
1406        let backend = create_test_backend().await?;
1407
1408        let id = make_id();
1409        assert!(backend.get_object(&id, None).await?.is_none());
1410        assert!(backend.get_metadata(&id).await?.is_none());
1411        backend.delete_object(&id).await?;
1412
1413        Ok(())
1414    }
1415
1416    #[tokio::test]
1417    async fn test_overwrite() -> Result<()> {
1418        let backend = create_test_backend().await?;
1419
1420        let id = make_id();
1421        let first_metadata = Metadata {
1422            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1423            ..Default::default()
1424        };
1425        create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1426
1427        let second_metadata = Metadata {
1428            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1429            ..Default::default()
1430        };
1431        backend
1432            .put_object(&id, &second_metadata, stream::single("world"))
1433            .await?;
1434
1435        let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1436        let payload = stream::read_to_vec(stream).await?;
1437        assert_eq!(payload, b"world");
1438        assert_eq!(meta.custom, second_metadata.custom);
1439
1440        Ok(())
1441    }
1442
1443    #[tokio::test]
1444    async fn test_read_after_delete() -> Result<()> {
1445        let backend = create_test_backend().await?;
1446
1447        let id = make_id();
1448        let metadata = Metadata::default();
1449        create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1450        backend.delete_object(&id).await?;
1451
1452        assert!(backend.get_object(&id, None).await?.is_none());
1453
1454        Ok(())
1455    }
1456
1457    /// Verifies TTI bump via both `get_object` (loaded=true path) and `get_metadata` (loaded=false path).
1458    ///
1459    /// The bump condition is: `expire_at < now + tti - TTI_DEBOUNCE`. We write a stale
1460    /// timestamp just inside the bump window (still in the future, so the row is not GC'd)
1461    /// and confirm that a subsequent read returns a later expiry.
1462    #[tokio::test]
1463    async fn test_tti_bump() -> Result<()> {
1464        let backend = create_test_backend().await?;
1465        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1466        let tti = Duration::from_hours(2 * 24);
1467        let metadata = Metadata {
1468            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1469            ..Default::default()
1470        };
1471
1472        // Pass a backdated `now` so the written expiry is inside the bump window:
1473        // expire_at = past_now + tti = now - TTI_DEBOUNCE - 60s (stale but not yet expired).
1474        let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_mins(1);
1475
1476        // Sub-sequence 1: get_object triggers bump (loaded=true path).
1477        let id1 = make_id();
1478        create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1479
1480        // get_object reads the stale row, triggers bump, and returns the pre-bump metadata.
1481        let (pre_obj_meta, _, _) = backend.get_object(&id1, None).await?.unwrap();
1482        let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1483
1484        // A second get_metadata reads the freshly bumped row.
1485        let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1486        let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1487        assert!(
1488            post_obj_expiry > pre_obj_expiry,
1489            "bump should extend expiry"
1490        );
1491
1492        // Sub-sequence 2: get_metadata triggers bump (loaded=false path).
1493        let id2 = make_id();
1494        create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1495
1496        // First get_metadata sees the stale row and triggers a bump.
1497        let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1498        let pre_expiry = pre_meta.time_expires.unwrap();
1499
1500        // Second get_metadata reads the freshly bumped row.
1501        let post_meta = backend.get_metadata(&id2).await?.unwrap();
1502        let post_expiry = post_meta.time_expires.unwrap();
1503        assert!(post_expiry > pre_expiry, "bump should extend expiry");
1504
1505        // Payload must be intact after the loaded=false bump (which re-fetches the payload).
1506        let (_, _, stream) = backend.get_object(&id2, None).await?.unwrap();
1507        let payload = stream::read_to_vec(stream).await?;
1508        assert_eq!(payload, b"hello, world");
1509
1510        Ok(())
1511    }
1512
1513    #[tokio::test]
1514    async fn test_tti_no_bump_when_fresh() -> Result<()> {
1515        let backend = create_test_backend().await?;
1516
1517        let id = make_id();
1518        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1519        let tti = Duration::from_hours(2 * 24);
1520        let metadata = Metadata {
1521            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1522            ..Default::default()
1523        };
1524        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1525
1526        // A freshly written object has time_expires ≈ now + 2d, well outside the bump
1527        // window (now + 2d - 1d = now + 1d). No bump should occur.
1528        let first = backend.get_metadata(&id).await?.unwrap();
1529        let second = backend.get_metadata(&id).await?.unwrap();
1530
1531        assert_eq!(
1532            first.time_expires.unwrap(),
1533            second.time_expires.unwrap(),
1534            "fresh TTI object must not be bumped"
1535        );
1536
1537        Ok(())
1538    }
1539
1540    // --- Section 2: Expiration ---
1541
1542    #[tokio::test]
1543    async fn test_ttl_immediate() -> Result<()> {
1544        // NB: We create a TTL that immediately expires in this test. This might be optimized away
1545        // in a future implementation, so we will have to update this test accordingly.
1546
1547        let backend = create_test_backend().await?;
1548
1549        let id = make_id();
1550        let metadata = Metadata {
1551            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1552            ..Default::default()
1553        };
1554        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1555
1556        assert!(backend.get_object(&id, None).await?.is_none());
1557
1558        Ok(())
1559    }
1560
1561    #[tokio::test]
1562    async fn test_tti_immediate() -> Result<()> {
1563        // NB: We create a TTI that immediately expires in this test. This might be optimized away
1564        // in a future implementation, so we will have to update this test accordingly.
1565
1566        let backend = create_test_backend().await?;
1567
1568        let id = make_id();
1569        let metadata = Metadata {
1570            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1571            ..Default::default()
1572        };
1573        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1574
1575        assert!(backend.get_object(&id, None).await?.is_none());
1576
1577        Ok(())
1578    }
1579
1580    // --- Section 3: Tiered Operations ---
1581
1582    /// Covers all three row states for `get_tiered_object` and `get_tiered_metadata`.
1583    ///
1584    /// - **empty**: both return NotFound.
1585    /// - **object**: put_object, both return the Object variant with correct payload/metadata.
1586    /// - **tombstone**: CAS-write with a distinct `lt_id`, both return the Tombstone variant
1587    ///   with `target == lt_id`.
1588    #[tokio::test]
1589    async fn test_tiered_get() -> Result<()> {
1590        let backend = create_test_backend().await?;
1591
1592        // empty
1593        let id = make_id();
1594        assert!(matches!(
1595            backend.get_tiered_object(&id, None).await?,
1596            TieredGet::NotFound
1597        ));
1598        assert!(matches!(
1599            backend.get_tiered_metadata(&id).await?,
1600            TieredMetadata::NotFound
1601        ));
1602
1603        // object
1604        let id = make_id();
1605        let put_meta = Metadata {
1606            content_type: "text/plain".into(),
1607            custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1608            ..Default::default()
1609        };
1610        create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1611
1612        let TieredGet::Object(obj_meta, _, obj_stream) =
1613            backend.get_tiered_object(&id, None).await?
1614        else {
1615            panic!("expected TieredGet::Object");
1616        };
1617        let obj_payload = stream::read_to_vec(obj_stream).await?;
1618        assert_eq!(obj_payload, b"payload");
1619        assert_eq!(obj_meta.content_type, put_meta.content_type);
1620        assert_eq!(obj_meta.custom, put_meta.custom);
1621
1622        let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1623            panic!("expected TieredMetadata::Object");
1624        };
1625        assert_eq!(head_meta.content_type, put_meta.content_type);
1626        assert_eq!(head_meta.custom, put_meta.custom);
1627
1628        // tombstone
1629        let hv_id = make_id();
1630        let lt_id = ObjectId::random(hv_id.context().clone());
1631        let tombstone = Tombstone {
1632            target: lt_id.clone(),
1633            expiration_policy: ExpirationPolicy::Manual,
1634        };
1635        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1636
1637        match backend.get_tiered_object(&hv_id, None).await? {
1638            TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1639            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1640        }
1641        match backend.get_tiered_metadata(&hv_id).await? {
1642            TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1643            other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1644        }
1645
1646        Ok(())
1647    }
1648
1649    /// Covers all three row states for `put_non_tombstone`.
1650    ///
1651    /// - **empty**: returns None, object is readable.
1652    /// - **object**: overwrites with new payload, returns None.
1653    /// - **tombstone**: returns Some(Tombstone) with the correct target; tombstone still intact.
1654    #[tokio::test]
1655    async fn test_put_non_tombstone() -> Result<()> {
1656        let backend = create_test_backend().await?;
1657
1658        // empty: put_non_tombstone on absent row succeeds and makes object readable.
1659        let id = make_id();
1660        let metadata = Metadata::default();
1661        let result = backend
1662            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1663            .await?;
1664        assert_eq!(result, None, "expected None on empty row");
1665        let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1666        assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1667
1668        // object: put_non_tombstone on existing object replaces payload, returns None.
1669        let id = make_id();
1670        create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1671        let result = backend
1672            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1673            .await?;
1674        assert_eq!(result, None, "expected None when overwriting object");
1675        let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1676        assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1677
1678        // tombstone: put_non_tombstone returns Some(Tombstone) and leaves tombstone intact.
1679        let hv_id = make_id();
1680        let lt_id = ObjectId::random(hv_id.context().clone());
1681        let tombstone = Tombstone {
1682            target: lt_id.clone(),
1683            expiration_policy: ExpirationPolicy::Manual,
1684        };
1685        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1686        let result = backend
1687            .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1688            .await?;
1689        let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1690        assert_eq!(returned.target, lt_id);
1691        assert!(
1692            matches!(
1693                backend.get_tiered_metadata(&hv_id).await?,
1694                TieredMetadata::Tombstone(_)
1695            ),
1696            "tombstone must still exist after put_non_tombstone"
1697        );
1698
1699        Ok(())
1700    }
1701
1702    /// Covers all three row states for `delete_non_tombstone`.
1703    ///
1704    /// - **empty**: returns None.
1705    /// - **object**: returns None, row gone.
1706    /// - **tombstone**: returns Some(Tombstone) with correct target; tombstone still intact.
1707    ///
1708    /// Verifies that the `r` column is correctly detected by both the `ReadRows` column
1709    /// filter and the `CheckAndMutate` `tombstone_predicate`.
1710    #[tokio::test]
1711    async fn test_delete_non_tombstone() -> Result<()> {
1712        let backend = create_test_backend().await?;
1713
1714        // empty
1715        let id = make_id();
1716        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1717
1718        // object
1719        let id = make_id();
1720        let metadata = Metadata::default();
1721        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1722        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1723        assert!(backend.get_object(&id, None).await?.is_none());
1724
1725        // tombstone
1726        let id = make_id();
1727        let tombstone = Tombstone {
1728            target: id.clone(),
1729            expiration_policy: ExpirationPolicy::Manual,
1730        };
1731        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1732        let tombstone = backend
1733            .delete_non_tombstone(&id)
1734            .await?
1735            .expect("expected Some(tombstone)");
1736        assert_eq!(tombstone.target, id, "tombstone target must be returned");
1737        assert!(
1738            matches!(
1739                backend.get_tiered_metadata(&id).await?,
1740                TieredMetadata::Tombstone(_)
1741            ),
1742            "tombstone must still exist after delete_non_tombstone"
1743        );
1744
1745        Ok(())
1746    }
1747
1748    // --- Section 4: Compare-and-Write ---
1749
1750    /// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds.
1751    ///
1752    /// After creation, both tiered and legacy APIs reflect the tombstone.
1753    #[tokio::test]
1754    async fn test_cas_create_tombstone() -> Result<()> {
1755        let backend = create_test_backend().await?;
1756
1757        let hv_id = make_id();
1758        let lt_id = ObjectId::random(hv_id.context().clone());
1759        let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_hours(1));
1760        let tombstone = Tombstone {
1761            target: lt_id.clone(),
1762            expiration_policy,
1763        };
1764
1765        // First create succeeds.
1766        let committed = backend
1767            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1768            .await?;
1769        assert!(committed, "expected CAS success on empty row");
1770
1771        // Tiered reads must see the tombstone with correct target and policy.
1772        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1773            panic!("expected TieredMetadata::Tombstone");
1774        };
1775        assert_eq!(t.target, lt_id, "target must round-trip via r column");
1776        assert_eq!(t.expiration_policy, expiration_policy);
1777        match backend.get_tiered_object(&hv_id, None).await? {
1778            TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1779            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1780        }
1781
1782        // Legacy reads must error rather than leak tombstone data.
1783        assert!(matches!(
1784            backend.get_object(&hv_id, None).await,
1785            Err(Error::UnexpectedTombstone)
1786        ));
1787        assert!(matches!(
1788            backend.get_metadata(&hv_id).await,
1789            Err(Error::UnexpectedTombstone)
1790        ));
1791
1792        // Idempotent retry: retry with the same target succeeds
1793        let second = backend
1794            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1795            .await?;
1796        assert!(second, "idempotent retry");
1797
1798        Ok(())
1799    }
1800
1801    /// Swapping a tombstone target: wrong expected → false, correct expected → true.
1802    #[tokio::test]
1803    async fn test_cas_swap_tombstone() -> Result<()> {
1804        let backend = create_test_backend().await?;
1805
1806        let hv_id = make_id();
1807        let old_lt_id = ObjectId::random(hv_id.context().clone());
1808        let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1809        let new_lt_id = ObjectId::random(hv_id.context().clone());
1810
1811        let tombstone = Tombstone {
1812            target: old_lt_id.clone(),
1813            expiration_policy: ExpirationPolicy::Manual,
1814        };
1815        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1816
1817        // Wrong target: CAS fails, tombstone unchanged.
1818        let write = TieredWrite::Tombstone(Tombstone {
1819            target: new_lt_id.clone(),
1820            expiration_policy: ExpirationPolicy::Manual,
1821        });
1822        let swapped = backend
1823            .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1824            .await?;
1825        assert!(!swapped, "expected CAS failure due to wrong target");
1826        match backend.get_tiered_metadata(&hv_id).await? {
1827            TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1828            other => panic!("expected tombstone, got {other:?}"),
1829        }
1830
1831        // Correct target: CAS succeeds, target updated.
1832        let swapped = backend
1833            .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1834            .await?;
1835        assert!(swapped, "expected CAS success with correct target");
1836        match backend.get_tiered_metadata(&hv_id).await? {
1837            TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1838            other => panic!("expected tombstone, got {other:?}"),
1839        }
1840
1841        // Idempotent retry: same A→B swap returns true.
1842        let retry = backend
1843            .compare_and_write(&hv_id, Some(&old_lt_id), write)
1844            .await?;
1845        assert!(retry, "idempotent retry");
1846
1847        Ok(())
1848    }
1849
1850    /// Swapping a tombstone for inline object data: wrong expected → false, correct → true.
1851    #[tokio::test]
1852    async fn test_cas_swap_inline() -> Result<()> {
1853        let backend = create_test_backend().await?;
1854
1855        let id = make_id();
1856        let lt_id = ObjectId::random(id.context().clone());
1857        let wrong_id = ObjectId::random(id.context().clone());
1858
1859        let tombstone = Tombstone {
1860            target: lt_id.clone(),
1861            expiration_policy: ExpirationPolicy::Manual,
1862        };
1863        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1864
1865        // Wrong target: CAS fails, tombstone intact.
1866        let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1867        let swapped = backend
1868            .compare_and_write(&id, Some(&wrong_id), write)
1869            .await?;
1870        assert!(!swapped, "expected CAS failure with wrong target");
1871        assert!(matches!(
1872            backend.get_tiered_metadata(&id).await?,
1873            TieredMetadata::Tombstone(_)
1874        ));
1875
1876        // Correct target: CAS succeeds, row becomes an inline object.
1877        let payload = Bytes::from_static(b"hello inline");
1878        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1879        let swapped = backend
1880            .compare_and_write(&id, Some(&lt_id), write.clone())
1881            .await?;
1882        assert!(swapped, "expected CAS success with correct target");
1883        let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? else {
1884            panic!("expected inline object after swap");
1885        };
1886        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1887
1888        // Idempotent retry: row is already inline (no tombstone), same CAS returns true.
1889        let retry = backend.compare_and_write(&id, Some(&lt_id), write).await?;
1890        assert!(retry, "idempotent retry");
1891
1892        Ok(())
1893    }
1894
1895    /// CAS-write an object onto an empty row (expected=None, write=Object) succeeds.
1896    #[tokio::test]
1897    async fn test_cas_create_object_on_empty_row() -> Result<()> {
1898        let backend = create_test_backend().await?;
1899
1900        let id = make_id();
1901        let payload = Bytes::from_static(b"cas object");
1902        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1903        let committed = backend.compare_and_write(&id, None, write).await?;
1904        assert!(committed, "expected CAS success on empty row");
1905
1906        let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? else {
1907            panic!("expected Object after CAS-create");
1908        };
1909        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1910
1911        Ok(())
1912    }
1913
1914    /// CAS-delete: wrong expected → false; correct expected → true, row gone.
1915    #[tokio::test]
1916    async fn test_cas_delete() -> Result<()> {
1917        let backend = create_test_backend().await?;
1918
1919        let id = make_id();
1920        let lt_id = ObjectId::random(id.context().clone());
1921        let wrong_id = ObjectId::random(id.context().clone());
1922
1923        let tombstone = Tombstone {
1924            target: lt_id.clone(),
1925            expiration_policy: ExpirationPolicy::Manual,
1926        };
1927        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1928
1929        // Wrong target: fails, row preserved.
1930        let deleted = backend
1931            .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1932            .await?;
1933        assert!(!deleted, "expected CAS failure with wrong target");
1934        assert!(matches!(
1935            backend.get_tiered_metadata(&id).await?,
1936            TieredMetadata::Tombstone(_)
1937        ));
1938
1939        // Correct target: succeeds, row gone.
1940        let deleted = backend
1941            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1942            .await?;
1943        assert!(deleted, "expected CAS delete success");
1944        assert!(matches!(
1945            backend.get_tiered_metadata(&id).await?,
1946            TieredMetadata::NotFound
1947        ));
1948
1949        // Idempotent retry: row is already absent (no tombstone), same delete returns true.
1950        let retry = backend
1951            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1952            .await?;
1953        assert!(retry, "idempotent retry");
1954
1955        // Inline object replaced tombstone: Safe to delete since it is an idempotent operation.
1956        let id2 = make_id();
1957        let fake_lt_id = ObjectId::random(id2.context().clone());
1958        let metadata = Metadata::default();
1959        create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1960        let deleted = backend
1961            .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1962            .await?;
1963        assert!(deleted, "expected idempotent deletion");
1964
1965        Ok(())
1966    }
1967
1968    // --- Section 5: Legacy Tombstone Compatibility ---
1969
1970    /// Legacy Manual and TTL tombstones are correctly read via the tiered APIs.
1971    ///
1972    /// Uses `Manual` expiration so `timestamp_micros = -1` (server-assigned ≈ write time)
1973    /// does not trigger immediate expiry.
1974    #[tokio::test]
1975    async fn test_legacy_tombstone_reads() -> Result<()> {
1976        let backend = create_test_backend().await?;
1977
1978        // Manual policy: get_tiered_metadata returns Tombstone(Manual), get_tiered_object returns Tombstone.
1979        let id = make_id();
1980        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1981
1982        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1983            panic!("expected tombstone");
1984        };
1985        assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1986        assert!(matches!(
1987            backend.get_tiered_object(&id, None).await?,
1988            TieredGet::Tombstone(_)
1989        ));
1990
1991        // TTL policy: get_tiered_metadata returns Tombstone with the correct TTL policy.
1992        //
1993        // A future cell timestamp (now + TTL) is required so `expires_before` does not
1994        // immediately filter the row.
1995        let id = make_id();
1996        let ttl = Duration::from_hours(2 * 24);
1997        write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1998
1999        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2000            panic!("expected TieredMetadata::Tombstone");
2001        };
2002        assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
2003
2004        Ok(())
2005    }
2006
2007    /// A legacy tombstone with TTI policy is upgraded to the new `r`/`t` column format on read.
2008    ///
2009    /// The bump path calls `put_tombstone_row`, which rewrites the row with `r` + `t` columns.
2010    /// The upgraded row has a fresh cell timestamp (≈ now + TTI), so `time_expires` increases.
2011    #[tokio::test]
2012    async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
2013        let backend = create_test_backend().await?;
2014        let id = make_id();
2015        let path = id.as_storage_path().to_string().into_bytes();
2016
2017        let tti = Duration::from_hours(2 * 24); // must exceed TTI_DEBOUNCE (1 day)
2018
2019        // Place time_expires just inside the bump window: past `now + tti - TTI_DEBOUNCE`
2020        // but still in the future so `expires_before(now)` does not filter the row.
2021        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_mins(1);
2022        write_legacy_tombstone(
2023            &backend,
2024            &id,
2025            ExpirationPolicy::TimeToIdle(tti),
2026            Some(old_deadline),
2027        )
2028        .await?;
2029
2030        // First read detects the stale TTI and triggers `put_tombstone_row`.
2031        let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
2032            panic!("expected tombstone");
2033        };
2034
2035        // After the bump, the row is rewritten with a fresh timestamp (≈ now + TTI).
2036        let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
2037            Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
2038            _ => panic!("expected tombstone row after bump"),
2039        };
2040
2041        assert!(
2042            new_deadline > old_deadline,
2043            "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
2044        );
2045
2046        Ok(())
2047    }
2048
2049    /// Legacy tombstones are handled correctly by all conditional write operations.
2050    ///
2051    /// Covers: `put_non_tombstone`, `delete_non_tombstone`, CAS-delete for both the
2052    /// legacy-metadata format and the empty-redirect format.
2053    #[tokio::test]
2054    async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
2055        let backend = create_test_backend().await?;
2056
2057        // put_non_tombstone returns Some(target == id) for a legacy tombstone.
2058        let id = make_id();
2059        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2060        let t_opt = backend
2061            .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
2062            .await?;
2063        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2064
2065        // delete_non_tombstone returns Some(target == id) for a legacy tombstone.
2066        let id = make_id();
2067        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2068        let t_opt = backend.delete_non_tombstone(&id).await?;
2069        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2070
2071        // CAS-delete succeeds on a legacy-metadata tombstone (target resolves to hv_id).
2072        let id = make_id();
2073        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2074        let deleted = backend
2075            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2076            .await?;
2077        assert!(
2078            deleted,
2079            "CAS-delete must succeed on legacy-metadata tombstone"
2080        );
2081        assert!(matches!(
2082            backend.get_tiered_metadata(&id).await?,
2083            TieredMetadata::NotFound
2084        ));
2085
2086        // CAS-delete succeeds on an empty-redirect tombstone (target resolves to hv_id).
2087        let id = make_id();
2088        write_empty_redirect_tombstone(&backend, &id).await?;
2089        let deleted = backend
2090            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2091            .await?;
2092        assert!(
2093            deleted,
2094            "CAS-delete must succeed on empty-redirect tombstone"
2095        );
2096        assert!(matches!(
2097            backend.get_tiered_metadata(&id).await?,
2098            TieredMetadata::NotFound
2099        ));
2100
2101        Ok(())
2102    }
2103
2104    /// An empty `r` value falls back to the HV id when resolving the tombstone target.
2105    #[tokio::test]
2106    async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2107        let backend = create_test_backend().await?;
2108        let id = make_id();
2109
2110        write_empty_redirect_tombstone(&backend, &id).await?;
2111        match backend.get_tiered_metadata(&id).await? {
2112            TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2113            other => panic!("expected tombstone, got {other:?}"),
2114        }
2115
2116        Ok(())
2117    }
2118
2119    // --- Section 6: Expired Tombstone Handling ---
2120
2121    /// CAS with `current=None` must succeed when the row holds an expired
2122    /// tombstone. The physical row still exists but is logically gone.
2123    #[tokio::test]
2124    async fn test_cas_create_tombstone_over_expired() -> Result<()> {
2125        let backend = create_test_backend().await?;
2126
2127        let id = make_id();
2128        let old_lt_id = ObjectId::random(id.context().clone());
2129        let old_tombstone = Tombstone {
2130            target: old_lt_id,
2131            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2132        };
2133        create_tombstone(&backend, &id, &old_tombstone, SystemTime::now()).await?;
2134
2135        let new_lt_id = ObjectId::random(id.context().clone());
2136        let new_tombstone = Tombstone {
2137            target: new_lt_id.clone(),
2138            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
2139        };
2140        let committed = backend
2141            .compare_and_write(&id, None, TieredWrite::Tombstone(new_tombstone))
2142            .await?;
2143        assert!(
2144            committed,
2145            "CAS with current=None must succeed over an expired tombstone"
2146        );
2147
2148        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2149            panic!("expected new tombstone to be readable");
2150        };
2151        assert_eq!(t.target, new_lt_id);
2152
2153        Ok(())
2154    }
2155
2156    /// `put_non_tombstone` must succeed when the row holds only an expired
2157    /// tombstone — the expired row is logically absent.
2158    #[tokio::test]
2159    async fn test_put_non_tombstone_over_expired() -> Result<()> {
2160        let backend = create_test_backend().await?;
2161
2162        let id = make_id();
2163        let lt_id = ObjectId::random(id.context().clone());
2164        let tombstone = Tombstone {
2165            target: lt_id,
2166            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2167        };
2168        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
2169
2170        let result = backend
2171            .put_non_tombstone(&id, &Metadata::default(), Bytes::from_static(b"data"))
2172            .await?;
2173        assert_eq!(
2174            result, None,
2175            "put_non_tombstone must succeed (return None) over an expired tombstone"
2176        );
2177
2178        let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
2179        assert_eq!(&stream::read_to_vec(stream).await?, b"data");
2180
2181        Ok(())
2182    }
2183
2184    // --- Range Request Tests ---
2185
2186    async fn put_range_test_object(backend: &BigTableBackend) -> Result<ObjectId> {
2187        let id = make_id();
2188        let metadata = Metadata {
2189            content_type: "text/plain".into(),
2190            ..Default::default()
2191        };
2192        let payload = b"Hello, range requests!";
2193        backend
2194            .put_object(&id, &metadata, stream::single(payload.as_slice()))
2195            .await?;
2196        Ok(id)
2197    }
2198
2199    #[tokio::test]
2200    async fn get_object_range_bounded() -> Result<()> {
2201        let backend = create_test_backend().await?;
2202        let id = put_range_test_object(&backend).await?;
2203
2204        let (_, content_range, stream) = backend
2205            .get_object(&id, Some(ByteRange::Bounded(7, 11)))
2206            .await?
2207            .unwrap();
2208        let data = stream::read_to_vec(stream).await?;
2209        assert_eq!(&data, b"range");
2210
2211        let content_range = content_range.unwrap();
2212        assert_eq!(content_range.start, 7);
2213        assert_eq!(content_range.end, 11);
2214        assert_eq!(content_range.total, 22);
2215
2216        Ok(())
2217    }
2218
2219    #[tokio::test]
2220    async fn get_object_range_from() -> Result<()> {
2221        let backend = create_test_backend().await?;
2222        let id = put_range_test_object(&backend).await?;
2223
2224        let (_, content_range, stream) = backend
2225            .get_object(&id, Some(ByteRange::From(7)))
2226            .await?
2227            .unwrap();
2228        let data = stream::read_to_vec(stream).await?;
2229        assert_eq!(&data, b"range requests!");
2230
2231        let content_range = content_range.unwrap();
2232        assert_eq!(content_range.start, 7);
2233        assert_eq!(content_range.end, 21);
2234        assert_eq!(content_range.total, 22);
2235
2236        Ok(())
2237    }
2238
2239    #[tokio::test]
2240    async fn get_object_range_last() -> Result<()> {
2241        let backend = create_test_backend().await?;
2242        let id = put_range_test_object(&backend).await?;
2243
2244        let (_, content_range, stream) = backend
2245            .get_object(&id, Some(ByteRange::Last(9)))
2246            .await?
2247            .unwrap();
2248        let data = stream::read_to_vec(stream).await?;
2249        assert_eq!(&data, b"requests!");
2250
2251        let content_range = content_range.unwrap();
2252        assert_eq!(content_range.start, 13);
2253        assert_eq!(content_range.end, 21);
2254        assert_eq!(content_range.total, 22);
2255
2256        Ok(())
2257    }
2258
2259    #[tokio::test]
2260    async fn get_object_range_unsatisfiable() -> Result<()> {
2261        let backend = create_test_backend().await?;
2262        let id = put_range_test_object(&backend).await?;
2263
2264        match backend.get_object(&id, Some(ByteRange::From(100))).await {
2265            Err(Error::RangeNotSatisfiable { total }) => assert_eq!(total, 22),
2266            Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"),
2267            Err(e) => panic!("expected RangeNotSatisfiable, got {e:?}"),
2268        }
2269
2270        Ok(())
2271    }
2272
2273    #[tokio::test]
2274    async fn get_object_no_range_returns_full_payload() -> Result<()> {
2275        let backend = create_test_backend().await?;
2276        let id = put_range_test_object(&backend).await?;
2277
2278        let (_, content_range, stream) = backend.get_object(&id, None).await?.unwrap();
2279        let data = stream::read_to_vec(stream).await?;
2280        assert_eq!(&data, b"Hello, range requests!");
2281        assert!(content_range.is_none());
2282
2283        Ok(())
2284    }
2285}