Skip to main content

objectstore_service/backend/
bigtable.rs

1//! BigTable backend for high-volume, low-latency storage of small objects.
2//!
3//! # Row Format
4//!
5//! Each row key is the object's storage path. A row contains either an **object** or a
6//! **tombstone** — never both. The two layouts are mutually exclusive and distinguished by
7//! column presence:
8//!
9//! | Column | Family    | Content                     | Present when       |
10//! |--------|-----------|-----------------------------|--------------------|
11//! | `p`    | `fg`/`fm` | Compressed payload bytes    | Object row only    |
12//! | `m`    | `fg`/`fm` | [`Metadata`] JSON           | Object row only    |
13//! | `r`    | `fg`/`fm` | Redirect path to LT storage | Tombstone row only |
14//! | `t`    | `fg`/`fm` | [`Tombstone`] metadata JSON | Tombstone row only |
15//!
//! The `r` column signals a tombstone row: its **value** is the long-term `ObjectId`
//! serialized via `as_storage_path()`. Callers can resolve the long-term object directly from
//! the `r` value without reconstructing it from the row key. Older tombstones may carry an
//! empty `r` value; those resolve to the tombstone's own `ObjectId` instead.
19//!
20//! `p`/`m` and `r`/`t` are mutually exclusive. Every write begins with a `DeleteFromRow`
21//! mutation that clears all columns before writing the new cells, so mixed rows cannot exist.
22//!
23//! ## Legacy Tombstone Format
24//!
25//! Tombstones written before the `r`/`t` column layout used the object-row format with an
26//! empty `p` column and `"is_redirect_tombstone": true` in the `m` JSON. Both formats are
27//! supported for reading. A `bigtable.legacy_tombstone_read` metric is emitted on each legacy
28//! read. Legacy tombstones expire naturally by TTL/GC; TTI bumps transparently upgrade them
29//! to the new format.
30
31use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use serde::{Deserialize, Serialize};
42use tonic::Code;
43
44use crate::backend::common::{
45    Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
46    TieredGet, TieredMetadata, TieredWrite, Tombstone,
47};
48use crate::error::{Error, Result};
49use crate::gcp_auth::PrefetchingTokenProvider;
50use crate::id::ObjectId;
51use crate::stream::{ChunkedBytes, ClientStream};
52
53/// Configuration for [`BigTableBackend`].
54///
55/// Stores objects in [Google Cloud Bigtable], a NoSQL wide-column database optimized for
56/// high-throughput, low-latency workloads with small objects. Authentication uses Application
57/// Default Credentials (ADC).
58///
59/// **Note**: The table must be pre-created with the following column families:
60/// - `fg`: timestamp-based garbage collection (`maxage=1s`)
61/// - `fm`: manual garbage collection (`no GC policy`)
62///
63/// [Google Cloud Bigtable]: https://cloud.google.com/bigtable
64///
65/// # Example
66///
67/// ```yaml
68/// storage:
69///   type: bigtable
70///   project_id: my-project
71///   instance_name: objectstore
72///   table_name: objectstore
73/// ```
74#[derive(Debug, Clone, Deserialize, Serialize)]
75pub struct BigTableConfig {
76    /// Optional custom Bigtable endpoint.
77    ///
78    /// Useful for testing with emulators. If `None`, uses the default Bigtable endpoint.
79    ///
80    /// # Default
81    ///
82    /// `None` (uses default Bigtable endpoint)
83    ///
84    /// # Environment Variables
85    ///
86    /// - `OS__STORAGE__TYPE=bigtable`
87    /// - `OS__STORAGE__ENDPOINT=localhost:8086` (optional)
88    pub endpoint: Option<String>,
89
90    /// GCP project ID.
91    ///
92    /// The Google project ID (not project number) containing the Bigtable instance.
93    ///
94    /// # Environment Variables
95    ///
96    /// - `OS__STORAGE__PROJECT_ID=my-project`
97    pub project_id: String,
98
99    /// Bigtable instance name.
100    ///
101    /// # Environment Variables
102    ///
103    /// - `OS__STORAGE__INSTANCE_NAME=my-instance`
104    pub instance_name: String,
105
106    /// Bigtable table name.
107    ///
108    /// The table must exist before starting the server.
109    ///
110    /// # Environment Variables
111    ///
112    /// - `OS__STORAGE__TABLE_NAME=objectstore`
113    pub table_name: String,
114
115    /// Optional number of connections to maintain to Bigtable.
116    ///
117    /// # Default
118    ///
119    /// `None` (defaults to 1)
120    ///
121    /// # Environment Variables
122    ///
123    /// - `OS__STORAGE__CONNECTIONS=16` (optional)
124    pub connections: Option<usize>,
125}
126
/// Timeout for establishing the initial connection to Bigtable.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Age after which a gRPC channel to Bigtable is swapped for a fresh one in the background.
///
/// The server closes long-lived connections every hour or so
/// ([source](https://web.archive.org/web/20260211140930/https://docs.cloud.google.com/bigtable/docs/performance#cold-starts:~:text=return%20an%20error.-,Cold%20start,-at%20client%20initialization)).
/// `tonic` reconnects transparently but lazily. Without proactive rotation, the first requests
/// to hit a closed channel would pay the reconnection cost as a latency spike.
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_secs(50 * 60));
/// Debounce window for bumping the deadline of objects that use a TTI expiration policy.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 60 * 60); // one day
/// OAuth scopes needed to call the Bigtable data API.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// How many times a failed request is retried.
const REQUEST_RETRY_COUNT: usize = 2;
/// How many times a CAS mutation is retried before giving up with an error.
const CAS_RETRY_COUNT: usize = 3;

/// Column holding the (compressed) object payload.
const COLUMN_PAYLOAD: &[u8] = b"p";
/// Column holding the object metadata as JSON.
const COLUMN_METADATA: &[u8] = b"m";
/// Column holding the redirect path of a tombstone row.
const COLUMN_REDIRECT: &[u8] = b"r";
/// Column holding [`TombstoneMeta`] JSON on tombstone rows.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";
/// Qualifier regex selecting every non-payload column (`m`, `r`, `t`) for metadata-only reads.
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family whose cells are garbage-collected by timestamp.
///
/// Rows are deleted automatically only because this family carries a GC rule.
/// See: <https://cloud.google.com/bigtable/docs/gc-cell-level>
const FAMILY_GC: &str = "fg";
/// Column family without a GC rule; its cells only go away when explicitly deleted.
const FAMILY_MANUAL: &str = "fm";
160/// We require a GC rule on this family to automatically delete rows.
161/// See: <https://cloud.google.com/bigtable/docs/gc-cell-level>
162const FAMILY_GC: &str = "fg";
163/// Column family that uses manual garbage collection.
164const FAMILY_MANUAL: &str = "fm";
165
166/// BigTable storage backend for high-volume, low-latency object storage.
167pub struct BigTableBackend {
168    bigtable: BigTableConnection,
169
170    instance_path: String,
171    table_path: String,
172    table_name: String,
173}
174
175impl fmt::Debug for BigTableBackend {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        f.debug_struct("BigTableBackend")
178            .field("instance_path", &self.instance_path)
179            .field("table_path", &self.table_path)
180            .field("table_name", &self.table_name)
181            .finish_non_exhaustive()
182    }
183}
184
185/// Creates a row filter that matches a single column by exact qualifier.
186fn column_filter(column: &[u8]) -> v2::RowFilter {
187    v2::RowFilter {
188        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
189            [b"^", column, b"$"].concat(),
190        )),
191    }
192}
193
194/// Creates a row filter matching the legacy tombstone format: `m` column JSON starts with
195/// `{"is_redirect_tombstone":true`.
196///
197/// After legacy tombstones expire naturally this filter becomes dead code in both callers.
198fn legacy_tombstone_filter() -> v2::RowFilter {
199    v2::RowFilter {
200        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
201            filters: vec![
202                column_filter(COLUMN_METADATA),
203                v2::RowFilter {
204                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(
205                        b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
206                    )),
207                },
208            ],
209        })),
210    }
211}
212
213/// Wraps `inner` so that it only matches live (non-expired) cells.
214fn live_row_filter(inner: v2::RowFilter) -> v2::RowFilter {
215    let now_micros = SystemTime::now()
216        .duration_since(SystemTime::UNIX_EPOCH)
217        .unwrap_or_default()
218        .as_millis() as i64
219        * 1000;
220
221    v2::RowFilter {
222        filter: Some(v2::row_filter::Filter::Interleave(
223            v2::row_filter::Interleave {
224                filters: vec![
225                    // Manual family: never expires.
226                    v2::RowFilter {
227                        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
228                            filters: vec![
229                                v2::RowFilter {
230                                    filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
231                                        format!("^{FAMILY_MANUAL}$"),
232                                    )),
233                                },
234                                inner.clone(),
235                            ],
236                        })),
237                    },
238                    // GC family: only match non-expired cells.
239                    v2::RowFilter {
240                        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
241                            filters: vec![
242                                v2::RowFilter {
243                                    filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
244                                        format!("^{FAMILY_GC}$"),
245                                    )),
246                                },
247                                v2::RowFilter {
248                                    filter: Some(v2::row_filter::Filter::TimestampRangeFilter(
249                                        v2::TimestampRange {
250                                            start_timestamp_micros: now_micros,
251                                            end_timestamp_micros: 0,
252                                        },
253                                    )),
254                                },
255                                inner,
256                            ],
257                        })),
258                    },
259                ],
260            },
261        )),
262    }
263}
264
265/// Builds a raw row filter that matches any tombstone row, new- or legacy-format.
266///
267/// New format: presence of the `r` column.
268/// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON.
269///
270/// After legacy tombstones expire naturally this simplifies to just
271/// `column_filter(COLUMN_REDIRECT)`.
272fn tombstone_filter() -> v2::RowFilter {
273    let filter = v2::RowFilter {
274        filter: Some(v2::row_filter::Filter::Interleave(
275            v2::row_filter::Interleave {
276                filters: vec![column_filter(COLUMN_REDIRECT), legacy_tombstone_filter()],
277            },
278        )),
279    };
280    live_row_filter(filter)
281}
282
283/// Returns a [`MutatePredicate`] that matches any tombstone row.
284///
285/// Mutations run only when no tombstone is present (`predicate_matched == false`).
286/// Used by [`BigTableBackend::put_non_tombstone`], [`BigTableBackend::delete_non_tombstone`],
287/// and [`BigTableBackend::compare_and_write`] as the `CheckAndMutateRow` predicate.
288fn tombstone_predicate() -> MutatePredicate {
289    MutatePredicate::Exclude(tombstone_filter())
290}
291
292/// Builds an anchored regex pattern (`^…$`) that matches `value` literally.
293///
294/// Uses [`regex::escape`] so that metacharacters in storage paths (`.`, `/`, etc.)
295/// are treated as literal bytes.
296fn exact_value_regex(value: &str) -> Vec<u8> {
297    format!("^{}$", regex::escape(value)).into_bytes()
298}
299
300/// Matches tombstones whose redirect resolves to `target`.
301///
302/// ## Predicate Matches
303///
304/// Must be used with `true_mutations` and `predicate_matched == true`.
305///
306/// ## Details
307///
308/// Always includes an exact match on the `r` (redirect) column:
309/// - Chain: `r` column present AND value == `target` storage path
310///
311/// When `target == own_id` (the caller expects a legacy identity redirect), the
312/// exact match is wrapped in an Interleave with two additional fallbacks:
313/// - Chain: `r` column present AND value == `b""` (empty-sentinel written before the redirect
314///   column stored the path)
315/// - Chain: `m` column present AND value matches `{"is_redirect_tombstone":true...}` regex
316///   (legacy metadata format predating the dedicated `r` column)
317fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
318    let target_path = exact_value_regex(&target.as_storage_path().to_string());
319
320    let exact_match = v2::RowFilter {
321        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
322            filters: vec![
323                column_filter(COLUMN_REDIRECT),
324                v2::RowFilter {
325                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
326                },
327            ],
328        })),
329    };
330
331    if target != own_id {
332        return live_row_filter(exact_match);
333    }
334
335    let empty_redirect_match = v2::RowFilter {
336        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
337            filters: vec![
338                column_filter(COLUMN_REDIRECT),
339                v2::RowFilter {
340                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
341                },
342            ],
343        })),
344    };
345
346    // Also match legacy tombstones that resolve to the HV id:
347    // - empty `r` value (written before the redirect column stored the path)
348    // - legacy `m` column format (`is_redirect_tombstone: true`)
349    let filter = v2::RowFilter {
350        filter: Some(v2::row_filter::Filter::Interleave(
351            v2::row_filter::Interleave {
352                filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
353            },
354        )),
355    };
356    live_row_filter(filter)
357}
358
359/// Returns a [`MutatePredicate`] that matches tombstones whose redirect resolves to either `old` or `new`.
360///
361/// Mutations run only when the predicate matches (`predicate_matched == true`):
362/// equivalent to `t == old || t == new`. Built as an Interleave of two
363/// [`redirect_target_filter`] calls — yields cells iff at least one branch matches.
364/// An absent row or non-tombstone row yields 0 cells, so `predicate_matched = false` (conflict).
365fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
366    MutatePredicate::Include(v2::RowFilter {
367        filter: Some(v2::row_filter::Filter::Interleave(
368            v2::row_filter::Interleave {
369                filters: vec![
370                    redirect_target_filter(old, own_id),
371                    redirect_target_filter(new, own_id),
372                ],
373            },
374        )),
375    })
376}
377
378/// Returns a [`MutatePredicate`] that matches rows where no conflicting tombstone exists.
379///
380/// Mutations run only when the row is conflict-free (`predicate_matched == false`):
381/// no tombstone is present, or the tombstone's redirect already points to `target`.
382///
383/// Built as an inverted `Condition` filter:
384/// - Predicate: [`redirect_target_filter`]`(target)` — tombstone already points to `target`?
385/// - True branch: `BlockAllFilter` → 0 cells (already at target, safe state).
386/// - False branch: [`tombstone_filter`] → 0 cells when no tombstone exists.
387///
388/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases.
389fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
390    MutatePredicate::Exclude(v2::RowFilter {
391        filter: Some(v2::row_filter::Filter::Condition(Box::new(
392            v2::row_filter::Condition {
393                predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
394                true_filter: Some(Box::new(v2::RowFilter {
395                    filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
396                })),
397                false_filter: Some(Box::new(tombstone_filter())),
398            },
399        ))),
400    })
401}
402
403/// The condition under which a [`BigTableBackend::check_and_mutate`] write proceeds.
404///
405/// Each variant pairs a row filter with the state that makes the write safe:
406/// `Include` writes when the row matches; `Exclude` writes when it does not.
407#[derive(Clone, Debug)]
408enum MutatePredicate {
409    /// Write proceeds when the filter matches the row.
410    ///
411    /// Mutations run in `true_mutations`; succeeds when `predicate_matched == true`.
412    Include(v2::RowFilter),
413    /// Write proceeds when the filter does not match the row.
414    ///
415    /// Mutations run in `false_mutations`; succeeds when `predicate_matched == false`.
416    Exclude(v2::RowFilter),
417}
418
419/// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`).
420///
421/// Used by metadata-only reads to avoid fetching the (potentially large) payload column
422/// while still being able to detect both new- and legacy-format tombstones.
423fn metadata_filter() -> v2::RowFilter {
424    v2::RowFilter {
425        filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
426            FILTER_META.to_owned(),
427        )),
428    }
429}
430
431fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
432    v2::Mutation {
433        mutation: Some(mutation),
434    }
435}
436
437/// Creates a `DeleteFromRow` mutation wrapped in the outer [`v2::Mutation`] envelope.
438fn delete_row_mutation() -> v2::Mutation {
439    mutation(mutation::Mutation::DeleteFromRow(
440        mutation::DeleteFromRow {},
441    ))
442}
443
444/// Builds the three mutations that write an object row: clear existing data,
445/// then set the payload and metadata cells.
446///
447/// Used by both [`BigTableBackend::put_row`] (unconditional write) and
448/// [`BigTableBackend::put_non_tombstone`] (conditional write).
449fn object_mutations(
450    metadata: &Metadata,
451    payload: Vec<u8>,
452    now: SystemTime,
453) -> Result<[v2::Mutation; 3]> {
454    let (family, timestamp_micros) = match metadata.expiration_policy {
455        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
456        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
457        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
458    };
459
460    let metadata_bytes = serde_json::to_vec(metadata)
461        .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
462
463    Ok([
464        // NB: We explicitly delete the row to clear metadata on overwrite.
465        delete_row_mutation(),
466        mutation(mutation::Mutation::SetCell(mutation::SetCell {
467            family_name: family.to_owned(),
468            column_qualifier: COLUMN_PAYLOAD.to_owned(),
469            timestamp_micros,
470            value: payload,
471        })),
472        mutation(mutation::Mutation::SetCell(mutation::SetCell {
473            family_name: family.to_owned(),
474            column_qualifier: COLUMN_METADATA.to_owned(),
475            timestamp_micros,
476            value: metadata_bytes,
477        })),
478    ])
479}
480
481/// Metadata carried by tombstone rows in the `t` (tombstone-meta) column.
482///
483/// Tombstone-specific metadata evolves independently of object [`Metadata`]. Only fields
484/// that are meaningful on tombstones are included here.
485#[derive(Clone, Debug, Default, Deserialize, Serialize)]
486struct TombstoneMeta {
487    /// Expiration policy for this tombstone.
488    ///
489    /// Skipped during serialization when set to [`ExpirationPolicy::Manual`].
490    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
491    expiration_policy: ExpirationPolicy,
492}
493
494/// Builds the three mutations that write a tombstone row: clear existing data,
495/// then set the redirect sentinel and tombstone-meta cells.
496///
497/// Used by both [`BigTableBackend::put_tombstone_row`] (unconditional write) and the
498/// TTI bump path in tiered reads.
499fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
500    let (family, timestamp_micros) = match tombstone.expiration_policy {
501        ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
502        ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
503        ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
504    };
505
506    let tombstone_meta = TombstoneMeta {
507        expiration_policy: tombstone.expiration_policy,
508    };
509
510    Ok([
511        delete_row_mutation(),
512        mutation(mutation::Mutation::SetCell(mutation::SetCell {
513            family_name: family.to_owned(),
514            column_qualifier: COLUMN_REDIRECT.to_owned(),
515            timestamp_micros,
516            value: tombstone.target.as_storage_path().to_string().into_bytes(),
517        })),
518        mutation(mutation::Mutation::SetCell(mutation::SetCell {
519            family_name: family.to_owned(),
520            column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
521            timestamp_micros,
522            value: serde_json::to_vec(&tombstone_meta)
523                .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
524        })),
525    ])
526}
527
528/// Subset of [`Metadata`] that indicates a row is a tombstone instead of a real object.
529///
530/// Used to construct [`RowData`].
531#[derive(Debug, Deserialize)]
532struct LegacyTombstoneMeta {
533    /// Internal redirect tombstone marker.
534    ///
535    /// When `true`, this object is a legacy tombstone. This implies:
536    ///  - the payload is empty
537    ///  - metadata other than the expiration policy is not meaningful
538    ///  - the `r` and `t` columns are not present
539    #[serde(default)]
540    is_redirect_tombstone: bool,
541
542    /// Expiration policy for this tombstone.
543    #[serde(default)]
544    expiration_policy: ExpirationPolicy,
545}
546
547/// Parsed data from a BigTable row's cells.
548enum RowData {
549    /// A regular object row with payload and metadata.
550    Object {
551        metadata: Metadata,
552        payload: Vec<u8>,
553    },
554    /// A tombstone row indicating the real payload lives on the long-term backend.
555    Tombstone {
556        target: Vec<u8>,
557        meta: TombstoneMeta,
558        time_expires: Option<SystemTime>,
559    },
560}
561
562impl RowData {
563    /// Parses a set of row cells into a [`RowData`].
564    ///
565    /// New-format tombstones are identified by the presence of the `r` column.
566    /// Legacy tombstones (written before the column migration) are identified by
567    /// `is_redirect_tombstone: true` in the `m` column JSON; a
568    /// `bigtable.legacy_tombstone_read` metric is emitted on each such read.
569    fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
570        let mut metadata_opt: Option<Metadata> = None;
571        let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
572        let mut redirect_detected = false;
573        let mut redirect_target = Vec::new();
574        let mut expire_at = None;
575        let mut payload = Vec::new();
576
577        for cell in cells {
578            // NB: All cells are written with the same timestamp; last write is safe.
579
580            // Only derive expiration from GC-family cells — manual-family cells
581            // use server-assigned timestamps that don't represent expiration.
582            if cell.family_name == FAMILY_GC {
583                expire_at = micros_to_time(cell.timestamp_micros);
584            }
585
586            match cell.qualifier.as_slice() {
587                COLUMN_REDIRECT => {
588                    redirect_detected = true;
589                    redirect_target = cell.value;
590                }
591                COLUMN_PAYLOAD => {
592                    payload = cell.value;
593                }
594                COLUMN_TOMBSTONE_META => {
595                    tombstone_meta_opt =
596                        Some(serde_json::from_slice(&cell.value).map_err(|cause| {
597                            Error::serde("failed to deserialize tombstone meta", cause)
598                        })?);
599                }
600                COLUMN_METADATA => {
601                    if let Ok(legacy_meta) =
602                        serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
603                        && legacy_meta.is_redirect_tombstone
604                    {
605                        redirect_detected = true;
606                        objectstore_metrics::count!("bigtable.legacy_tombstone_read");
607                        tombstone_meta_opt = Some(TombstoneMeta {
608                            expiration_policy: legacy_meta.expiration_policy,
609                        });
610                    } else {
611                        metadata_opt =
612                            Some(serde_json::from_slice(&cell.value).map_err(|cause| {
613                                Error::serde("failed to deserialize metadata", cause)
614                            })?);
615                    }
616                }
617                _ => {}
618            }
619        }
620
621        Ok(if redirect_detected {
622            RowData::Tombstone {
623                target: redirect_target,
624                meta: tombstone_meta_opt.unwrap_or_default(),
625                time_expires: expire_at,
626            }
627        } else {
628            // Metadata may have been skipped during read - payload-only read for TTI bump.
629            let mut metadata = metadata_opt.unwrap_or_default();
630            metadata.time_expires = expire_at;
631            RowData::Object { metadata, payload }
632        })
633    }
634
635    /// Returns the expiration policy for this row, regardless of variant.
636    fn expiration_policy(&self) -> ExpirationPolicy {
637        match self {
638            RowData::Object { metadata, .. } => metadata.expiration_policy,
639            RowData::Tombstone { meta, .. } => meta.expiration_policy,
640        }
641    }
642
643    /// Returns the resolved expiration timestamp for this row, regardless of variant.
644    fn time_expires(&self) -> Option<SystemTime> {
645        match self {
646            RowData::Object { metadata, .. } => metadata.time_expires,
647            RowData::Tombstone { time_expires, .. } => *time_expires,
648        }
649    }
650
651    /// Returns `true` if this row is expired as of the given `time`.
652    ///
653    /// Only applies to rows with an expiration policy set.
654    fn expires_before(&self, time: SystemTime) -> bool {
655        self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
656    }
657
658    /// Returns `true` if this row's TTI deadline should be bumped.
659    fn needs_tti_bump(&self) -> bool {
660        matches!(
661            self.expiration_policy(),
662            ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
663        )
664    }
665}
666
667/// Parses the raw `r` column bytes into a redirect target [`ObjectId`].
668///
669/// For tombstones with an empty `r` value, falls back to the ID of the tombstone
670/// itself and emits a `bigtable.empty_redirect_read` metric so deployments can
671/// track when it is safe to remove the legacy empty-value code path.
672fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
673    if redirect_path.is_empty() {
674        objectstore_metrics::count!("bigtable.empty_redirect_read");
675        Ok(tombstone_id.clone())
676    } else {
677        let redirect_str = std::str::from_utf8(redirect_path)
678            .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
679        ObjectId::from_storage_path(redirect_str)
680            .ok_or_else(|| Error::generic("corrupt redirect path"))
681    }
682}
683
684impl BigTableBackend {
685    /// Creates a new [`BigTableBackend`] from the given `config`.
686    ///
687    /// Pass an `endpoint` in the config to connect to a local emulator; omit it to use real GCP
688    /// credentials. `connections` controls the gRPC connection pool size (defaults to 1).
689    pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
690        let BigTableConfig {
691            endpoint,
692            project_id,
693            instance_name,
694            table_name,
695            connections,
696        } = config;
697
698        let bigtable = if let Some(ref endpoint) = endpoint {
699            BigTableConnection::new_with_emulator(
700                endpoint,
701                &project_id,
702                &instance_name,
703                false, // is_read_only
704                Some(CONNECT_TIMEOUT),
705            )?
706        } else {
707            let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
708            BigTableConnection::new_with_managed_transport(
709                &project_id,
710                &instance_name,
711                false, // is_read_only
712                Some(CONNECT_TIMEOUT),
713                Arc::new(token_provider),
714                connections.unwrap_or(1),
715                true, // prime_channels
716                None, // app_profile_id
717                MAX_CHANNEL_AGE,
718            )
719            .await?
720        };
721
722        let client = bigtable.client();
723
724        Ok(Self {
725            bigtable,
726            instance_path: format!("projects/{project_id}/instances/{instance_name}"),
727            table_path: client.get_full_table_name(&table_name),
728            table_name,
729        })
730    }
731
732    /// Reads a single row by key, returning parsed row data.
733    ///
734    /// Returns `None` if the row is absent or has expired.
735    async fn read_row(
736        &self,
737        path: &[u8],
738        filter: Option<v2::RowFilter>,
739        action: &'static str,
740    ) -> Result<Option<RowData>> {
741        let request = v2::ReadRowsRequest {
742            table_name: self.table_path.clone(),
743            rows: Some(v2::RowSet {
744                row_keys: vec![path.to_owned()],
745                row_ranges: vec![],
746            }),
747            filter,
748            rows_limit: 1,
749            ..Default::default()
750        };
751
752        let response = retry(action, || async {
753            self.bigtable.client().read_rows(request.clone()).await
754        })
755        .await?;
756        debug_assert!(response.len() <= 1, "Expected at most one row");
757
758        let Some((_, cells)) = response.into_iter().next() else {
759            objectstore_log::debug!("Object not found");
760            return Ok(None);
761        };
762
763        let row = RowData::from_cells(cells)?;
764        Ok(if row.expires_before(SystemTime::now()) {
765            None
766        } else {
767            Some(row)
768        })
769    }
770
771    async fn mutate(
772        &self,
773        path: Vec<u8>,
774        mutations: impl Into<Vec<v2::Mutation>>,
775        action: &'static str,
776    ) -> Result<v2::MutateRowResponse> {
777        let request = v2::MutateRowRequest {
778            table_name: self.table_path.clone(),
779            row_key: path,
780            mutations: mutations.into(),
781            ..Default::default()
782        };
783
784        let response = retry(action, || async {
785            self.bigtable.client().mutate_row(request.clone()).await
786        })
787        .await?;
788
789        Ok(response.into_inner())
790    }
791
792    async fn put_row(
793        &self,
794        path: Vec<u8>,
795        metadata: &Metadata,
796        payload: Vec<u8>,
797        action: &'static str,
798    ) -> Result<v2::MutateRowResponse> {
799        let mutations = object_mutations(metadata, payload, SystemTime::now())?;
800        self.mutate(path, mutations, action).await
801    }
802
803    async fn put_tombstone_row(
804        &self,
805        path: Vec<u8>,
806        tombstone: &Tombstone,
807        action: &'static str,
808    ) -> Result<v2::MutateRowResponse> {
809        let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
810        self.mutate(path, mutations, action).await
811    }
812
813    /// Best-effort TTI bump for a row.
814    ///
815    /// If the payload isn't loaded, it will be fetched. Failures are ignored silently.
816    async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
817        let expiration_policy = row.expiration_policy();
818
819        match row {
820            RowData::Tombstone { target, .. } => {
821                let target = match parse_redirect_target(target, hv_id) {
822                    Ok(target) => target,
823                    Err(e) => {
824                        objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
825                        return;
826                    }
827                };
828
829                let tombstone = Tombstone {
830                    target,
831                    expiration_policy,
832                };
833                let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
834            }
835            RowData::Object { metadata, payload } if loaded => {
836                let _ = self
837                    .put_row(path, metadata, payload.clone(), "tti-bump")
838                    .await;
839            }
840            RowData::Object { metadata, .. } => {
841                let payload_read = self
842                    .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
843                    .await;
844
845                if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
846                    let _ = self.put_row(path, metadata, payload, "tti-bump").await;
847                }
848            }
849        }
850    }
851
852    /// Executes a `CheckAndMutateRow` request.
853    async fn check_and_mutate(
854        &self,
855        row_key: Vec<u8>,
856        predicate: MutatePredicate,
857        mutations: impl Into<Vec<v2::Mutation>>,
858        context: &'static str,
859    ) -> Result<bool> {
860        let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
861            MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
862            MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
863        };
864
865        let request = v2::CheckAndMutateRowRequest {
866            table_name: self.table_path.clone(),
867            row_key,
868            predicate_filter: Some(filter),
869            true_mutations,
870            false_mutations,
871            ..Default::default()
872        };
873
874        let future = retry(context, || async {
875            self.bigtable
876                .client()
877                .check_and_mutate_row(request.clone())
878                .await
879        });
880
881        Ok(future.await?.predicate_matched == success_on_match)
882    }
883}
884
885#[async_trait::async_trait]
886impl Backend for BigTableBackend {
887    fn name(&self) -> &'static str {
888        "bigtable"
889    }
890
891    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
892    async fn put_object(
893        &self,
894        id: &ObjectId,
895        metadata: &Metadata,
896        mut stream: ClientStream,
897    ) -> Result<PutResponse> {
898        objectstore_log::debug!("Writing to Bigtable backend");
899        let path = id.as_storage_path().to_string().into_bytes();
900
901        let mut payload = ChunkedBytes::new(0);
902        while let Some(chunk) = stream.try_next().await? {
903            payload.push(chunk);
904        }
905
906        self.put_row(path, metadata, payload.into_bytes().into(), "put")
907            .await?;
908        Ok(())
909    }
910
911    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
912    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
913        match self.get_tiered_object(id).await? {
914            TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))),
915            TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
916            TieredGet::NotFound => Ok(None),
917        }
918    }
919
920    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
921    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
922        match self.get_tiered_metadata(id).await? {
923            TieredMetadata::Object(metadata) => Ok(Some(metadata)),
924            TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
925            TieredMetadata::NotFound => Ok(None),
926        }
927    }
928
929    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
930    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
931        objectstore_log::debug!("Deleting from Bigtable backend");
932
933        let path = id.as_storage_path().to_string().into_bytes();
934        self.mutate(path, [delete_row_mutation()], "delete").await?;
935
936        Ok(())
937    }
938}
939
940#[async_trait::async_trait]
941impl HighVolumeBackend for BigTableBackend {
942    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
943    async fn put_non_tombstone(
944        &self,
945        id: &ObjectId,
946        metadata: &Metadata,
947        payload: Bytes,
948    ) -> Result<Option<Tombstone>> {
949        objectstore_log::debug!("Conditional put to Bigtable backend");
950
951        let path = id.as_storage_path().to_string().into_bytes();
952        let mutations = object_mutations(metadata, payload.to_vec(), SystemTime::now())?;
953
954        for _ in 0..CAS_RETRY_COUNT {
955            let write_succeeded = self
956                .check_and_mutate(
957                    path.clone(),
958                    tombstone_predicate(),
959                    mutations.clone(),
960                    "put_non_tombstone",
961                )
962                .await?;
963
964            if write_succeeded {
965                return Ok(None);
966            }
967
968            // A tombstone was present: read its data for the caller.
969            let row = self
970                .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
971                .await?;
972
973            match row {
974                Some(RowData::Tombstone { target, meta, .. }) => {
975                    return Ok(Some(Tombstone {
976                        target: parse_redirect_target(&target, id)?,
977                        expiration_policy: meta.expiration_policy,
978                    }));
979                }
980                // Race: Tombstone was replaced by an object, retry to overwrite
981                Some(RowData::Object { .. }) => continue,
982                // Race: Tombstone was deleted, retry to write.
983                None => continue,
984            }
985        }
986
987        Err(Error::generic("BigTable: race loop in put_non_tombstone"))
988    }
989
990    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
991    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
992        objectstore_log::debug!("Reading from Bigtable backend");
993        let path = id.as_storage_path().to_string().into_bytes();
994
995        let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
996            return Ok(TieredGet::NotFound);
997        };
998
999        if row.needs_tti_bump() {
1000            self.bump_tti(path.clone(), &row, true, id).await;
1001        }
1002
1003        Ok(match row {
1004            RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
1005                target: parse_redirect_target(&target, id)?,
1006                expiration_policy: meta.expiration_policy,
1007            }),
1008            RowData::Object { metadata, payload } => {
1009                let mut metadata = metadata;
1010                metadata.size = Some(payload.len());
1011                TieredGet::Object(metadata, crate::stream::single(payload))
1012            }
1013        })
1014    }
1015
1016    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1017    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
1018        objectstore_log::debug!("Reading metadata from Bigtable backend");
1019        let path = id.as_storage_path().to_string().into_bytes();
1020
1021        // Read metadata and tombstone columns — skip the (potentially large) payload.
1022        // NB: `metadata.size` will not be populated since the payload is not fetched.
1023        let row_opt = self
1024            .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
1025            .await?;
1026        let Some(row) = row_opt else {
1027            return Ok(TieredMetadata::NotFound);
1028        };
1029
1030        if row.needs_tti_bump() {
1031            self.bump_tti(path.clone(), &row, false, id).await;
1032        }
1033
1034        Ok(match row {
1035            RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
1036                target: parse_redirect_target(&target, id)?,
1037                expiration_policy: meta.expiration_policy,
1038            }),
1039            RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
1040        })
1041    }
1042
1043    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1044    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
1045        objectstore_log::debug!("Conditional delete from Bigtable backend");
1046
1047        let path = id.as_storage_path().to_string().into_bytes();
1048
1049        for _ in 0..CAS_RETRY_COUNT {
1050            let write_succeeded = self
1051                .check_and_mutate(
1052                    path.clone(),
1053                    tombstone_predicate(),
1054                    [delete_row_mutation()],
1055                    "delete_non_tombstone",
1056                )
1057                .await?;
1058
1059            if write_succeeded {
1060                return Ok(None);
1061            }
1062
1063            // A tombstone was present: read its data for the caller.
1064            let row = self
1065                .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
1066                .await?;
1067
1068            match row {
1069                Some(RowData::Tombstone { target, meta, .. }) => {
1070                    return Ok(Some(Tombstone {
1071                        target: parse_redirect_target(&target, id)?,
1072                        expiration_policy: meta.expiration_policy,
1073                    }));
1074                }
1075                // Race: An object replaced the tombstone, delete the new object now.
1076                Some(RowData::Object { .. }) => continue,
1077                // Race: Entry was deleted in the meanwhile, nothing left to do.
1078                None => return Ok(None),
1079            }
1080        }
1081
1082        Err(Error::generic(
1083            "BigTable: race loop in delete_non_tombstone",
1084        ))
1085    }
1086
1087    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1088    async fn compare_and_write(
1089        &self,
1090        id: &ObjectId,
1091        current: Option<&ObjectId>,
1092        write: TieredWrite,
1093    ) -> Result<bool> {
1094        objectstore_log::debug!("CAS put to Bigtable backend");
1095
1096        let path = id.as_storage_path().to_string().into_bytes();
1097        let now = SystemTime::now();
1098
1099        let predicate = match (current, write.target()) {
1100            (Some(old), Some(new)) => update_predicate(old, new, id),
1101            (Some(target), None) => optional_target_predicate(target, id),
1102            (None, Some(target)) => optional_target_predicate(target, id),
1103            (None, None) => tombstone_predicate(),
1104        };
1105
1106        let mutations = match write {
1107            TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
1108            TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
1109            TieredWrite::Delete => vec![delete_row_mutation()],
1110        };
1111
1112        self.check_and_mutate(path, predicate, mutations, "compare_and_write")
1113            .await
1114    }
1115}
1116
1117/// Converts the given TTL duration to a microsecond-precision unix timestamp.
1118///
1119/// The TTL is anchored at the provided `from` timestamp, which defaults to `SystemTime::now()`. As
1120/// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at
1121/// 0.
1122fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1123    let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1124        context: format!(
1125            "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1126            humantime::format_rfc3339_seconds(from),
1127            ttl.as_secs()
1128        ),
1129        cause: None,
1130    })?;
1131    let millis = deadline
1132        .duration_since(SystemTime::UNIX_EPOCH)
1133        .map_err(|e| Error::Generic {
1134            context: format!(
1135                "unable to get duration since UNIX_EPOCH for SystemTime {}",
1136                humantime::format_rfc3339_seconds(deadline)
1137            ),
1138            cause: Some(Box::new(e)),
1139        })?
1140        .as_millis();
1141    (millis * 1000).try_into().map_err(|e| Error::Generic {
1142        context: format!("failed to convert {}ms to i64 microseconds", millis),
1143        cause: Some(Box::new(e)),
1144    })
1145}
1146
/// Interprets `micros` as microseconds since the UNIX epoch and returns the matching
/// `SystemTime`.
///
/// Negative inputs and instants that `SystemTime` cannot represent yield `None`.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    u64::try_from(micros)
        .ok()
        .map(Duration::from_micros)
        .and_then(|since_epoch| SystemTime::UNIX_EPOCH.checked_add(since_epoch))
}
1153
1154/// Retries a BigTable RPC on transient errors.
1155async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
1156where
1157    F: Future<Output = Result<T, BigTableError>> + Send,
1158{
1159    let mut retry_count = 0usize;
1160
1161    loop {
1162        match f().await {
1163            Ok(res) => return Ok(res),
1164            Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
1165                objectstore_metrics::count!("bigtable.failures", action = context);
1166                return Err(Error::Generic {
1167                    context: format!("Bigtable: `{context}` failed"),
1168                    cause: Some(Box::new(e)),
1169                });
1170            }
1171            Err(e) => {
1172                retry_count += 1;
1173                objectstore_metrics::count!("bigtable.retries", action = context);
1174                objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
1175            }
1176        }
1177    }
1178}
1179
1180fn is_retryable(error: &BigTableError) -> bool {
1181    match error {
1182        // Transient errors on auth token refresh
1183        BigTableError::GCPAuthError(_) => true,
1184        // Transient GRPC network failures
1185        BigTableError::TransportError(_) => true,
1186        // These could also indicate transient network failures
1187        BigTableError::IoError(_) => true,
1188        BigTableError::TimeoutError(_) => true,
1189
1190        // See https://docs.cloud.google.com/bigtable/docs/status-codes
1191        BigTableError::RpcError(status) => match status.code() {
1192            // Generic retriable status
1193            Code::Unavailable => true,
1194            // Timeouts
1195            Code::Cancelled => true,
1196            Code::DeadlineExceeded => true,
1197            // Token might have refreshed too late
1198            Code::Unauthenticated => true,
1199            // Unspecified, attempt to retry anyways
1200            Code::Aborted => true,
1201            Code::Internal => true,
1202            Code::FailedPrecondition => true,
1203            Code::Unknown => true,
1204            _ => false,
1205        },
1206        _ => false,
1207    }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use std::collections::BTreeMap;
1213
1214    use anyhow::Result;
1215    use objectstore_types::scope::{Scope, Scopes};
1216
1217    use super::*;
1218    use crate::id::ObjectContext;
1219    use crate::stream;
1220
1221    // NB: Most of these tests require a BigTable emulator running. This is done
1222    // automatically in CI.
1223    //
1224    // Refer to the readme for how to set up the emulator.
1225
1226    async fn create_test_backend() -> Result<BigTableBackend> {
1227        BigTableBackend::new(BigTableConfig {
1228            endpoint: Some("localhost:8086".into()),
1229            project_id: "testing".into(),
1230            instance_name: "objectstore".into(),
1231            table_name: "objectstore".into(),
1232            connections: None,
1233        })
1234        .await
1235    }
1236
1237    fn make_id() -> ObjectId {
1238        ObjectId::random(ObjectContext {
1239            usecase: "testing".into(),
1240            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1241        })
1242    }
1243
1244    async fn create_object(
1245        backend: &BigTableBackend,
1246        id: &ObjectId,
1247        metadata: &Metadata,
1248        payload: &[u8],
1249        now: SystemTime,
1250    ) -> Result<()> {
1251        let path = id.as_storage_path().to_string().into_bytes();
1252        let mutations = object_mutations(metadata, payload.to_vec(), now)?;
1253        backend.mutate(path, mutations, "test-setup").await?;
1254        Ok(())
1255    }
1256
1257    async fn create_tombstone(
1258        backend: &BigTableBackend,
1259        id: &ObjectId,
1260        tombstone: &Tombstone,
1261        now: SystemTime,
1262    ) -> Result<()> {
1263        let path = id.as_storage_path().to_string().into_bytes();
1264        let mutations = tombstone_mutations(tombstone, now)?;
1265        backend.mutate(path, mutations, "test-setup").await?;
1266        Ok(())
1267    }
1268
1269    /// Writes a legacy-format tombstone row directly into Bigtable.
1270    async fn write_legacy_tombstone(
1271        backend: &BigTableBackend,
1272        id: &ObjectId,
1273        expiration_policy: ExpirationPolicy,
1274        time_expires: Option<SystemTime>,
1275    ) -> Result<()> {
1276        let meta = if expiration_policy.is_manual() {
1277            r#"{"is_redirect_tombstone":true}"#.to_owned()
1278        } else {
1279            let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1280            format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1281        };
1282
1283        let (family, timestamp_micros) = if expiration_policy.is_manual() {
1284            (FAMILY_MANUAL, -1)
1285        } else {
1286            let t =
1287                time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1288            let timestamp = t
1289                .duration_since(SystemTime::UNIX_EPOCH)
1290                .unwrap()
1291                .as_millis();
1292            (FAMILY_GC, timestamp as i64 * 1000)
1293        };
1294
1295        let path = id.as_storage_path().to_string().into_bytes();
1296        let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1297            family_name: family.to_owned(),
1298            column_qualifier: COLUMN_METADATA.to_owned(),
1299            timestamp_micros,
1300            value: meta.into_bytes(),
1301        }))];
1302
1303        backend.mutate(path, mutations, "test-setup").await?;
1304
1305        Ok(())
1306    }
1307
1308    /// Writes a new-format tombstone row with an empty `r` value directly,
1309    /// simulating rows written by code before this change.
1310    async fn write_empty_redirect_tombstone(
1311        backend: &BigTableBackend,
1312        id: &ObjectId,
1313    ) -> Result<()> {
1314        let path = id.as_storage_path().to_string().into_bytes();
1315        let mutations = [
1316            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1317                family_name: FAMILY_MANUAL.to_owned(),
1318                column_qualifier: COLUMN_REDIRECT.to_owned(),
1319                timestamp_micros: -1,
1320                value: b"".to_vec(), // empty — legacy format
1321            })),
1322            mutation(mutation::Mutation::SetCell(mutation::SetCell {
1323                family_name: FAMILY_MANUAL.to_owned(),
1324                column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1325                timestamp_micros: -1,
1326                value: b"{}".to_vec(),
1327            })),
1328        ];
1329
1330        backend.mutate(path, mutations, "test-setup").await?;
1331
1332        Ok(())
1333    }
1334
1335    // --- Section 1: Object Operations ---
1336
1337    /// Verifies the full roundtrip: put → get_object (payload + metadata) → get_metadata (metadata).
1338    #[tokio::test]
1339    async fn test_roundtrip() -> Result<()> {
1340        let backend = create_test_backend().await?;
1341
1342        let id = make_id();
1343        let metadata = Metadata {
1344            content_type: "text/plain".into(),
1345            time_created: Some(SystemTime::now()),
1346            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1347            ..Default::default()
1348        };
1349
1350        backend
1351            .put_object(&id, &metadata, stream::single("hello, world"))
1352            .await?;
1353
1354        let (obj_meta, stream) = backend.get_object(&id).await?.unwrap();
1355        let payload = stream::read_to_vec(stream).await?;
1356        assert_eq!(payload, b"hello, world");
1357        assert_eq!(obj_meta.content_type, metadata.content_type);
1358        assert_eq!(obj_meta.custom, metadata.custom);
1359
1360        let head_meta = backend.get_metadata(&id).await?.unwrap();
1361        assert_eq!(head_meta.content_type, metadata.content_type);
1362        assert_eq!(head_meta.custom, metadata.custom);
1363
1364        Ok(())
1365    }
1366
1367    /// Verifies that absent rows return None or succeed silently for all read/delete operations.
1368    #[tokio::test]
1369    async fn test_nonexistent() -> Result<()> {
1370        let backend = create_test_backend().await?;
1371
1372        let id = make_id();
1373        assert!(backend.get_object(&id).await?.is_none());
1374        assert!(backend.get_metadata(&id).await?.is_none());
1375        backend.delete_object(&id).await?;
1376
1377        Ok(())
1378    }
1379
1380    #[tokio::test]
1381    async fn test_overwrite() -> Result<()> {
1382        let backend = create_test_backend().await?;
1383
1384        let id = make_id();
1385        let first_metadata = Metadata {
1386            custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1387            ..Default::default()
1388        };
1389        create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1390
1391        let second_metadata = Metadata {
1392            custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1393            ..Default::default()
1394        };
1395        backend
1396            .put_object(&id, &second_metadata, stream::single("world"))
1397            .await?;
1398
1399        let (meta, stream) = backend.get_object(&id).await?.unwrap();
1400        let payload = stream::read_to_vec(stream).await?;
1401        assert_eq!(payload, b"world");
1402        assert_eq!(meta.custom, second_metadata.custom);
1403
1404        Ok(())
1405    }
1406
1407    #[tokio::test]
1408    async fn test_read_after_delete() -> Result<()> {
1409        let backend = create_test_backend().await?;
1410
1411        let id = make_id();
1412        let metadata = Metadata::default();
1413        create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1414        backend.delete_object(&id).await?;
1415
1416        assert!(backend.get_object(&id).await?.is_none());
1417
1418        Ok(())
1419    }
1420
1421    /// Verifies TTI bump via both `get_object` (loaded=true path) and `get_metadata` (loaded=false path).
1422    ///
1423    /// The bump condition is: `expire_at < now + tti - TTI_DEBOUNCE`. We write a stale
1424    /// timestamp just inside the bump window (still in the future, so the row is not GC'd)
1425    /// and confirm that a subsequent read returns a later expiry.
1426    #[tokio::test]
1427    async fn test_tti_bump() -> Result<()> {
1428        let backend = create_test_backend().await?;
1429        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1430        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
1431        let metadata = Metadata {
1432            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1433            ..Default::default()
1434        };
1435
1436        // Pass a backdated `now` so the written expiry is inside the bump window:
1437        // expire_at = past_now + tti = now - TTI_DEBOUNCE - 60s (stale but not yet expired).
1438        let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_secs(60);
1439
1440        // Sub-sequence 1: get_object triggers bump (loaded=true path).
1441        let id1 = make_id();
1442        create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1443
1444        // get_object reads the stale row, triggers bump, and returns the pre-bump metadata.
1445        let (pre_obj_meta, _) = backend.get_object(&id1).await?.unwrap();
1446        let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1447
1448        // A second get_metadata reads the freshly bumped row.
1449        let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1450        let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1451        assert!(
1452            post_obj_expiry > pre_obj_expiry,
1453            "bump should extend expiry"
1454        );
1455
1456        // Sub-sequence 2: get_metadata triggers bump (loaded=false path).
1457        let id2 = make_id();
1458        create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1459
1460        // First get_metadata sees the stale row and triggers a bump.
1461        let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1462        let pre_expiry = pre_meta.time_expires.unwrap();
1463
1464        // Second get_metadata reads the freshly bumped row.
1465        let post_meta = backend.get_metadata(&id2).await?.unwrap();
1466        let post_expiry = post_meta.time_expires.unwrap();
1467        assert!(post_expiry > pre_expiry, "bump should extend expiry");
1468
1469        // Payload must be intact after the loaded=false bump (which re-fetches the payload).
1470        let (_, stream) = backend.get_object(&id2).await?.unwrap();
1471        let payload = stream::read_to_vec(stream).await?;
1472        assert_eq!(payload, b"hello, world");
1473
1474        Ok(())
1475    }
1476
1477    #[tokio::test]
1478    async fn test_tti_no_bump_when_fresh() -> Result<()> {
1479        let backend = create_test_backend().await?;
1480
1481        let id = make_id();
1482        // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable.
1483        let tti = Duration::from_secs(2 * 24 * 3600); // 2 days
1484        let metadata = Metadata {
1485            expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1486            ..Default::default()
1487        };
1488        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1489
1490        // A freshly written object has time_expires ≈ now + 2d, well outside the bump
1491        // window (now + 2d - 1d = now + 1d). No bump should occur.
1492        let first = backend.get_metadata(&id).await?.unwrap();
1493        let second = backend.get_metadata(&id).await?.unwrap();
1494
1495        assert_eq!(
1496            first.time_expires.unwrap(),
1497            second.time_expires.unwrap(),
1498            "fresh TTI object must not be bumped"
1499        );
1500
1501        Ok(())
1502    }
1503
1504    // --- Section 2: Expiration ---
1505
1506    #[tokio::test]
1507    async fn test_ttl_immediate() -> Result<()> {
1508        // NB: We create a TTL that immediately expires in this test. This might be optimized away
1509        // in a future implementation, so we will have to update this test accordingly.
1510
1511        let backend = create_test_backend().await?;
1512
1513        let id = make_id();
1514        let metadata = Metadata {
1515            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1516            ..Default::default()
1517        };
1518        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1519
1520        assert!(backend.get_object(&id).await?.is_none());
1521
1522        Ok(())
1523    }
1524
1525    #[tokio::test]
1526    async fn test_tti_immediate() -> Result<()> {
1527        // NB: We create a TTI that immediately expires in this test. This might be optimized away
1528        // in a future implementation, so we will have to update this test accordingly.
1529
1530        let backend = create_test_backend().await?;
1531
1532        let id = make_id();
1533        let metadata = Metadata {
1534            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1535            ..Default::default()
1536        };
1537        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1538
1539        assert!(backend.get_object(&id).await?.is_none());
1540
1541        Ok(())
1542    }
1543
1544    // --- Section 3: Tiered Operations ---
1545
1546    /// Covers all three row states for `get_tiered_object` and `get_tiered_metadata`.
1547    ///
1548    /// - **empty**: both return NotFound.
1549    /// - **object**: put_object, both return the Object variant with correct payload/metadata.
1550    /// - **tombstone**: CAS-write with a distinct `lt_id`, both return the Tombstone variant
1551    ///   with `target == lt_id`.
1552    #[tokio::test]
1553    async fn test_tiered_get() -> Result<()> {
1554        let backend = create_test_backend().await?;
1555
1556        // empty
1557        let id = make_id();
1558        assert!(matches!(
1559            backend.get_tiered_object(&id).await?,
1560            TieredGet::NotFound
1561        ));
1562        assert!(matches!(
1563            backend.get_tiered_metadata(&id).await?,
1564            TieredMetadata::NotFound
1565        ));
1566
1567        // object
1568        let id = make_id();
1569        let put_meta = Metadata {
1570            content_type: "text/plain".into(),
1571            custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1572            ..Default::default()
1573        };
1574        create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1575
1576        let TieredGet::Object(obj_meta, obj_stream) = backend.get_tiered_object(&id).await? else {
1577            panic!("expected TieredGet::Object");
1578        };
1579        let obj_payload = stream::read_to_vec(obj_stream).await?;
1580        assert_eq!(obj_payload, b"payload");
1581        assert_eq!(obj_meta.content_type, put_meta.content_type);
1582        assert_eq!(obj_meta.custom, put_meta.custom);
1583
1584        let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1585            panic!("expected TieredMetadata::Object");
1586        };
1587        assert_eq!(head_meta.content_type, put_meta.content_type);
1588        assert_eq!(head_meta.custom, put_meta.custom);
1589
1590        // tombstone
1591        let hv_id = make_id();
1592        let lt_id = ObjectId::random(hv_id.context().clone());
1593        let tombstone = Tombstone {
1594            target: lt_id.clone(),
1595            expiration_policy: ExpirationPolicy::Manual,
1596        };
1597        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1598
1599        match backend.get_tiered_object(&hv_id).await? {
1600            TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1601            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1602        }
1603        match backend.get_tiered_metadata(&hv_id).await? {
1604            TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1605            other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1606        }
1607
1608        Ok(())
1609    }
1610
1611    /// Covers all three row states for `put_non_tombstone`.
1612    ///
1613    /// - **empty**: returns None, object is readable.
1614    /// - **object**: overwrites with new payload, returns None.
1615    /// - **tombstone**: returns Some(Tombstone) with the correct target; tombstone still intact.
1616    #[tokio::test]
1617    async fn test_put_non_tombstone() -> Result<()> {
1618        let backend = create_test_backend().await?;
1619
1620        // empty: put_non_tombstone on absent row succeeds and makes object readable.
1621        let id = make_id();
1622        let metadata = Metadata::default();
1623        let result = backend
1624            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1625            .await?;
1626        assert_eq!(result, None, "expected None on empty row");
1627        let (_, stream) = backend.get_object(&id).await?.unwrap();
1628        assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1629
1630        // object: put_non_tombstone on existing object replaces payload, returns None.
1631        let id = make_id();
1632        create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1633        let result = backend
1634            .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1635            .await?;
1636        assert_eq!(result, None, "expected None when overwriting object");
1637        let (_, stream) = backend.get_object(&id).await?.unwrap();
1638        assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1639
1640        // tombstone: put_non_tombstone returns Some(Tombstone) and leaves tombstone intact.
1641        let hv_id = make_id();
1642        let lt_id = ObjectId::random(hv_id.context().clone());
1643        let tombstone = Tombstone {
1644            target: lt_id.clone(),
1645            expiration_policy: ExpirationPolicy::Manual,
1646        };
1647        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1648        let result = backend
1649            .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1650            .await?;
1651        let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1652        assert_eq!(returned.target, lt_id);
1653        assert!(
1654            matches!(
1655                backend.get_tiered_metadata(&hv_id).await?,
1656                TieredMetadata::Tombstone(_)
1657            ),
1658            "tombstone must still exist after put_non_tombstone"
1659        );
1660
1661        Ok(())
1662    }
1663
1664    /// Covers all three row states for `delete_non_tombstone`.
1665    ///
1666    /// - **empty**: returns None.
1667    /// - **object**: returns None, row gone.
1668    /// - **tombstone**: returns Some(Tombstone) with correct target; tombstone still intact.
1669    ///
1670    /// Verifies that the `r` column is correctly detected by both the `ReadRows` column
1671    /// filter and the `CheckAndMutate` `tombstone_predicate`.
1672    #[tokio::test]
1673    async fn test_delete_non_tombstone() -> Result<()> {
1674        let backend = create_test_backend().await?;
1675
1676        // empty
1677        let id = make_id();
1678        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1679
1680        // object
1681        let id = make_id();
1682        let metadata = Metadata::default();
1683        create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1684        assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1685        assert!(backend.get_object(&id).await?.is_none());
1686
1687        // tombstone
1688        let id = make_id();
1689        let tombstone = Tombstone {
1690            target: id.clone(),
1691            expiration_policy: ExpirationPolicy::Manual,
1692        };
1693        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1694        let tombstone = backend
1695            .delete_non_tombstone(&id)
1696            .await?
1697            .expect("expected Some(tombstone)");
1698        assert_eq!(tombstone.target, id, "tombstone target must be returned");
1699        assert!(
1700            matches!(
1701                backend.get_tiered_metadata(&id).await?,
1702                TieredMetadata::Tombstone(_)
1703            ),
1704            "tombstone must still exist after delete_non_tombstone"
1705        );
1706
1707        Ok(())
1708    }
1709
1710    // --- Section 4: Compare-and-Write ---
1711
1712    /// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds.
1713    ///
1714    /// After creation, both tiered and legacy APIs reflect the tombstone.
1715    #[tokio::test]
1716    async fn test_cas_create_tombstone() -> Result<()> {
1717        let backend = create_test_backend().await?;
1718
1719        let hv_id = make_id();
1720        let lt_id = ObjectId::random(hv_id.context().clone());
1721        let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600));
1722        let tombstone = Tombstone {
1723            target: lt_id.clone(),
1724            expiration_policy,
1725        };
1726
1727        // First create succeeds.
1728        let committed = backend
1729            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1730            .await?;
1731        assert!(committed, "expected CAS success on empty row");
1732
1733        // Tiered reads must see the tombstone with correct target and policy.
1734        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1735            panic!("expected TieredMetadata::Tombstone");
1736        };
1737        assert_eq!(t.target, lt_id, "target must round-trip via r column");
1738        assert_eq!(t.expiration_policy, expiration_policy);
1739        match backend.get_tiered_object(&hv_id).await? {
1740            TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1741            other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1742        }
1743
1744        // Legacy reads must error rather than leak tombstone data.
1745        assert!(matches!(
1746            backend.get_object(&hv_id).await,
1747            Err(Error::UnexpectedTombstone)
1748        ));
1749        assert!(matches!(
1750            backend.get_metadata(&hv_id).await,
1751            Err(Error::UnexpectedTombstone)
1752        ));
1753
1754        // Idempotent retry: retry with the same target succeeds
1755        let second = backend
1756            .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1757            .await?;
1758        assert!(second, "idempotent retry");
1759
1760        Ok(())
1761    }
1762
1763    /// Swapping a tombstone target: wrong expected → false, correct expected → true.
1764    #[tokio::test]
1765    async fn test_cas_swap_tombstone() -> Result<()> {
1766        let backend = create_test_backend().await?;
1767
1768        let hv_id = make_id();
1769        let old_lt_id = ObjectId::random(hv_id.context().clone());
1770        let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1771        let new_lt_id = ObjectId::random(hv_id.context().clone());
1772
1773        let tombstone = Tombstone {
1774            target: old_lt_id.clone(),
1775            expiration_policy: ExpirationPolicy::Manual,
1776        };
1777        create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1778
1779        // Wrong target: CAS fails, tombstone unchanged.
1780        let write = TieredWrite::Tombstone(Tombstone {
1781            target: new_lt_id.clone(),
1782            expiration_policy: ExpirationPolicy::Manual,
1783        });
1784        let swapped = backend
1785            .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1786            .await?;
1787        assert!(!swapped, "expected CAS failure due to wrong target");
1788        match backend.get_tiered_metadata(&hv_id).await? {
1789            TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1790            other => panic!("expected tombstone, got {other:?}"),
1791        }
1792
1793        // Correct target: CAS succeeds, target updated.
1794        let swapped = backend
1795            .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1796            .await?;
1797        assert!(swapped, "expected CAS success with correct target");
1798        match backend.get_tiered_metadata(&hv_id).await? {
1799            TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1800            other => panic!("expected tombstone, got {other:?}"),
1801        }
1802
1803        // Idempotent retry: same A→B swap returns true.
1804        let retry = backend
1805            .compare_and_write(&hv_id, Some(&old_lt_id), write)
1806            .await?;
1807        assert!(retry, "idempotent retry");
1808
1809        Ok(())
1810    }
1811
1812    /// Swapping a tombstone for inline object data: wrong expected → false, correct → true.
1813    #[tokio::test]
1814    async fn test_cas_swap_inline() -> Result<()> {
1815        let backend = create_test_backend().await?;
1816
1817        let id = make_id();
1818        let lt_id = ObjectId::random(id.context().clone());
1819        let wrong_id = ObjectId::random(id.context().clone());
1820
1821        let tombstone = Tombstone {
1822            target: lt_id.clone(),
1823            expiration_policy: ExpirationPolicy::Manual,
1824        };
1825        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1826
1827        // Wrong target: CAS fails, tombstone intact.
1828        let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1829        let swapped = backend
1830            .compare_and_write(&id, Some(&wrong_id), write)
1831            .await?;
1832        assert!(!swapped, "expected CAS failure with wrong target");
1833        assert!(matches!(
1834            backend.get_tiered_metadata(&id).await?,
1835            TieredMetadata::Tombstone(_)
1836        ));
1837
1838        // Correct target: CAS succeeds, row becomes an inline object.
1839        let payload = Bytes::from_static(b"hello inline");
1840        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1841        let swapped = backend
1842            .compare_and_write(&id, Some(&lt_id), write.clone())
1843            .await?;
1844        assert!(swapped, "expected CAS success with correct target");
1845        let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1846            panic!("expected inline object after swap");
1847        };
1848        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1849
1850        // Idempotent retry: row is already inline (no tombstone), same CAS returns true.
1851        let retry = backend.compare_and_write(&id, Some(&lt_id), write).await?;
1852        assert!(retry, "idempotent retry");
1853
1854        Ok(())
1855    }
1856
1857    /// CAS-write an object onto an empty row (expected=None, write=Object) succeeds.
1858    #[tokio::test]
1859    async fn test_cas_create_object_on_empty_row() -> Result<()> {
1860        let backend = create_test_backend().await?;
1861
1862        let id = make_id();
1863        let payload = Bytes::from_static(b"cas object");
1864        let write = TieredWrite::Object(Metadata::default(), payload.clone());
1865        let committed = backend.compare_and_write(&id, None, write).await?;
1866        assert!(committed, "expected CAS success on empty row");
1867
1868        let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1869            panic!("expected Object after CAS-create");
1870        };
1871        assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1872
1873        Ok(())
1874    }
1875
1876    /// CAS-delete: wrong expected → false; correct expected → true, row gone.
1877    #[tokio::test]
1878    async fn test_cas_delete() -> Result<()> {
1879        let backend = create_test_backend().await?;
1880
1881        let id = make_id();
1882        let lt_id = ObjectId::random(id.context().clone());
1883        let wrong_id = ObjectId::random(id.context().clone());
1884
1885        let tombstone = Tombstone {
1886            target: lt_id.clone(),
1887            expiration_policy: ExpirationPolicy::Manual,
1888        };
1889        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1890
1891        // Wrong target: fails, row preserved.
1892        let deleted = backend
1893            .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1894            .await?;
1895        assert!(!deleted, "expected CAS failure with wrong target");
1896        assert!(matches!(
1897            backend.get_tiered_metadata(&id).await?,
1898            TieredMetadata::Tombstone(_)
1899        ));
1900
1901        // Correct target: succeeds, row gone.
1902        let deleted = backend
1903            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1904            .await?;
1905        assert!(deleted, "expected CAS delete success");
1906        assert!(matches!(
1907            backend.get_tiered_metadata(&id).await?,
1908            TieredMetadata::NotFound
1909        ));
1910
1911        // Idempotent retry: row is already absent (no tombstone), same delete returns true.
1912        let retry = backend
1913            .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
1914            .await?;
1915        assert!(retry, "idempotent retry");
1916
1917        // Inline object replaced tombstone: Safe to delete since it is an idempotent operation.
1918        let id2 = make_id();
1919        let fake_lt_id = ObjectId::random(id2.context().clone());
1920        let metadata = Metadata::default();
1921        create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1922        let deleted = backend
1923            .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1924            .await?;
1925        assert!(deleted, "expected idempotent deletion");
1926
1927        Ok(())
1928    }
1929
1930    // --- Section 5: Legacy Tombstone Compatibility ---
1931
1932    /// Legacy Manual and TTL tombstones are correctly read via the tiered APIs.
1933    ///
1934    /// Uses `Manual` expiration so `timestamp_micros = -1` (server-assigned ≈ write time)
1935    /// does not trigger immediate expiry.
1936    #[tokio::test]
1937    async fn test_legacy_tombstone_reads() -> Result<()> {
1938        let backend = create_test_backend().await?;
1939
1940        // Manual policy: get_tiered_metadata returns Tombstone(Manual), get_tiered_object returns Tombstone.
1941        let id = make_id();
1942        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1943
1944        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1945            panic!("expected tombstone");
1946        };
1947        assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1948        assert!(matches!(
1949            backend.get_tiered_object(&id).await?,
1950            TieredGet::Tombstone(_)
1951        ));
1952
1953        // TTL policy: get_tiered_metadata returns Tombstone with the correct TTL policy.
1954        //
1955        // A future cell timestamp (now + TTL) is required so `expires_before` does not
1956        // immediately filter the row.
1957        let id = make_id();
1958        let ttl = Duration::from_secs(2 * 24 * 3600);
1959        write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1960
1961        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1962            panic!("expected TieredMetadata::Tombstone");
1963        };
1964        assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
1965
1966        Ok(())
1967    }
1968
1969    /// A legacy tombstone with TTI policy is upgraded to the new `r`/`t` column format on read.
1970    ///
1971    /// The bump path calls `put_tombstone_row`, which rewrites the row with `r` + `t` columns.
1972    /// The upgraded row has a fresh cell timestamp (≈ now + TTI), so `time_expires` increases.
1973    #[tokio::test]
1974    async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
1975        let backend = create_test_backend().await?;
1976        let id = make_id();
1977        let path = id.as_storage_path().to_string().into_bytes();
1978
1979        let tti = Duration::from_secs(2 * 24 * 3600); // must exceed TTI_DEBOUNCE (1 day)
1980
1981        // Place time_expires just inside the bump window: past `now + tti - TTI_DEBOUNCE`
1982        // but still in the future so `expires_before(now)` does not filter the row.
1983        let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1984        write_legacy_tombstone(
1985            &backend,
1986            &id,
1987            ExpirationPolicy::TimeToIdle(tti),
1988            Some(old_deadline),
1989        )
1990        .await?;
1991
1992        // First read detects the stale TTI and triggers `put_tombstone_row`.
1993        let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
1994            panic!("expected tombstone");
1995        };
1996
1997        // After the bump, the row is rewritten with a fresh timestamp (≈ now + TTI).
1998        let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
1999            Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
2000            _ => panic!("expected tombstone row after bump"),
2001        };
2002
2003        assert!(
2004            new_deadline > old_deadline,
2005            "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
2006        );
2007
2008        Ok(())
2009    }
2010
2011    /// Legacy tombstones are handled correctly by all conditional write operations.
2012    ///
2013    /// Covers: `put_non_tombstone`, `delete_non_tombstone`, CAS-delete for both the
2014    /// legacy-metadata format and the empty-redirect format.
2015    #[tokio::test]
2016    async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
2017        let backend = create_test_backend().await?;
2018
2019        // put_non_tombstone returns Some(target == id) for a legacy tombstone.
2020        let id = make_id();
2021        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2022        let t_opt = backend
2023            .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
2024            .await?;
2025        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2026
2027        // delete_non_tombstone returns Some(target == id) for a legacy tombstone.
2028        let id = make_id();
2029        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2030        let t_opt = backend.delete_non_tombstone(&id).await?;
2031        assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2032
2033        // CAS-delete succeeds on a legacy-metadata tombstone (target resolves to hv_id).
2034        let id = make_id();
2035        write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2036        let deleted = backend
2037            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2038            .await?;
2039        assert!(
2040            deleted,
2041            "CAS-delete must succeed on legacy-metadata tombstone"
2042        );
2043        assert!(matches!(
2044            backend.get_tiered_metadata(&id).await?,
2045            TieredMetadata::NotFound
2046        ));
2047
2048        // CAS-delete succeeds on an empty-redirect tombstone (target resolves to hv_id).
2049        let id = make_id();
2050        write_empty_redirect_tombstone(&backend, &id).await?;
2051        let deleted = backend
2052            .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2053            .await?;
2054        assert!(
2055            deleted,
2056            "CAS-delete must succeed on empty-redirect tombstone"
2057        );
2058        assert!(matches!(
2059            backend.get_tiered_metadata(&id).await?,
2060            TieredMetadata::NotFound
2061        ));
2062
2063        Ok(())
2064    }
2065
2066    /// An empty `r` value falls back to the HV id when resolving the tombstone target.
2067    #[tokio::test]
2068    async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2069        let backend = create_test_backend().await?;
2070        let id = make_id();
2071
2072        write_empty_redirect_tombstone(&backend, &id).await?;
2073        match backend.get_tiered_metadata(&id).await? {
2074            TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2075            other => panic!("expected tombstone, got {other:?}"),
2076        }
2077
2078        Ok(())
2079    }
2080
2081    // --- Section 6: Expired Tombstone Handling ---
2082
2083    /// CAS with `current=None` must succeed when the row holds an expired
2084    /// tombstone. The physical row still exists but is logically gone.
2085    #[tokio::test]
2086    async fn test_cas_create_tombstone_over_expired() -> Result<()> {
2087        let backend = create_test_backend().await?;
2088
2089        let id = make_id();
2090        let old_lt_id = ObjectId::random(id.context().clone());
2091        let old_tombstone = Tombstone {
2092            target: old_lt_id,
2093            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2094        };
2095        create_tombstone(&backend, &id, &old_tombstone, SystemTime::now()).await?;
2096
2097        let new_lt_id = ObjectId::random(id.context().clone());
2098        let new_tombstone = Tombstone {
2099            target: new_lt_id.clone(),
2100            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
2101        };
2102        let committed = backend
2103            .compare_and_write(&id, None, TieredWrite::Tombstone(new_tombstone))
2104            .await?;
2105        assert!(
2106            committed,
2107            "CAS with current=None must succeed over an expired tombstone"
2108        );
2109
2110        let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2111            panic!("expected new tombstone to be readable");
2112        };
2113        assert_eq!(t.target, new_lt_id);
2114
2115        Ok(())
2116    }
2117
2118    /// `put_non_tombstone` must succeed when the row holds only an expired
2119    /// tombstone — the expired row is logically absent.
2120    #[tokio::test]
2121    async fn test_put_non_tombstone_over_expired() -> Result<()> {
2122        let backend = create_test_backend().await?;
2123
2124        let id = make_id();
2125        let lt_id = ObjectId::random(id.context().clone());
2126        let tombstone = Tombstone {
2127            target: lt_id,
2128            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2129        };
2130        create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
2131
2132        let result = backend
2133            .put_non_tombstone(&id, &Metadata::default(), Bytes::from_static(b"data"))
2134            .await?;
2135        assert_eq!(
2136            result, None,
2137            "put_non_tombstone must succeed (return None) over an expired tombstone"
2138        );
2139
2140        let (_, stream) = backend.get_object(&id).await?.unwrap();
2141        assert_eq!(&stream::read_to_vec(stream).await?, b"data");
2142
2143        Ok(())
2144    }
2145}