1use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use serde::{Deserialize, Serialize};
42use tonic::Code;
43
44use crate::backend::common::{
45 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
46 TieredGet, TieredMetadata, TieredWrite, Tombstone,
47};
48use crate::error::{Error, Result};
49use crate::gcp_auth::PrefetchingTokenProvider;
50use crate::id::ObjectId;
51use crate::stream::{ChunkedBytes, ClientStream};
52
/// Configuration for the Bigtable storage backend.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BigTableConfig {
    /// Optional emulator endpoint. When set, the backend connects to an
    /// emulator at this address instead of the managed GCP transport.
    pub endpoint: Option<String>,

    /// The GCP project id that hosts the Bigtable instance.
    pub project_id: String,

    /// The Bigtable instance name within the project.
    pub instance_name: String,

    /// The table that object rows are read from and written to.
    pub table_name: String,

    /// Number of connections in the channel pool; defaults to 1 when unset.
    pub connections: Option<usize>,
}
126
/// Timeout for establishing a connection to Bigtable.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum age of a gRPC channel (50 minutes) passed to the managed
/// transport.
// `Duration::from_secs` is used instead of `Duration::from_mins`, which is an
// unstable nightly-only constructor (`duration_constructors_lite`).
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_secs(50 * 60));

/// Minimum slack (one day) before a time-to-idle row is re-written; see
/// `RowData::needs_tti_bump`.
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600);

/// OAuth scopes requested for Bigtable data access.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// Number of retries for a failed request (on top of the initial attempt).
const REQUEST_RETRY_COUNT: usize = 2;

/// Number of attempts in compare-and-swap loops before giving up.
const CAS_RETRY_COUNT: usize = 3;

/// Column qualifier holding the object payload.
const COLUMN_PAYLOAD: &[u8] = b"p";

/// Column qualifier holding the serialized object metadata.
const COLUMN_METADATA: &[u8] = b"m";

/// Column qualifier holding the redirect target of a tombstone.
const COLUMN_REDIRECT: &[u8] = b"r";

/// Column qualifier holding serialized tombstone metadata.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";

/// Regex matching all non-payload columns (`m`, `r`, `t`).
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family whose cells are expired via cell timestamps (garbage
/// collected).
const FAMILY_GC: &str = "fg";

/// Column family for manually-managed (non-expiring) cells.
const FAMILY_MANUAL: &str = "fm";
165
/// Storage backend that persists objects as single rows in a Bigtable table.
pub struct BigTableBackend {
    // Connection (pool) used for all row reads and mutations.
    bigtable: BigTableConnection,

    // Pre-computed resource paths used when building requests and for Debug
    // output.
    instance_path: String,
    table_path: String,
    table_name: String,
}
174
175impl fmt::Debug for BigTableBackend {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 f.debug_struct("BigTableBackend")
178 .field("instance_path", &self.instance_path)
179 .field("table_path", &self.table_path)
180 .field("table_name", &self.table_name)
181 .finish_non_exhaustive()
182 }
183}
184
185fn column_filter(column: &[u8]) -> v2::RowFilter {
187 v2::RowFilter {
188 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
189 [b"^", column, b"$"].concat(),
190 )),
191 }
192}
193
194fn legacy_tombstone_filter() -> v2::RowFilter {
199 v2::RowFilter {
200 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
201 filters: vec![
202 column_filter(COLUMN_METADATA),
203 v2::RowFilter {
204 filter: Some(v2::row_filter::Filter::ValueRegexFilter(
205 b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
206 )),
207 },
208 ],
209 })),
210 }
211}
212
213fn tombstone_filter() -> v2::RowFilter {
221 v2::RowFilter {
222 filter: Some(v2::row_filter::Filter::Interleave(
223 v2::row_filter::Interleave {
224 filters: vec![
225 column_filter(COLUMN_REDIRECT),
227 legacy_tombstone_filter(),
229 ],
230 },
231 )),
232 }
233}
234
235fn tombstone_predicate() -> MutatePredicate {
241 MutatePredicate::Exclude(tombstone_filter())
242}
243
244fn exact_value_regex(value: &str) -> Vec<u8> {
249 format!("^{}$", regex::escape(value)).into_bytes()
250}
251
252fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
270 let target_path = exact_value_regex(&target.as_storage_path().to_string());
271
272 let exact_match = v2::RowFilter {
273 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
274 filters: vec![
275 column_filter(COLUMN_REDIRECT),
276 v2::RowFilter {
277 filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
278 },
279 ],
280 })),
281 };
282
283 if target != own_id {
284 return exact_match;
285 }
286
287 let empty_redirect_match = v2::RowFilter {
288 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
289 filters: vec![
290 column_filter(COLUMN_REDIRECT),
291 v2::RowFilter {
292 filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
293 },
294 ],
295 })),
296 };
297
298 v2::RowFilter {
302 filter: Some(v2::row_filter::Filter::Interleave(
303 v2::row_filter::Interleave {
304 filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
305 },
306 )),
307 }
308}
309
310fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
317 MutatePredicate::Include(v2::RowFilter {
318 filter: Some(v2::row_filter::Filter::Interleave(
319 v2::row_filter::Interleave {
320 filters: vec![
321 redirect_target_filter(old, own_id),
322 redirect_target_filter(new, own_id),
323 ],
324 },
325 )),
326 })
327}
328
329fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
341 MutatePredicate::Exclude(v2::RowFilter {
342 filter: Some(v2::row_filter::Filter::Condition(Box::new(
343 v2::row_filter::Condition {
344 predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
345 true_filter: Some(Box::new(v2::RowFilter {
346 filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
347 })),
348 false_filter: Some(Box::new(tombstone_filter())),
349 },
350 ))),
351 })
352}
353
/// Predicate mode for a conditional (check-and-mutate) row write; see
/// `BigTableBackend::check_and_mutate`.
#[derive(Clone, Debug)]
enum MutatePredicate {
    /// Apply the mutations only when the filter matches the row; the write is
    /// reported successful on a match.
    Include(v2::RowFilter),
    /// Apply the mutations only when the filter does NOT match the row; the
    /// write is reported successful on a miss.
    Exclude(v2::RowFilter),
}
369
370fn metadata_filter() -> v2::RowFilter {
375 v2::RowFilter {
376 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
377 FILTER_META.to_owned(),
378 )),
379 }
380}
381
382fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
383 v2::Mutation {
384 mutation: Some(mutation),
385 }
386}
387
388fn delete_row_mutation() -> v2::Mutation {
390 mutation(mutation::Mutation::DeleteFromRow(
391 mutation::DeleteFromRow {},
392 ))
393}
394
395fn object_mutations(
401 metadata: &Metadata,
402 payload: Vec<u8>,
403 now: SystemTime,
404) -> Result<[v2::Mutation; 3]> {
405 let (family, timestamp_micros) = match metadata.expiration_policy {
406 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
407 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
408 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
409 };
410
411 let metadata_bytes = serde_json::to_vec(metadata)
412 .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
413
414 Ok([
415 delete_row_mutation(),
417 mutation(mutation::Mutation::SetCell(mutation::SetCell {
418 family_name: family.to_owned(),
419 column_qualifier: COLUMN_PAYLOAD.to_owned(),
420 timestamp_micros,
421 value: payload,
422 })),
423 mutation(mutation::Mutation::SetCell(mutation::SetCell {
424 family_name: family.to_owned(),
425 column_qualifier: COLUMN_METADATA.to_owned(),
426 timestamp_micros,
427 value: metadata_bytes,
428 })),
429 ])
430}
431
/// Metadata stored in the tombstone column (`t`) of a tombstone row.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TombstoneMeta {
    // Omitted from the serialized form when `Manual` (the default), keeping
    // the cell minimal.
    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
    expiration_policy: ExpirationPolicy,
}
444
445fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
451 let (family, timestamp_micros) = match tombstone.expiration_policy {
452 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
453 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
454 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
455 };
456
457 let tombstone_meta = TombstoneMeta {
458 expiration_policy: tombstone.expiration_policy,
459 };
460
461 Ok([
462 delete_row_mutation(),
463 mutation(mutation::Mutation::SetCell(mutation::SetCell {
464 family_name: family.to_owned(),
465 column_qualifier: COLUMN_REDIRECT.to_owned(),
466 timestamp_micros,
467 value: tombstone.target.as_storage_path().to_string().into_bytes(),
468 })),
469 mutation(mutation::Mutation::SetCell(mutation::SetCell {
470 family_name: family.to_owned(),
471 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
472 timestamp_micros,
473 value: serde_json::to_vec(&tombstone_meta)
474 .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
475 })),
476 ])
477}
478
/// Deserialization target for the legacy tombstone encoding, where the
/// tombstone marker lived inside the metadata column (`m`).
#[derive(Debug, Deserialize)]
struct LegacyTombstoneMeta {
    // `true` marks the row as a legacy redirect tombstone; defaults to
    // `false` so regular metadata also deserializes into this type.
    #[serde(default)]
    is_redirect_tombstone: bool,

    // Expiration carried alongside the legacy marker, defaulting to manual.
    #[serde(default)]
    expiration_policy: ExpirationPolicy,
}
497
/// Decoded contents of a single Bigtable row.
enum RowData {
    /// A regular object row with its metadata and raw payload bytes.
    Object {
        metadata: Metadata,
        payload: Vec<u8>,
    },
    /// A redirect tombstone row.
    Tombstone {
        // Raw storage path of the redirect target; may be empty (see
        // `parse_redirect_target`).
        target: Vec<u8>,
        meta: TombstoneMeta,
        // Expiry derived from the cell timestamp, if representable.
        time_expires: Option<SystemTime>,
    },
}
512
impl RowData {
    /// Decodes the cells of one row into either an object or a tombstone.
    ///
    /// A row is a tombstone when it carries a redirect column, or when its
    /// metadata column contains the legacy `is_redirect_tombstone` marker.
    fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
        let mut metadata_opt: Option<Metadata> = None;
        let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
        let mut redirect_detected = false;
        let mut redirect_target = Vec::new();
        let mut expire_at = None;
        let mut payload = Vec::new();

        for cell in cells {
            // All cells of a row are written with the same timestamp (see
            // `object_mutations` / `tombstone_mutations`), so last-write-wins
            // is fine here.
            expire_at = micros_to_time(cell.timestamp_micros);

            match cell.qualifier.as_slice() {
                COLUMN_REDIRECT => {
                    redirect_detected = true;
                    redirect_target = cell.value;
                }
                COLUMN_PAYLOAD => {
                    payload = cell.value;
                }
                COLUMN_TOMBSTONE_META => {
                    tombstone_meta_opt =
                        Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                            Error::serde("failed to deserialize tombstone meta", cause)
                        })?);
                }
                COLUMN_METADATA => {
                    // Legacy encoding: the tombstone marker lives inside the
                    // metadata column. Check for it before treating the cell
                    // as regular object metadata.
                    if let Ok(legacy_meta) =
                        serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
                        && legacy_meta.is_redirect_tombstone
                    {
                        redirect_detected = true;
                        objectstore_metrics::count!("bigtable.legacy_tombstone_read");
                        tombstone_meta_opt = Some(TombstoneMeta {
                            expiration_policy: legacy_meta.expiration_policy,
                        });
                    } else {
                        metadata_opt =
                            Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                                Error::serde("failed to deserialize metadata", cause)
                            })?);
                    }
                }
                // Unknown qualifiers are ignored.
                _ => {}
            }
        }

        Ok(if redirect_detected {
            RowData::Tombstone {
                target: redirect_target,
                meta: tombstone_meta_opt.unwrap_or_default(),
                time_expires: expire_at,
            }
        } else {
            // Missing metadata falls back to defaults; the expiry always
            // comes from the cell timestamp, not the serialized metadata.
            let mut metadata = metadata_opt.unwrap_or_default();
            metadata.time_expires = expire_at;
            RowData::Object { metadata, payload }
        })
    }

    /// Expiration policy of the row, regardless of its kind.
    fn expiration_policy(&self) -> ExpirationPolicy {
        match self {
            RowData::Object { metadata, .. } => metadata.expiration_policy,
            RowData::Tombstone { meta, .. } => meta.expiration_policy,
        }
    }

    /// Absolute expiry time of the row, if any.
    fn time_expires(&self) -> Option<SystemTime> {
        match self {
            RowData::Object { metadata, .. } => metadata.time_expires,
            RowData::Tombstone { time_expires, .. } => *time_expires,
        }
    }

    /// Whether the row expires before `time` — only ever true for rows with a
    /// timeout-based (TTL/TTI) expiration policy.
    fn expires_before(&self, time: SystemTime) -> bool {
        self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
    }

    /// Whether a time-to-idle row's deadline lags a freshly-bumped deadline
    /// by more than `TTI_DEBOUNCE`, i.e. re-writing the row is worthwhile.
    fn needs_tti_bump(&self) -> bool {
        matches!(
            self.expiration_policy(),
            ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
        )
    }
}
612
613fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
619 if redirect_path.is_empty() {
620 objectstore_metrics::count!("bigtable.empty_redirect_read");
621 Ok(tombstone_id.clone())
622 } else {
623 let redirect_str = std::str::from_utf8(redirect_path)
624 .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
625 ObjectId::from_storage_path(redirect_str)
626 .ok_or_else(|| Error::generic("corrupt redirect path"))
627 }
628}
629
impl BigTableBackend {
    /// Connects to Bigtable and pre-computes the instance/table paths.
    ///
    /// When `config.endpoint` is set, an emulator connection is used;
    /// otherwise a managed transport with prefetched GCP auth tokens.
    pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
        let BigTableConfig {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } = config;

        let bigtable = if let Some(ref endpoint) = endpoint {
            BigTableConnection::new_with_emulator(
                endpoint,
                &project_id,
                &instance_name,
                // NOTE(review): positional bool/timeout args per the
                // bigtable_rs API — confirm the flag's meaning against the
                // crate docs.
                false, Some(CONNECT_TIMEOUT),
            )?
        } else {
            // Token provider prefetches OAuth tokens for the Bigtable data
            // scope.
            let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
            BigTableConnection::new_with_managed_transport(
                &project_id,
                &instance_name,
                false, Some(CONNECT_TIMEOUT),
                Arc::new(token_provider),
                connections.unwrap_or(1),
                // NOTE(review): positional args per the bigtable_rs API —
                // confirm meanings against the crate docs.
                true, None, MAX_CHANNEL_AGE,
            )
            .await?
        };

        let client = bigtable.client();

        Ok(Self {
            bigtable,
            instance_path: format!("projects/{project_id}/instances/{instance_name}"),
            table_path: client.get_full_table_name(&table_name),
            table_name,
        })
    }

    /// Reads a single row by key, optionally narrowed by `filter`.
    ///
    /// Returns `None` when the row does not exist, or when its expiration
    /// timestamp is already in the past (the row is treated as absent even if
    /// it is still physically present).
    async fn read_row(
        &self,
        path: &[u8],
        filter: Option<v2::RowFilter>,
        action: &'static str,
    ) -> Result<Option<RowData>> {
        let request = v2::ReadRowsRequest {
            table_name: self.table_path.clone(),
            rows: Some(v2::RowSet {
                row_keys: vec![path.to_owned()],
                row_ranges: vec![],
            }),
            filter,
            rows_limit: 1,
            ..Default::default()
        };

        let response = retry(action, || async {
            self.bigtable.client().read_rows(request.clone()).await
        })
        .await?;
        debug_assert!(response.len() <= 1, "Expected at most one row");

        let Some((_, cells)) = response.into_iter().next() else {
            objectstore_log::debug!("Object not found");
            return Ok(None);
        };

        // Hide rows that are past their expiry.
        let row = RowData::from_cells(cells)?;
        Ok(if row.expires_before(SystemTime::now()) {
            None
        } else {
            Some(row)
        })
    }

    /// Applies `mutations` to a single row, retrying transient failures.
    async fn mutate(
        &self,
        path: Vec<u8>,
        mutations: impl Into<Vec<v2::Mutation>>,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let request = v2::MutateRowRequest {
            table_name: self.table_path.clone(),
            row_key: path,
            mutations: mutations.into(),
            ..Default::default()
        };

        let response = retry(action, || async {
            self.bigtable.client().mutate_row(request.clone()).await
        })
        .await?;

        Ok(response.into_inner())
    }

    /// Writes a full object row (delete + payload/metadata cells) at "now".
    async fn put_row(
        &self,
        path: Vec<u8>,
        metadata: &Metadata,
        payload: Vec<u8>,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let mutations = object_mutations(metadata, payload, SystemTime::now())?;
        self.mutate(path, mutations, action).await
    }

    /// Writes a full tombstone row (delete + redirect/meta cells) at "now".
    async fn put_tombstone_row(
        &self,
        path: Vec<u8>,
        tombstone: &Tombstone,
        action: &'static str,
    ) -> Result<v2::MutateRowResponse> {
        let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
        self.mutate(path, mutations, action).await
    }

    /// Re-writes a row to push its time-to-idle deadline forward.
    ///
    /// Best-effort: write errors are deliberately ignored (`let _`). When the
    /// payload was not loaded by the caller (`loaded == false`), it is first
    /// re-read so the full row can be written back.
    async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
        let expiration_policy = row.expiration_policy();

        match row {
            RowData::Tombstone { target, .. } => {
                let target = match parse_redirect_target(target, hv_id) {
                    Ok(target) => target,
                    Err(e) => {
                        objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
                        return;
                    }
                };

                let tombstone = Tombstone {
                    target,
                    expiration_policy,
                };
                let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
            }
            // Payload already in memory: write the row back directly.
            RowData::Object { metadata, payload } if loaded => {
                let _ = self
                    .put_row(path, metadata, payload.clone(), "tti-bump")
                    .await;
            }
            // Payload not loaded: fetch it first, then re-write the row.
            RowData::Object { metadata, .. } => {
                let payload_read = self
                    .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
                    .await;

                if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
                    let _ = self.put_row(path, metadata, payload, "tti-bump").await;
                }
            }
        }
    }

    /// Performs a conditional single-row write.
    ///
    /// For `Include`, the mutations run when the predicate matches and the
    /// write counts as successful on a match. For `Exclude`, the mutations
    /// run when the predicate does NOT match and the write counts as
    /// successful on a miss.
    async fn check_and_mutate(
        &self,
        row_key: Vec<u8>,
        predicate: MutatePredicate,
        mutations: impl Into<Vec<v2::Mutation>>,
        context: &'static str,
    ) -> Result<bool> {
        let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
            MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
            MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
        };

        let request = v2::CheckAndMutateRowRequest {
            table_name: self.table_path.clone(),
            row_key,
            predicate_filter: Some(filter),
            true_mutations,
            false_mutations,
            ..Default::default()
        };

        let future = retry(context, || async {
            self.bigtable
                .client()
                .check_and_mutate_row(request.clone())
                .await
        });

        Ok(future.await?.predicate_matched == success_on_match)
    }
}
830
831#[async_trait::async_trait]
832impl Backend for BigTableBackend {
833 fn name(&self) -> &'static str {
834 "bigtable"
835 }
836
837 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
838 async fn put_object(
839 &self,
840 id: &ObjectId,
841 metadata: &Metadata,
842 mut stream: ClientStream,
843 ) -> Result<PutResponse> {
844 objectstore_log::debug!("Writing to Bigtable backend");
845 let path = id.as_storage_path().to_string().into_bytes();
846
847 let mut payload = ChunkedBytes::new(0);
848 while let Some(chunk) = stream.try_next().await? {
849 payload.push(chunk);
850 }
851
852 self.put_row(path, metadata, payload.into_bytes().into(), "put")
853 .await?;
854 Ok(())
855 }
856
857 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
858 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
859 match self.get_tiered_object(id).await? {
860 TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))),
861 TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
862 TieredGet::NotFound => Ok(None),
863 }
864 }
865
866 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
867 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
868 match self.get_tiered_metadata(id).await? {
869 TieredMetadata::Object(metadata) => Ok(Some(metadata)),
870 TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
871 TieredMetadata::NotFound => Ok(None),
872 }
873 }
874
875 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
876 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
877 objectstore_log::debug!("Deleting from Bigtable backend");
878
879 let path = id.as_storage_path().to_string().into_bytes();
880 self.mutate(path, [delete_row_mutation()], "delete").await?;
881
882 Ok(())
883 }
884}
885
#[async_trait::async_trait]
impl HighVolumeBackend for BigTableBackend {
    /// Writes the object unless the row currently holds a tombstone.
    ///
    /// On predicate failure, the row is re-read: a tombstone is returned to
    /// the caller; anything else (concurrent object write, vanished row)
    /// retries the CAS loop up to `CAS_RETRY_COUNT` times.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_non_tombstone(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        payload: Bytes,
    ) -> Result<Option<Tombstone>> {
        objectstore_log::debug!("Conditional put to Bigtable backend");

        let path = id.as_storage_path().to_string().into_bytes();
        let mutations = object_mutations(metadata, payload.to_vec(), SystemTime::now())?;

        for _ in 0..CAS_RETRY_COUNT {
            let write_succeeded = self
                .check_and_mutate(
                    path.clone(),
                    tombstone_predicate(),
                    mutations.clone(),
                    "put_non_tombstone",
                )
                .await?;

            if write_succeeded {
                return Ok(None);
            }

            // The predicate matched a tombstone — find out what's there now.
            let row = self
                .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
                .await?;

            match row {
                Some(RowData::Tombstone { target, meta, .. }) => {
                    return Ok(Some(Tombstone {
                        target: parse_redirect_target(&target, id)?,
                        expiration_policy: meta.expiration_policy,
                    }));
                }
                // Row changed concurrently — retry the conditional write.
                Some(RowData::Object { .. }) => continue,
                None => continue,
            }
        }

        Err(Error::generic("BigTable: race loop in put_non_tombstone"))
    }

    /// Reads the full row, bumping its TTI deadline when due.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
        objectstore_log::debug!("Reading from Bigtable backend");
        let path = id.as_storage_path().to_string().into_bytes();

        let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
            return Ok(TieredGet::NotFound);
        };

        // `loaded == true`: the payload is already in memory for the bump.
        if row.needs_tti_bump() {
            self.bump_tti(path.clone(), &row, true, id).await;
        }

        Ok(match row {
            RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
                target: parse_redirect_target(&target, id)?,
                expiration_policy: meta.expiration_policy,
            }),
            RowData::Object { metadata, payload } => {
                // Size is derived from the actual payload, not stored state.
                let mut metadata = metadata;
                metadata.size = Some(payload.len());
                TieredGet::Object(metadata, crate::stream::single(payload))
            }
        })
    }

    /// Reads only the metadata columns, bumping the TTI deadline when due.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
        objectstore_log::debug!("Reading metadata from Bigtable backend");
        let path = id.as_storage_path().to_string().into_bytes();

        let row_opt = self
            .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
            .await?;
        let Some(row) = row_opt else {
            return Ok(TieredMetadata::NotFound);
        };

        // `loaded == false`: bump_tti must re-read the payload itself.
        if row.needs_tti_bump() {
            self.bump_tti(path.clone(), &row, false, id).await;
        }

        Ok(match row {
            RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
                target: parse_redirect_target(&target, id)?,
                expiration_policy: meta.expiration_policy,
            }),
            RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
        })
    }

    /// Deletes the row unless it currently holds a tombstone.
    ///
    /// Mirrors `put_non_tombstone`, except a vanished row counts as success
    /// (the delete's goal is already achieved) instead of retrying.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
        objectstore_log::debug!("Conditional delete from Bigtable backend");

        let path = id.as_storage_path().to_string().into_bytes();

        for _ in 0..CAS_RETRY_COUNT {
            let write_succeeded = self
                .check_and_mutate(
                    path.clone(),
                    tombstone_predicate(),
                    [delete_row_mutation()],
                    "delete_non_tombstone",
                )
                .await?;

            if write_succeeded {
                return Ok(None);
            }

            let row = self
                .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
                .await?;

            match row {
                Some(RowData::Tombstone { target, meta, .. }) => {
                    return Ok(Some(Tombstone {
                        target: parse_redirect_target(&target, id)?,
                        expiration_policy: meta.expiration_policy,
                    }));
                }
                // Row changed concurrently — retry the conditional delete.
                Some(RowData::Object { .. }) => continue,
                // Row is gone — nothing left to delete.
                None => return Ok(None),
            }
        }

        Err(Error::generic(
            "BigTable: race loop in delete_non_tombstone",
        ))
    }

    /// Conditionally writes `write` depending on the row's current redirect
    /// target, choosing the predicate from the (current, new-target) pair.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn compare_and_write(
        &self,
        id: &ObjectId,
        current: Option<&ObjectId>,
        write: TieredWrite,
    ) -> Result<bool> {
        objectstore_log::debug!("CAS put to Bigtable backend");

        let path = id.as_storage_path().to_string().into_bytes();
        let now = SystemTime::now();

        let predicate = match (current, write.target()) {
            (Some(old), Some(new)) => update_predicate(old, new, id),
            (Some(target), None) => optional_target_predicate(target, id),
            (None, Some(target)) => optional_target_predicate(target, id),
            (None, None) => tombstone_predicate(),
        };

        let mutations = match write {
            TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
            TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
            TieredWrite::Delete => vec![delete_row_mutation()],
        };

        self.check_and_mutate(path, predicate, mutations, "compare_and_write")
            .await
    }
}
1062
1063fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1069 let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1070 context: format!(
1071 "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1072 humantime::format_rfc3339_seconds(from),
1073 ttl.as_secs()
1074 ),
1075 cause: None,
1076 })?;
1077 let millis = deadline
1078 .duration_since(SystemTime::UNIX_EPOCH)
1079 .map_err(|e| Error::Generic {
1080 context: format!(
1081 "unable to get duration since UNIX_EPOCH for SystemTime {}",
1082 humantime::format_rfc3339_seconds(deadline)
1083 ),
1084 cause: Some(Box::new(e)),
1085 })?
1086 .as_millis();
1087 (millis * 1000).try_into().map_err(|e| Error::Generic {
1088 context: format!("failed to convert {}ms to i64 microseconds", millis),
1089 cause: Some(Box::new(e)),
1090 })
1091}
1092
/// Converts a Bigtable cell timestamp (microseconds since the epoch) into a
/// `SystemTime`; negative values (e.g. the manual-family sentinel `-1`) and
/// unrepresentable times yield `None`.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    u64::try_from(micros)
        .ok()
        .and_then(|us| SystemTime::UNIX_EPOCH.checked_add(Duration::from_micros(us)))
}
1099
/// Runs `f`, retrying up to `REQUEST_RETRY_COUNT` additional times on
/// retryable errors (see `is_retryable`).
///
/// Terminal failures are counted in metrics and wrapped in `Error::Generic`
/// with `context` naming the failed action; each retry is also counted and
/// logged.
async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
where
    F: Future<Output = Result<T, BigTableError>> + Send,
{
    let mut retry_count = 0usize;

    loop {
        match f().await {
            Ok(res) => return Ok(res),
            // Give up: either retries are exhausted or the error class is
            // not transient.
            Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
                objectstore_metrics::count!("bigtable.failures", action = context);
                return Err(Error::Generic {
                    context: format!("Bigtable: `{context}` failed"),
                    cause: Some(Box::new(e)),
                });
            }
            Err(e) => {
                retry_count += 1;
                objectstore_metrics::count!("bigtable.retries", action = context);
                objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
            }
        }
    }
}
1125
1126fn is_retryable(error: &BigTableError) -> bool {
1127 match error {
1128 BigTableError::GCPAuthError(_) => true,
1130 BigTableError::TransportError(_) => true,
1132 BigTableError::IoError(_) => true,
1134 BigTableError::TimeoutError(_) => true,
1135
1136 BigTableError::RpcError(status) => match status.code() {
1138 Code::Unavailable => true,
1140 Code::Cancelled => true,
1142 Code::DeadlineExceeded => true,
1143 Code::Unauthenticated => true,
1145 Code::Aborted => true,
1147 Code::Internal => true,
1148 Code::FailedPrecondition => true,
1149 Code::Unknown => true,
1150 _ => false,
1151 },
1152 _ => false,
1153 }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158 use std::collections::BTreeMap;
1159
1160 use anyhow::Result;
1161 use objectstore_types::scope::{Scope, Scopes};
1162
1163 use super::*;
1164 use crate::id::ObjectContext;
1165 use crate::stream;
1166
1167 async fn create_test_backend() -> Result<BigTableBackend> {
1173 BigTableBackend::new(BigTableConfig {
1174 endpoint: Some("localhost:8086".into()),
1175 project_id: "testing".into(),
1176 instance_name: "objectstore".into(),
1177 table_name: "objectstore".into(),
1178 connections: None,
1179 })
1180 .await
1181 }
1182
1183 fn make_id() -> ObjectId {
1184 ObjectId::random(ObjectContext {
1185 usecase: "testing".into(),
1186 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1187 })
1188 }
1189
1190 async fn create_object(
1191 backend: &BigTableBackend,
1192 id: &ObjectId,
1193 metadata: &Metadata,
1194 payload: &[u8],
1195 now: SystemTime,
1196 ) -> Result<()> {
1197 let path = id.as_storage_path().to_string().into_bytes();
1198 let mutations = object_mutations(metadata, payload.to_vec(), now)?;
1199 backend.mutate(path, mutations, "test-setup").await?;
1200 Ok(())
1201 }
1202
1203 async fn create_tombstone(
1204 backend: &BigTableBackend,
1205 id: &ObjectId,
1206 tombstone: &Tombstone,
1207 now: SystemTime,
1208 ) -> Result<()> {
1209 let path = id.as_storage_path().to_string().into_bytes();
1210 let mutations = tombstone_mutations(tombstone, now)?;
1211 backend.mutate(path, mutations, "test-setup").await?;
1212 Ok(())
1213 }
1214
1215 async fn write_legacy_tombstone(
1217 backend: &BigTableBackend,
1218 id: &ObjectId,
1219 expiration_policy: ExpirationPolicy,
1220 time_expires: Option<SystemTime>,
1221 ) -> Result<()> {
1222 let meta = if expiration_policy.is_manual() {
1223 r#"{"is_redirect_tombstone":true}"#.to_owned()
1224 } else {
1225 let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1226 format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1227 };
1228
1229 let (family, timestamp_micros) = if expiration_policy.is_manual() {
1230 (FAMILY_MANUAL, -1)
1231 } else {
1232 let t =
1233 time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1234 let timestamp = t
1235 .duration_since(SystemTime::UNIX_EPOCH)
1236 .unwrap()
1237 .as_millis();
1238 (FAMILY_GC, timestamp as i64 * 1000)
1239 };
1240
1241 let path = id.as_storage_path().to_string().into_bytes();
1242 let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1243 family_name: family.to_owned(),
1244 column_qualifier: COLUMN_METADATA.to_owned(),
1245 timestamp_micros,
1246 value: meta.into_bytes(),
1247 }))];
1248
1249 backend.mutate(path, mutations, "test-setup").await?;
1250
1251 Ok(())
1252 }
1253
1254 async fn write_empty_redirect_tombstone(
1257 backend: &BigTableBackend,
1258 id: &ObjectId,
1259 ) -> Result<()> {
1260 let path = id.as_storage_path().to_string().into_bytes();
1261 let mutations = [
1262 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1263 family_name: FAMILY_MANUAL.to_owned(),
1264 column_qualifier: COLUMN_REDIRECT.to_owned(),
1265 timestamp_micros: -1,
1266 value: b"".to_vec(), })),
1268 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1269 family_name: FAMILY_MANUAL.to_owned(),
1270 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1271 timestamp_micros: -1,
1272 value: b"{}".to_vec(),
1273 })),
1274 ];
1275
1276 backend.mutate(path, mutations, "test-setup").await?;
1277
1278 Ok(())
1279 }
1280
1281 #[tokio::test]
1285 async fn test_roundtrip() -> Result<()> {
1286 let backend = create_test_backend().await?;
1287
1288 let id = make_id();
1289 let metadata = Metadata {
1290 content_type: "text/plain".into(),
1291 time_created: Some(SystemTime::now()),
1292 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1293 ..Default::default()
1294 };
1295
1296 backend
1297 .put_object(&id, &metadata, stream::single("hello, world"))
1298 .await?;
1299
1300 let (obj_meta, stream) = backend.get_object(&id).await?.unwrap();
1301 let payload = stream::read_to_vec(stream).await?;
1302 assert_eq!(payload, b"hello, world");
1303 assert_eq!(obj_meta.content_type, metadata.content_type);
1304 assert_eq!(obj_meta.custom, metadata.custom);
1305
1306 let head_meta = backend.get_metadata(&id).await?.unwrap();
1307 assert_eq!(head_meta.content_type, metadata.content_type);
1308 assert_eq!(head_meta.custom, metadata.custom);
1309
1310 Ok(())
1311 }
1312
1313 #[tokio::test]
1315 async fn test_nonexistent() -> Result<()> {
1316 let backend = create_test_backend().await?;
1317
1318 let id = make_id();
1319 assert!(backend.get_object(&id).await?.is_none());
1320 assert!(backend.get_metadata(&id).await?.is_none());
1321 backend.delete_object(&id).await?;
1322
1323 Ok(())
1324 }
1325
1326 #[tokio::test]
1327 async fn test_overwrite() -> Result<()> {
1328 let backend = create_test_backend().await?;
1329
1330 let id = make_id();
1331 let first_metadata = Metadata {
1332 custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1333 ..Default::default()
1334 };
1335 create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1336
1337 let second_metadata = Metadata {
1338 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1339 ..Default::default()
1340 };
1341 backend
1342 .put_object(&id, &second_metadata, stream::single("world"))
1343 .await?;
1344
1345 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1346 let payload = stream::read_to_vec(stream).await?;
1347 assert_eq!(payload, b"world");
1348 assert_eq!(meta.custom, second_metadata.custom);
1349
1350 Ok(())
1351 }
1352
1353 #[tokio::test]
1354 async fn test_read_after_delete() -> Result<()> {
1355 let backend = create_test_backend().await?;
1356
1357 let id = make_id();
1358 let metadata = Metadata::default();
1359 create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1360 backend.delete_object(&id).await?;
1361
1362 assert!(backend.get_object(&id).await?.is_none());
1363
1364 Ok(())
1365 }
1366
1367 #[tokio::test]
1373 async fn test_tti_bump() -> Result<()> {
1374 let backend = create_test_backend().await?;
1375 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1378 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1379 ..Default::default()
1380 };
1381
1382 let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_secs(60);
1385
1386 let id1 = make_id();
1388 create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1389
1390 let (pre_obj_meta, _) = backend.get_object(&id1).await?.unwrap();
1392 let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1393
1394 let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1396 let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1397 assert!(
1398 post_obj_expiry > pre_obj_expiry,
1399 "bump should extend expiry"
1400 );
1401
1402 let id2 = make_id();
1404 create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1405
1406 let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1408 let pre_expiry = pre_meta.time_expires.unwrap();
1409
1410 let post_meta = backend.get_metadata(&id2).await?.unwrap();
1412 let post_expiry = post_meta.time_expires.unwrap();
1413 assert!(post_expiry > pre_expiry, "bump should extend expiry");
1414
1415 let (_, stream) = backend.get_object(&id2).await?.unwrap();
1417 let payload = stream::read_to_vec(stream).await?;
1418 assert_eq!(payload, b"hello, world");
1419
1420 Ok(())
1421 }
1422
1423 #[tokio::test]
1424 async fn test_tti_no_bump_when_fresh() -> Result<()> {
1425 let backend = create_test_backend().await?;
1426
1427 let id = make_id();
1428 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1431 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1432 ..Default::default()
1433 };
1434 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1435
1436 let first = backend.get_metadata(&id).await?.unwrap();
1439 let second = backend.get_metadata(&id).await?.unwrap();
1440
1441 assert_eq!(
1442 first.time_expires.unwrap(),
1443 second.time_expires.unwrap(),
1444 "fresh TTI object must not be bumped"
1445 );
1446
1447 Ok(())
1448 }
1449
1450 #[tokio::test]
1453 async fn test_ttl_immediate() -> Result<()> {
1454 let backend = create_test_backend().await?;
1458
1459 let id = make_id();
1460 let metadata = Metadata {
1461 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1462 ..Default::default()
1463 };
1464 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1465
1466 assert!(backend.get_object(&id).await?.is_none());
1467
1468 Ok(())
1469 }
1470
1471 #[tokio::test]
1472 async fn test_tti_immediate() -> Result<()> {
1473 let backend = create_test_backend().await?;
1477
1478 let id = make_id();
1479 let metadata = Metadata {
1480 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1481 ..Default::default()
1482 };
1483 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1484
1485 assert!(backend.get_object(&id).await?.is_none());
1486
1487 Ok(())
1488 }
1489
1490 #[tokio::test]
1499 async fn test_tiered_get() -> Result<()> {
1500 let backend = create_test_backend().await?;
1501
1502 let id = make_id();
1504 assert!(matches!(
1505 backend.get_tiered_object(&id).await?,
1506 TieredGet::NotFound
1507 ));
1508 assert!(matches!(
1509 backend.get_tiered_metadata(&id).await?,
1510 TieredMetadata::NotFound
1511 ));
1512
1513 let id = make_id();
1515 let put_meta = Metadata {
1516 content_type: "text/plain".into(),
1517 custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1518 ..Default::default()
1519 };
1520 create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1521
1522 let TieredGet::Object(obj_meta, obj_stream) = backend.get_tiered_object(&id).await? else {
1523 panic!("expected TieredGet::Object");
1524 };
1525 let obj_payload = stream::read_to_vec(obj_stream).await?;
1526 assert_eq!(obj_payload, b"payload");
1527 assert_eq!(obj_meta.content_type, put_meta.content_type);
1528 assert_eq!(obj_meta.custom, put_meta.custom);
1529
1530 let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1531 panic!("expected TieredMetadata::Object");
1532 };
1533 assert_eq!(head_meta.content_type, put_meta.content_type);
1534 assert_eq!(head_meta.custom, put_meta.custom);
1535
1536 let hv_id = make_id();
1538 let lt_id = ObjectId::random(hv_id.context().clone());
1539 let tombstone = Tombstone {
1540 target: lt_id.clone(),
1541 expiration_policy: ExpirationPolicy::Manual,
1542 };
1543 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1544
1545 match backend.get_tiered_object(&hv_id).await? {
1546 TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1547 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1548 }
1549 match backend.get_tiered_metadata(&hv_id).await? {
1550 TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1551 other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1552 }
1553
1554 Ok(())
1555 }
1556
1557 #[tokio::test]
1563 async fn test_put_non_tombstone() -> Result<()> {
1564 let backend = create_test_backend().await?;
1565
1566 let id = make_id();
1568 let metadata = Metadata::default();
1569 let result = backend
1570 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1571 .await?;
1572 assert_eq!(result, None, "expected None on empty row");
1573 let (_, stream) = backend.get_object(&id).await?.unwrap();
1574 assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1575
1576 let id = make_id();
1578 create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1579 let result = backend
1580 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1581 .await?;
1582 assert_eq!(result, None, "expected None when overwriting object");
1583 let (_, stream) = backend.get_object(&id).await?.unwrap();
1584 assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1585
1586 let hv_id = make_id();
1588 let lt_id = ObjectId::random(hv_id.context().clone());
1589 let tombstone = Tombstone {
1590 target: lt_id.clone(),
1591 expiration_policy: ExpirationPolicy::Manual,
1592 };
1593 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1594 let result = backend
1595 .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1596 .await?;
1597 let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1598 assert_eq!(returned.target, lt_id);
1599 assert!(
1600 matches!(
1601 backend.get_tiered_metadata(&hv_id).await?,
1602 TieredMetadata::Tombstone(_)
1603 ),
1604 "tombstone must still exist after put_non_tombstone"
1605 );
1606
1607 Ok(())
1608 }
1609
1610 #[tokio::test]
1619 async fn test_delete_non_tombstone() -> Result<()> {
1620 let backend = create_test_backend().await?;
1621
1622 let id = make_id();
1624 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1625
1626 let id = make_id();
1628 let metadata = Metadata::default();
1629 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1630 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1631 assert!(backend.get_object(&id).await?.is_none());
1632
1633 let id = make_id();
1635 let tombstone = Tombstone {
1636 target: id.clone(),
1637 expiration_policy: ExpirationPolicy::Manual,
1638 };
1639 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1640 let tombstone = backend
1641 .delete_non_tombstone(&id)
1642 .await?
1643 .expect("expected Some(tombstone)");
1644 assert_eq!(tombstone.target, id, "tombstone target must be returned");
1645 assert!(
1646 matches!(
1647 backend.get_tiered_metadata(&id).await?,
1648 TieredMetadata::Tombstone(_)
1649 ),
1650 "tombstone must still exist after delete_non_tombstone"
1651 );
1652
1653 Ok(())
1654 }
1655
1656 #[tokio::test]
1662 async fn test_cas_create_tombstone() -> Result<()> {
1663 let backend = create_test_backend().await?;
1664
1665 let hv_id = make_id();
1666 let lt_id = ObjectId::random(hv_id.context().clone());
1667 let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600));
1668 let tombstone = Tombstone {
1669 target: lt_id.clone(),
1670 expiration_policy,
1671 };
1672
1673 let committed = backend
1675 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1676 .await?;
1677 assert!(committed, "expected CAS success on empty row");
1678
1679 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1681 panic!("expected TieredMetadata::Tombstone");
1682 };
1683 assert_eq!(t.target, lt_id, "target must round-trip via r column");
1684 assert_eq!(t.expiration_policy, expiration_policy);
1685 match backend.get_tiered_object(&hv_id).await? {
1686 TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1687 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1688 }
1689
1690 assert!(matches!(
1692 backend.get_object(&hv_id).await,
1693 Err(Error::UnexpectedTombstone)
1694 ));
1695 assert!(matches!(
1696 backend.get_metadata(&hv_id).await,
1697 Err(Error::UnexpectedTombstone)
1698 ));
1699
1700 let second = backend
1702 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1703 .await?;
1704 assert!(second, "idempotent retry");
1705
1706 Ok(())
1707 }
1708
1709 #[tokio::test]
1711 async fn test_cas_swap_tombstone() -> Result<()> {
1712 let backend = create_test_backend().await?;
1713
1714 let hv_id = make_id();
1715 let old_lt_id = ObjectId::random(hv_id.context().clone());
1716 let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1717 let new_lt_id = ObjectId::random(hv_id.context().clone());
1718
1719 let tombstone = Tombstone {
1720 target: old_lt_id.clone(),
1721 expiration_policy: ExpirationPolicy::Manual,
1722 };
1723 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1724
1725 let write = TieredWrite::Tombstone(Tombstone {
1727 target: new_lt_id.clone(),
1728 expiration_policy: ExpirationPolicy::Manual,
1729 });
1730 let swapped = backend
1731 .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1732 .await?;
1733 assert!(!swapped, "expected CAS failure due to wrong target");
1734 match backend.get_tiered_metadata(&hv_id).await? {
1735 TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1736 other => panic!("expected tombstone, got {other:?}"),
1737 }
1738
1739 let swapped = backend
1741 .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1742 .await?;
1743 assert!(swapped, "expected CAS success with correct target");
1744 match backend.get_tiered_metadata(&hv_id).await? {
1745 TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1746 other => panic!("expected tombstone, got {other:?}"),
1747 }
1748
1749 let retry = backend
1751 .compare_and_write(&hv_id, Some(&old_lt_id), write)
1752 .await?;
1753 assert!(retry, "idempotent retry");
1754
1755 Ok(())
1756 }
1757
1758 #[tokio::test]
1760 async fn test_cas_swap_inline() -> Result<()> {
1761 let backend = create_test_backend().await?;
1762
1763 let id = make_id();
1764 let lt_id = ObjectId::random(id.context().clone());
1765 let wrong_id = ObjectId::random(id.context().clone());
1766
1767 let tombstone = Tombstone {
1768 target: lt_id.clone(),
1769 expiration_policy: ExpirationPolicy::Manual,
1770 };
1771 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1772
1773 let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1775 let swapped = backend
1776 .compare_and_write(&id, Some(&wrong_id), write)
1777 .await?;
1778 assert!(!swapped, "expected CAS failure with wrong target");
1779 assert!(matches!(
1780 backend.get_tiered_metadata(&id).await?,
1781 TieredMetadata::Tombstone(_)
1782 ));
1783
1784 let payload = Bytes::from_static(b"hello inline");
1786 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1787 let swapped = backend
1788 .compare_and_write(&id, Some(<_id), write.clone())
1789 .await?;
1790 assert!(swapped, "expected CAS success with correct target");
1791 let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1792 panic!("expected inline object after swap");
1793 };
1794 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1795
1796 let retry = backend.compare_and_write(&id, Some(<_id), write).await?;
1798 assert!(retry, "idempotent retry");
1799
1800 Ok(())
1801 }
1802
1803 #[tokio::test]
1805 async fn test_cas_create_object_on_empty_row() -> Result<()> {
1806 let backend = create_test_backend().await?;
1807
1808 let id = make_id();
1809 let payload = Bytes::from_static(b"cas object");
1810 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1811 let committed = backend.compare_and_write(&id, None, write).await?;
1812 assert!(committed, "expected CAS success on empty row");
1813
1814 let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1815 panic!("expected Object after CAS-create");
1816 };
1817 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1818
1819 Ok(())
1820 }
1821
1822 #[tokio::test]
1824 async fn test_cas_delete() -> Result<()> {
1825 let backend = create_test_backend().await?;
1826
1827 let id = make_id();
1828 let lt_id = ObjectId::random(id.context().clone());
1829 let wrong_id = ObjectId::random(id.context().clone());
1830
1831 let tombstone = Tombstone {
1832 target: lt_id.clone(),
1833 expiration_policy: ExpirationPolicy::Manual,
1834 };
1835 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1836
1837 let deleted = backend
1839 .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1840 .await?;
1841 assert!(!deleted, "expected CAS failure with wrong target");
1842 assert!(matches!(
1843 backend.get_tiered_metadata(&id).await?,
1844 TieredMetadata::Tombstone(_)
1845 ));
1846
1847 let deleted = backend
1849 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1850 .await?;
1851 assert!(deleted, "expected CAS delete success");
1852 assert!(matches!(
1853 backend.get_tiered_metadata(&id).await?,
1854 TieredMetadata::NotFound
1855 ));
1856
1857 let retry = backend
1859 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1860 .await?;
1861 assert!(retry, "idempotent retry");
1862
1863 let id2 = make_id();
1865 let fake_lt_id = ObjectId::random(id2.context().clone());
1866 let metadata = Metadata::default();
1867 create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1868 let deleted = backend
1869 .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1870 .await?;
1871 assert!(deleted, "expected idempotent deletion");
1872
1873 Ok(())
1874 }
1875
1876 #[tokio::test]
1883 async fn test_legacy_tombstone_reads() -> Result<()> {
1884 let backend = create_test_backend().await?;
1885
1886 let id = make_id();
1888 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1889
1890 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1891 panic!("expected tombstone");
1892 };
1893 assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1894 assert!(matches!(
1895 backend.get_tiered_object(&id).await?,
1896 TieredGet::Tombstone(_)
1897 ));
1898
1899 let id = make_id();
1904 let ttl = Duration::from_secs(2 * 24 * 3600);
1905 write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1906
1907 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1908 panic!("expected TieredMetadata::Tombstone");
1909 };
1910 assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
1911
1912 Ok(())
1913 }
1914
1915 #[tokio::test]
1920 async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
1921 let backend = create_test_backend().await?;
1922 let id = make_id();
1923 let path = id.as_storage_path().to_string().into_bytes();
1924
1925 let tti = Duration::from_secs(2 * 24 * 3600); let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1930 write_legacy_tombstone(
1931 &backend,
1932 &id,
1933 ExpirationPolicy::TimeToIdle(tti),
1934 Some(old_deadline),
1935 )
1936 .await?;
1937
1938 let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
1940 panic!("expected tombstone");
1941 };
1942
1943 let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
1945 Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
1946 _ => panic!("expected tombstone row after bump"),
1947 };
1948
1949 assert!(
1950 new_deadline > old_deadline,
1951 "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
1952 );
1953
1954 Ok(())
1955 }
1956
1957 #[tokio::test]
1962 async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
1963 let backend = create_test_backend().await?;
1964
1965 let id = make_id();
1967 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1968 let t_opt = backend
1969 .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
1970 .await?;
1971 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
1972
1973 let id = make_id();
1975 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1976 let t_opt = backend.delete_non_tombstone(&id).await?;
1977 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
1978
1979 let id = make_id();
1981 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1982 let deleted = backend
1983 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
1984 .await?;
1985 assert!(
1986 deleted,
1987 "CAS-delete must succeed on legacy-metadata tombstone"
1988 );
1989 assert!(matches!(
1990 backend.get_tiered_metadata(&id).await?,
1991 TieredMetadata::NotFound
1992 ));
1993
1994 let id = make_id();
1996 write_empty_redirect_tombstone(&backend, &id).await?;
1997 let deleted = backend
1998 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
1999 .await?;
2000 assert!(
2001 deleted,
2002 "CAS-delete must succeed on empty-redirect tombstone"
2003 );
2004 assert!(matches!(
2005 backend.get_tiered_metadata(&id).await?,
2006 TieredMetadata::NotFound
2007 ));
2008
2009 Ok(())
2010 }
2011
2012 #[tokio::test]
2014 async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2015 let backend = create_test_backend().await?;
2016 let id = make_id();
2017
2018 write_empty_redirect_tombstone(&backend, &id).await?;
2019 match backend.get_tiered_metadata(&id).await? {
2020 TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2021 other => panic!("expected tombstone, got {other:?}"),
2022 }
2023
2024 Ok(())
2025 }
2026}