1use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use serde::{Deserialize, Serialize};
42use tonic::Code;
43
44use crate::backend::common::{
45 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
46 TieredGet, TieredMetadata, TieredWrite, Tombstone,
47};
48use crate::error::{Error, Result};
49use crate::gcp_auth::PrefetchingTokenProvider;
50use crate::id::ObjectId;
51use crate::stream::{ChunkedBytes, ClientStream};
52
/// Configuration for [`BigTableBackend`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BigTableConfig {
    /// Optional endpoint override.
    ///
    /// When set, [`BigTableBackend::new`] connects through
    /// `BigTableConnection::new_with_emulator` and does not request GCP credentials.
    /// When `None`, an authenticated managed transport is created instead.
    pub endpoint: Option<String>,

    /// GCP project ID; passed to the connection and used to build the instance path.
    pub project_id: String,

    /// Bigtable instance name; passed to the connection and used to build the instance path.
    pub instance_name: String,

    /// Name of the table that holds all rows written by this backend.
    pub table_name: String,

    /// Connection count handed to the managed transport; `None` falls back to `1`.
    ///
    /// Not consulted when `endpoint` is set.
    pub connections: Option<usize>,
}
126
/// Timeout handed to the Bigtable connection constructors in [`BigTableBackend::new`].
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum channel age handed to the managed transport.
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_mins(50));
/// Debounce window for time-to-idle bumps: a row is only rewritten once its stored
/// expiry lies more than this far below "now + TTI" (see `RowData::needs_tti_bump`).
const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600);
/// OAuth scopes requested from the GCP token provider.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// Number of retries `retry` performs after the first failed attempt of a request.
const REQUEST_RETRY_COUNT: usize = 2;
/// Maximum number of check-and-mutate attempts in the conditional put/delete loops.
const CAS_RETRY_COUNT: usize = 3;

/// Column qualifier holding the object payload.
const COLUMN_PAYLOAD: &[u8] = b"p";
/// Column qualifier holding the JSON metadata (and, for legacy rows, the tombstone marker).
const COLUMN_METADATA: &[u8] = b"m";
/// Column qualifier holding a tombstone's redirect target path.
const COLUMN_REDIRECT: &[u8] = b"r";
/// Column qualifier holding the JSON-encoded `TombstoneMeta`.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";
/// Column qualifier regex matching the metadata, redirect and tombstone-meta columns,
/// but not the payload column.
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family used for rows with a TTL/TTI policy; cell timestamps carry the expiry time.
const FAMILY_GC: &str = "fg";
/// Column family used for rows with manual expiration.
const FAMILY_MANUAL: &str = "fm";
165
/// Storage backend that keeps objects and redirect tombstones as rows of a Bigtable table.
pub struct BigTableBackend {
    /// Connection from which a client is obtained for every request.
    bigtable: BigTableConnection,

    /// `projects/{project_id}/instances/{instance_name}`.
    instance_path: String,
    /// Fully qualified table name as produced by the client's `get_full_table_name`;
    /// sent as `table_name` in every request.
    table_path: String,
    /// Short table name taken from the configuration.
    table_name: String,
}
174
175impl fmt::Debug for BigTableBackend {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 f.debug_struct("BigTableBackend")
178 .field("instance_path", &self.instance_path)
179 .field("table_path", &self.table_path)
180 .field("table_name", &self.table_name)
181 .finish_non_exhaustive()
182 }
183}
184
185fn column_filter(column: &[u8]) -> v2::RowFilter {
187 v2::RowFilter {
188 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
189 [b"^", column, b"$"].concat(),
190 )),
191 }
192}
193
194fn legacy_tombstone_filter() -> v2::RowFilter {
199 v2::RowFilter {
200 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
201 filters: vec![
202 column_filter(COLUMN_METADATA),
203 v2::RowFilter {
204 filter: Some(v2::row_filter::Filter::ValueRegexFilter(
205 b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
206 )),
207 },
208 ],
209 })),
210 }
211}
212
213fn live_row_filter(inner: v2::RowFilter) -> v2::RowFilter {
215 let now_micros = SystemTime::now()
216 .duration_since(SystemTime::UNIX_EPOCH)
217 .unwrap_or_default()
218 .as_millis() as i64
219 * 1000;
220
221 v2::RowFilter {
222 filter: Some(v2::row_filter::Filter::Interleave(
223 v2::row_filter::Interleave {
224 filters: vec![
225 v2::RowFilter {
227 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
228 filters: vec![
229 v2::RowFilter {
230 filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
231 format!("^{FAMILY_MANUAL}$"),
232 )),
233 },
234 inner.clone(),
235 ],
236 })),
237 },
238 v2::RowFilter {
240 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
241 filters: vec![
242 v2::RowFilter {
243 filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
244 format!("^{FAMILY_GC}$"),
245 )),
246 },
247 v2::RowFilter {
248 filter: Some(v2::row_filter::Filter::TimestampRangeFilter(
249 v2::TimestampRange {
250 start_timestamp_micros: now_micros,
251 end_timestamp_micros: 0,
252 },
253 )),
254 },
255 inner,
256 ],
257 })),
258 },
259 ],
260 },
261 )),
262 }
263}
264
265fn tombstone_filter() -> v2::RowFilter {
273 let filter = v2::RowFilter {
274 filter: Some(v2::row_filter::Filter::Interleave(
275 v2::row_filter::Interleave {
276 filters: vec![column_filter(COLUMN_REDIRECT), legacy_tombstone_filter()],
277 },
278 )),
279 };
280 live_row_filter(filter)
281}
282
283fn tombstone_predicate() -> MutatePredicate {
289 MutatePredicate::Exclude(tombstone_filter())
290}
291
292fn exact_value_regex(value: &str) -> Vec<u8> {
297 format!("^{}$", regex::escape(value)).into_bytes()
298}
299
300fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
318 let target_path = exact_value_regex(&target.as_storage_path().to_string());
319
320 let exact_match = v2::RowFilter {
321 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
322 filters: vec![
323 column_filter(COLUMN_REDIRECT),
324 v2::RowFilter {
325 filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
326 },
327 ],
328 })),
329 };
330
331 if target != own_id {
332 return live_row_filter(exact_match);
333 }
334
335 let empty_redirect_match = v2::RowFilter {
336 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
337 filters: vec![
338 column_filter(COLUMN_REDIRECT),
339 v2::RowFilter {
340 filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
341 },
342 ],
343 })),
344 };
345
346 let filter = v2::RowFilter {
350 filter: Some(v2::row_filter::Filter::Interleave(
351 v2::row_filter::Interleave {
352 filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
353 },
354 )),
355 };
356 live_row_filter(filter)
357}
358
359fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
366 MutatePredicate::Include(v2::RowFilter {
367 filter: Some(v2::row_filter::Filter::Interleave(
368 v2::row_filter::Interleave {
369 filters: vec![
370 redirect_target_filter(old, own_id),
371 redirect_target_filter(new, own_id),
372 ],
373 },
374 )),
375 })
376}
377
378fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
390 MutatePredicate::Exclude(v2::RowFilter {
391 filter: Some(v2::row_filter::Filter::Condition(Box::new(
392 v2::row_filter::Condition {
393 predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
394 true_filter: Some(Box::new(v2::RowFilter {
395 filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
396 })),
397 false_filter: Some(Box::new(tombstone_filter())),
398 },
399 ))),
400 })
401}
402
/// Condition attached to a check-and-mutate request (see `check_and_mutate`).
#[derive(Clone, Debug)]
enum MutatePredicate {
    /// Apply the mutations only if the filter matches the row.
    Include(v2::RowFilter),
    /// Apply the mutations only if the filter does NOT match the row.
    Exclude(v2::RowFilter),
}
418
419fn metadata_filter() -> v2::RowFilter {
424 v2::RowFilter {
425 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
426 FILTER_META.to_owned(),
427 )),
428 }
429}
430
431fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
432 v2::Mutation {
433 mutation: Some(mutation),
434 }
435}
436
437fn delete_row_mutation() -> v2::Mutation {
439 mutation(mutation::Mutation::DeleteFromRow(
440 mutation::DeleteFromRow {},
441 ))
442}
443
444fn object_mutations(
450 metadata: &Metadata,
451 payload: Vec<u8>,
452 now: SystemTime,
453) -> Result<[v2::Mutation; 3]> {
454 let (family, timestamp_micros) = match metadata.expiration_policy {
455 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
456 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
457 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
458 };
459
460 let metadata_bytes = serde_json::to_vec(metadata)
461 .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
462
463 Ok([
464 delete_row_mutation(),
466 mutation(mutation::Mutation::SetCell(mutation::SetCell {
467 family_name: family.to_owned(),
468 column_qualifier: COLUMN_PAYLOAD.to_owned(),
469 timestamp_micros,
470 value: payload,
471 })),
472 mutation(mutation::Mutation::SetCell(mutation::SetCell {
473 family_name: family.to_owned(),
474 column_qualifier: COLUMN_METADATA.to_owned(),
475 timestamp_micros,
476 value: metadata_bytes,
477 })),
478 ])
479}
480
/// JSON document stored in the tombstone-meta column of a tombstone row.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TombstoneMeta {
    /// Expiration policy of the tombstone.
    ///
    /// Omitted from the JSON when `ExpirationPolicy::is_manual` returns true, and
    /// filled with the default policy when absent on read.
    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
    expiration_policy: ExpirationPolicy,
}
493
494fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
500 let (family, timestamp_micros) = match tombstone.expiration_policy {
501 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
502 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
503 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
504 };
505
506 let tombstone_meta = TombstoneMeta {
507 expiration_policy: tombstone.expiration_policy,
508 };
509
510 Ok([
511 delete_row_mutation(),
512 mutation(mutation::Mutation::SetCell(mutation::SetCell {
513 family_name: family.to_owned(),
514 column_qualifier: COLUMN_REDIRECT.to_owned(),
515 timestamp_micros,
516 value: tombstone.target.as_storage_path().to_string().into_bytes(),
517 })),
518 mutation(mutation::Mutation::SetCell(mutation::SetCell {
519 family_name: family.to_owned(),
520 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
521 timestamp_micros,
522 value: serde_json::to_vec(&tombstone_meta)
523 .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
524 })),
525 ])
526}
527
/// Subset of the metadata-column JSON used to recognize legacy tombstones, which
/// were marked inside the metadata column rather than by a redirect column.
#[derive(Debug, Deserialize)]
struct LegacyTombstoneMeta {
    /// Marker flag; `false` when the key is absent.
    #[serde(default)]
    is_redirect_tombstone: bool,

    /// Expiration policy stored alongside the marker; defaults when absent.
    #[serde(default)]
    expiration_policy: ExpirationPolicy,
}
546
/// Decoded contents of a single Bigtable row.
enum RowData {
    /// A regular object.
    Object {
        /// Parsed metadata; `time_expires` is taken from the GC-family cell timestamp.
        metadata: Metadata,
        /// Payload bytes; empty when the payload column was not part of the read.
        payload: Vec<u8>,
    },
    /// A redirect tombstone.
    Tombstone {
        /// Raw bytes of the redirect column; empty for legacy tombstones and for
        /// tombstones written with an empty redirect (see `parse_redirect_target`).
        target: Vec<u8>,
        /// Tombstone metadata (or its default when the column is missing).
        meta: TombstoneMeta,
        /// Expiry derived from the GC-family cell timestamp, if any.
        time_expires: Option<SystemTime>,
    },
}
561
562impl RowData {
    /// Decodes the cells of one row into an object or a tombstone.
    ///
    /// Cells are processed in the order given; if a qualifier occurs more than once,
    /// the later cell overwrites the earlier one. A row counts as a tombstone when it
    /// has a redirect column or a metadata column carrying the legacy tombstone marker.
    ///
    /// # Errors
    ///
    /// Fails when the tombstone-meta column or a non-legacy metadata column does not
    /// contain valid JSON for its type.
    fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
        let mut metadata_opt: Option<Metadata> = None;
        let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
        let mut redirect_detected = false;
        let mut redirect_target = Vec::new();
        let mut expire_at = None;
        let mut payload = Vec::new();

        for cell in cells {
            // Cells in the GC family carry the expiry time as their timestamp.
            if cell.family_name == FAMILY_GC {
                expire_at = micros_to_time(cell.timestamp_micros);
            }

            match cell.qualifier.as_slice() {
                COLUMN_REDIRECT => {
                    redirect_detected = true;
                    redirect_target = cell.value;
                }
                COLUMN_PAYLOAD => {
                    payload = cell.value;
                }
                COLUMN_TOMBSTONE_META => {
                    tombstone_meta_opt =
                        Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                            Error::serde("failed to deserialize tombstone meta", cause)
                        })?);
                }
                COLUMN_METADATA => {
                    // First probe for the legacy tombstone marker; only if it is
                    // absent is the value parsed as full object metadata.
                    if let Ok(legacy_meta) =
                        serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
                        && legacy_meta.is_redirect_tombstone
                    {
                        redirect_detected = true;
                        objectstore_metrics::count!("bigtable.legacy_tombstone_read");
                        tombstone_meta_opt = Some(TombstoneMeta {
                            expiration_policy: legacy_meta.expiration_policy,
                        });
                    } else {
                        metadata_opt =
                            Some(serde_json::from_slice(&cell.value).map_err(|cause| {
                                Error::serde("failed to deserialize metadata", cause)
                            })?);
                    }
                }
                // Unknown qualifiers are ignored.
                _ => {}
            }
        }

        Ok(if redirect_detected {
            RowData::Tombstone {
                target: redirect_target,
                meta: tombstone_meta_opt.unwrap_or_default(),
                time_expires: expire_at,
            }
        } else {
            // A row without a metadata column yields default metadata.
            let mut metadata = metadata_opt.unwrap_or_default();
            // The cell timestamp, not the stored JSON, decides the expiry.
            metadata.time_expires = expire_at;
            RowData::Object { metadata, payload }
        })
    }
634
635 fn expiration_policy(&self) -> ExpirationPolicy {
637 match self {
638 RowData::Object { metadata, .. } => metadata.expiration_policy,
639 RowData::Tombstone { meta, .. } => meta.expiration_policy,
640 }
641 }
642
643 fn time_expires(&self) -> Option<SystemTime> {
645 match self {
646 RowData::Object { metadata, .. } => metadata.time_expires,
647 RowData::Tombstone { time_expires, .. } => *time_expires,
648 }
649 }
650
651 fn expires_before(&self, time: SystemTime) -> bool {
655 self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
656 }
657
658 fn needs_tti_bump(&self) -> bool {
660 matches!(
661 self.expiration_policy(),
662 ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
663 )
664 }
665}
666
667fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
673 if redirect_path.is_empty() {
674 objectstore_metrics::count!("bigtable.empty_redirect_read");
675 Ok(tombstone_id.clone())
676 } else {
677 let redirect_str = std::str::from_utf8(redirect_path)
678 .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
679 ObjectId::from_storage_path(redirect_str)
680 .ok_or_else(|| Error::generic("corrupt redirect path"))
681 }
682}
683
684impl BigTableBackend {
    /// Creates a backend connected to the Bigtable instance described by `config`.
    ///
    /// With `endpoint` set, the connection is created through
    /// `BigTableConnection::new_with_emulator`. Otherwise a GCP token provider for
    /// `TOKEN_SCOPES` is created and a managed transport is used.
    ///
    /// # Errors
    ///
    /// Returns an error if the token provider or the connection cannot be created.
    pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
        let BigTableConfig {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } = config;

        let bigtable = if let Some(ref endpoint) = endpoint {
            BigTableConnection::new_with_emulator(
                endpoint,
                &project_id,
                &instance_name,
                // NOTE(review): presumably the read-only flag — confirm against bigtable_rs.
                false,
                Some(CONNECT_TIMEOUT),
            )?
        } else {
            let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
            BigTableConnection::new_with_managed_transport(
                &project_id,
                &instance_name,
                // NOTE(review): presumably the read-only flag — confirm against bigtable_rs.
                false,
                Some(CONNECT_TIMEOUT),
                Arc::new(token_provider),
                // Defaults to a single connection when not configured.
                connections.unwrap_or(1),
                // NOTE(review): meaning of these two positional arguments is not visible
                // here — confirm against the constructor's signature.
                true,
                None,
                MAX_CHANNEL_AGE,
            )
            .await?
        };

        // The client is only needed here to derive the fully qualified table name.
        let client = bigtable.client();

        Ok(Self {
            bigtable,
            instance_path: format!("projects/{project_id}/instances/{instance_name}"),
            table_path: client.get_full_table_name(&table_name),
            table_name,
        })
    }
731
732 async fn read_row(
736 &self,
737 path: &[u8],
738 filter: Option<v2::RowFilter>,
739 action: &'static str,
740 ) -> Result<Option<RowData>> {
741 let request = v2::ReadRowsRequest {
742 table_name: self.table_path.clone(),
743 rows: Some(v2::RowSet {
744 row_keys: vec![path.to_owned()],
745 row_ranges: vec![],
746 }),
747 filter,
748 rows_limit: 1,
749 ..Default::default()
750 };
751
752 let response = retry(action, || async {
753 self.bigtable.client().read_rows(request.clone()).await
754 })
755 .await?;
756 debug_assert!(response.len() <= 1, "Expected at most one row");
757
758 let Some((_, cells)) = response.into_iter().next() else {
759 objectstore_log::debug!("Object not found");
760 return Ok(None);
761 };
762
763 let row = RowData::from_cells(cells)?;
764 Ok(if row.expires_before(SystemTime::now()) {
765 None
766 } else {
767 Some(row)
768 })
769 }
770
771 async fn mutate(
772 &self,
773 path: Vec<u8>,
774 mutations: impl Into<Vec<v2::Mutation>>,
775 action: &'static str,
776 ) -> Result<v2::MutateRowResponse> {
777 let request = v2::MutateRowRequest {
778 table_name: self.table_path.clone(),
779 row_key: path,
780 mutations: mutations.into(),
781 ..Default::default()
782 };
783
784 let response = retry(action, || async {
785 self.bigtable.client().mutate_row(request.clone()).await
786 })
787 .await?;
788
789 Ok(response.into_inner())
790 }
791
792 async fn put_row(
793 &self,
794 path: Vec<u8>,
795 metadata: &Metadata,
796 payload: Vec<u8>,
797 action: &'static str,
798 ) -> Result<v2::MutateRowResponse> {
799 let mutations = object_mutations(metadata, payload, SystemTime::now())?;
800 self.mutate(path, mutations, action).await
801 }
802
803 async fn put_tombstone_row(
804 &self,
805 path: Vec<u8>,
806 tombstone: &Tombstone,
807 action: &'static str,
808 ) -> Result<v2::MutateRowResponse> {
809 let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
810 self.mutate(path, mutations, action).await
811 }
812
813 async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
817 let expiration_policy = row.expiration_policy();
818
819 match row {
820 RowData::Tombstone { target, .. } => {
821 let target = match parse_redirect_target(target, hv_id) {
822 Ok(target) => target,
823 Err(e) => {
824 objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
825 return;
826 }
827 };
828
829 let tombstone = Tombstone {
830 target,
831 expiration_policy,
832 };
833 let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
834 }
835 RowData::Object { metadata, payload } if loaded => {
836 let _ = self
837 .put_row(path, metadata, payload.clone(), "tti-bump")
838 .await;
839 }
840 RowData::Object { metadata, .. } => {
841 let payload_read = self
842 .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
843 .await;
844
845 if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
846 let _ = self.put_row(path, metadata, payload, "tti-bump").await;
847 }
848 }
849 }
850 }
851
852 async fn check_and_mutate(
854 &self,
855 row_key: Vec<u8>,
856 predicate: MutatePredicate,
857 mutations: impl Into<Vec<v2::Mutation>>,
858 context: &'static str,
859 ) -> Result<bool> {
860 let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
861 MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
862 MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
863 };
864
865 let request = v2::CheckAndMutateRowRequest {
866 table_name: self.table_path.clone(),
867 row_key,
868 predicate_filter: Some(filter),
869 true_mutations,
870 false_mutations,
871 ..Default::default()
872 };
873
874 let future = retry(context, || async {
875 self.bigtable
876 .client()
877 .check_and_mutate_row(request.clone())
878 .await
879 });
880
881 Ok(future.await?.predicate_matched == success_on_match)
882 }
883}
884
885#[async_trait::async_trait]
886impl Backend for BigTableBackend {
887 fn name(&self) -> &'static str {
888 "bigtable"
889 }
890
891 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
892 async fn put_object(
893 &self,
894 id: &ObjectId,
895 metadata: &Metadata,
896 mut stream: ClientStream,
897 ) -> Result<PutResponse> {
898 objectstore_log::debug!("Writing to Bigtable backend");
899 let path = id.as_storage_path().to_string().into_bytes();
900
901 let mut payload = ChunkedBytes::new(0);
902 while let Some(chunk) = stream.try_next().await? {
903 payload.push(chunk);
904 }
905
906 self.put_row(path, metadata, payload.into_bytes().into(), "put")
907 .await?;
908 Ok(())
909 }
910
911 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
912 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
913 match self.get_tiered_object(id).await? {
914 TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))),
915 TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
916 TieredGet::NotFound => Ok(None),
917 }
918 }
919
920 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
921 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
922 match self.get_tiered_metadata(id).await? {
923 TieredMetadata::Object(metadata) => Ok(Some(metadata)),
924 TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
925 TieredMetadata::NotFound => Ok(None),
926 }
927 }
928
929 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
930 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
931 objectstore_log::debug!("Deleting from Bigtable backend");
932
933 let path = id.as_storage_path().to_string().into_bytes();
934 self.mutate(path, [delete_row_mutation()], "delete").await?;
935
936 Ok(())
937 }
938}
939
940#[async_trait::async_trait]
941impl HighVolumeBackend for BigTableBackend {
942 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
943 async fn put_non_tombstone(
944 &self,
945 id: &ObjectId,
946 metadata: &Metadata,
947 payload: Bytes,
948 ) -> Result<Option<Tombstone>> {
949 objectstore_log::debug!("Conditional put to Bigtable backend");
950
951 let path = id.as_storage_path().to_string().into_bytes();
952 let mutations = object_mutations(metadata, payload.to_vec(), SystemTime::now())?;
953
954 for _ in 0..CAS_RETRY_COUNT {
955 let write_succeeded = self
956 .check_and_mutate(
957 path.clone(),
958 tombstone_predicate(),
959 mutations.clone(),
960 "put_non_tombstone",
961 )
962 .await?;
963
964 if write_succeeded {
965 return Ok(None);
966 }
967
968 let row = self
970 .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
971 .await?;
972
973 match row {
974 Some(RowData::Tombstone { target, meta, .. }) => {
975 return Ok(Some(Tombstone {
976 target: parse_redirect_target(&target, id)?,
977 expiration_policy: meta.expiration_policy,
978 }));
979 }
980 Some(RowData::Object { .. }) => continue,
982 None => continue,
984 }
985 }
986
987 Err(Error::generic("BigTable: race loop in put_non_tombstone"))
988 }
989
990 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
991 async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
992 objectstore_log::debug!("Reading from Bigtable backend");
993 let path = id.as_storage_path().to_string().into_bytes();
994
995 let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
996 return Ok(TieredGet::NotFound);
997 };
998
999 if row.needs_tti_bump() {
1000 self.bump_tti(path.clone(), &row, true, id).await;
1001 }
1002
1003 Ok(match row {
1004 RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
1005 target: parse_redirect_target(&target, id)?,
1006 expiration_policy: meta.expiration_policy,
1007 }),
1008 RowData::Object { metadata, payload } => {
1009 let mut metadata = metadata;
1010 metadata.size = Some(payload.len());
1011 TieredGet::Object(metadata, crate::stream::single(payload))
1012 }
1013 })
1014 }
1015
1016 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1017 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
1018 objectstore_log::debug!("Reading metadata from Bigtable backend");
1019 let path = id.as_storage_path().to_string().into_bytes();
1020
1021 let row_opt = self
1024 .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
1025 .await?;
1026 let Some(row) = row_opt else {
1027 return Ok(TieredMetadata::NotFound);
1028 };
1029
1030 if row.needs_tti_bump() {
1031 self.bump_tti(path.clone(), &row, false, id).await;
1032 }
1033
1034 Ok(match row {
1035 RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
1036 target: parse_redirect_target(&target, id)?,
1037 expiration_policy: meta.expiration_policy,
1038 }),
1039 RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
1040 })
1041 }
1042
1043 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1044 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
1045 objectstore_log::debug!("Conditional delete from Bigtable backend");
1046
1047 let path = id.as_storage_path().to_string().into_bytes();
1048
1049 for _ in 0..CAS_RETRY_COUNT {
1050 let write_succeeded = self
1051 .check_and_mutate(
1052 path.clone(),
1053 tombstone_predicate(),
1054 [delete_row_mutation()],
1055 "delete_non_tombstone",
1056 )
1057 .await?;
1058
1059 if write_succeeded {
1060 return Ok(None);
1061 }
1062
1063 let row = self
1065 .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
1066 .await?;
1067
1068 match row {
1069 Some(RowData::Tombstone { target, meta, .. }) => {
1070 return Ok(Some(Tombstone {
1071 target: parse_redirect_target(&target, id)?,
1072 expiration_policy: meta.expiration_policy,
1073 }));
1074 }
1075 Some(RowData::Object { .. }) => continue,
1077 None => return Ok(None),
1079 }
1080 }
1081
1082 Err(Error::generic(
1083 "BigTable: race loop in delete_non_tombstone",
1084 ))
1085 }
1086
1087 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1088 async fn compare_and_write(
1089 &self,
1090 id: &ObjectId,
1091 current: Option<&ObjectId>,
1092 write: TieredWrite,
1093 ) -> Result<bool> {
1094 objectstore_log::debug!("CAS put to Bigtable backend");
1095
1096 let path = id.as_storage_path().to_string().into_bytes();
1097 let now = SystemTime::now();
1098
1099 let predicate = match (current, write.target()) {
1100 (Some(old), Some(new)) => update_predicate(old, new, id),
1101 (Some(target), None) => optional_target_predicate(target, id),
1102 (None, Some(target)) => optional_target_predicate(target, id),
1103 (None, None) => tombstone_predicate(),
1104 };
1105
1106 let mutations = match write {
1107 TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
1108 TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
1109 TieredWrite::Delete => vec![delete_row_mutation()],
1110 };
1111
1112 self.check_and_mutate(path, predicate, mutations, "compare_and_write")
1113 .await
1114 }
1115}
1116
1117fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1123 let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1124 context: format!(
1125 "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1126 humantime::format_rfc3339_seconds(from),
1127 ttl.as_secs()
1128 ),
1129 cause: None,
1130 })?;
1131 let millis = deadline
1132 .duration_since(SystemTime::UNIX_EPOCH)
1133 .map_err(|e| Error::Generic {
1134 context: format!(
1135 "unable to get duration since UNIX_EPOCH for SystemTime {}",
1136 humantime::format_rfc3339_seconds(deadline)
1137 ),
1138 cause: Some(Box::new(e)),
1139 })?
1140 .as_millis();
1141 (millis * 1000).try_into().map_err(|e| Error::Generic {
1142 context: format!("failed to convert {}ms to i64 microseconds", millis),
1143 cause: Some(Box::new(e)),
1144 })
1145}
1146
/// Converts microseconds since the Unix epoch into a `SystemTime`.
///
/// Returns `None` for negative values or when the result is not representable.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    u64::try_from(micros)
        .ok()
        .map(Duration::from_micros)
        .and_then(|since_epoch| SystemTime::UNIX_EPOCH.checked_add(since_epoch))
}
1153
1154async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
1156where
1157 F: Future<Output = Result<T, BigTableError>> + Send,
1158{
1159 let mut retry_count = 0usize;
1160
1161 loop {
1162 match f().await {
1163 Ok(res) => return Ok(res),
1164 Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
1165 objectstore_metrics::count!("bigtable.failures", action = context);
1166 return Err(Error::Generic {
1167 context: format!("Bigtable: `{context}` failed"),
1168 cause: Some(Box::new(e)),
1169 });
1170 }
1171 Err(e) => {
1172 retry_count += 1;
1173 objectstore_metrics::count!("bigtable.retries", action = context);
1174 objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
1175 }
1176 }
1177 }
1178}
1179
1180fn is_retryable(error: &BigTableError) -> bool {
1181 match error {
1182 BigTableError::GCPAuthError(_) => true,
1184 BigTableError::TransportError(_) => true,
1186 BigTableError::IoError(_) => true,
1188 BigTableError::TimeoutError(_) => true,
1189
1190 BigTableError::RpcError(status) => match status.code() {
1192 Code::Unavailable => true,
1194 Code::Cancelled => true,
1196 Code::DeadlineExceeded => true,
1197 Code::Unauthenticated => true,
1199 Code::Aborted => true,
1201 Code::Internal => true,
1202 Code::FailedPrecondition => true,
1203 Code::Unknown => true,
1204 _ => false,
1205 },
1206 _ => false,
1207 }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212 use std::collections::BTreeMap;
1213
1214 use anyhow::Result;
1215 use objectstore_types::scope::{Scope, Scopes};
1216
1217 use super::*;
1218 use crate::id::ObjectContext;
1219 use crate::stream;
1220
1221 async fn create_test_backend() -> Result<BigTableBackend> {
1227 BigTableBackend::new(BigTableConfig {
1228 endpoint: Some("localhost:8086".into()),
1229 project_id: "testing".into(),
1230 instance_name: "objectstore".into(),
1231 table_name: "objectstore".into(),
1232 connections: None,
1233 })
1234 .await
1235 }
1236
1237 fn make_id() -> ObjectId {
1238 ObjectId::random(ObjectContext {
1239 usecase: "testing".into(),
1240 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1241 })
1242 }
1243
1244 async fn create_object(
1245 backend: &BigTableBackend,
1246 id: &ObjectId,
1247 metadata: &Metadata,
1248 payload: &[u8],
1249 now: SystemTime,
1250 ) -> Result<()> {
1251 let path = id.as_storage_path().to_string().into_bytes();
1252 let mutations = object_mutations(metadata, payload.to_vec(), now)?;
1253 backend.mutate(path, mutations, "test-setup").await?;
1254 Ok(())
1255 }
1256
1257 async fn create_tombstone(
1258 backend: &BigTableBackend,
1259 id: &ObjectId,
1260 tombstone: &Tombstone,
1261 now: SystemTime,
1262 ) -> Result<()> {
1263 let path = id.as_storage_path().to_string().into_bytes();
1264 let mutations = tombstone_mutations(tombstone, now)?;
1265 backend.mutate(path, mutations, "test-setup").await?;
1266 Ok(())
1267 }
1268
1269 async fn write_legacy_tombstone(
1271 backend: &BigTableBackend,
1272 id: &ObjectId,
1273 expiration_policy: ExpirationPolicy,
1274 time_expires: Option<SystemTime>,
1275 ) -> Result<()> {
1276 let meta = if expiration_policy.is_manual() {
1277 r#"{"is_redirect_tombstone":true}"#.to_owned()
1278 } else {
1279 let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1280 format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1281 };
1282
1283 let (family, timestamp_micros) = if expiration_policy.is_manual() {
1284 (FAMILY_MANUAL, -1)
1285 } else {
1286 let t =
1287 time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1288 let timestamp = t
1289 .duration_since(SystemTime::UNIX_EPOCH)
1290 .unwrap()
1291 .as_millis();
1292 (FAMILY_GC, timestamp as i64 * 1000)
1293 };
1294
1295 let path = id.as_storage_path().to_string().into_bytes();
1296 let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1297 family_name: family.to_owned(),
1298 column_qualifier: COLUMN_METADATA.to_owned(),
1299 timestamp_micros,
1300 value: meta.into_bytes(),
1301 }))];
1302
1303 backend.mutate(path, mutations, "test-setup").await?;
1304
1305 Ok(())
1306 }
1307
1308 async fn write_empty_redirect_tombstone(
1311 backend: &BigTableBackend,
1312 id: &ObjectId,
1313 ) -> Result<()> {
1314 let path = id.as_storage_path().to_string().into_bytes();
1315 let mutations = [
1316 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1317 family_name: FAMILY_MANUAL.to_owned(),
1318 column_qualifier: COLUMN_REDIRECT.to_owned(),
1319 timestamp_micros: -1,
1320 value: b"".to_vec(), })),
1322 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1323 family_name: FAMILY_MANUAL.to_owned(),
1324 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1325 timestamp_micros: -1,
1326 value: b"{}".to_vec(),
1327 })),
1328 ];
1329
1330 backend.mutate(path, mutations, "test-setup").await?;
1331
1332 Ok(())
1333 }
1334
1335 #[tokio::test]
1339 async fn test_roundtrip() -> Result<()> {
1340 let backend = create_test_backend().await?;
1341
1342 let id = make_id();
1343 let metadata = Metadata {
1344 content_type: "text/plain".into(),
1345 time_created: Some(SystemTime::now()),
1346 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1347 ..Default::default()
1348 };
1349
1350 backend
1351 .put_object(&id, &metadata, stream::single("hello, world"))
1352 .await?;
1353
1354 let (obj_meta, stream) = backend.get_object(&id).await?.unwrap();
1355 let payload = stream::read_to_vec(stream).await?;
1356 assert_eq!(payload, b"hello, world");
1357 assert_eq!(obj_meta.content_type, metadata.content_type);
1358 assert_eq!(obj_meta.custom, metadata.custom);
1359
1360 let head_meta = backend.get_metadata(&id).await?.unwrap();
1361 assert_eq!(head_meta.content_type, metadata.content_type);
1362 assert_eq!(head_meta.custom, metadata.custom);
1363
1364 Ok(())
1365 }
1366
1367 #[tokio::test]
1369 async fn test_nonexistent() -> Result<()> {
1370 let backend = create_test_backend().await?;
1371
1372 let id = make_id();
1373 assert!(backend.get_object(&id).await?.is_none());
1374 assert!(backend.get_metadata(&id).await?.is_none());
1375 backend.delete_object(&id).await?;
1376
1377 Ok(())
1378 }
1379
1380 #[tokio::test]
1381 async fn test_overwrite() -> Result<()> {
1382 let backend = create_test_backend().await?;
1383
1384 let id = make_id();
1385 let first_metadata = Metadata {
1386 custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1387 ..Default::default()
1388 };
1389 create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1390
1391 let second_metadata = Metadata {
1392 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1393 ..Default::default()
1394 };
1395 backend
1396 .put_object(&id, &second_metadata, stream::single("world"))
1397 .await?;
1398
1399 let (meta, stream) = backend.get_object(&id).await?.unwrap();
1400 let payload = stream::read_to_vec(stream).await?;
1401 assert_eq!(payload, b"world");
1402 assert_eq!(meta.custom, second_metadata.custom);
1403
1404 Ok(())
1405 }
1406
1407 #[tokio::test]
1408 async fn test_read_after_delete() -> Result<()> {
1409 let backend = create_test_backend().await?;
1410
1411 let id = make_id();
1412 let metadata = Metadata::default();
1413 create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1414 backend.delete_object(&id).await?;
1415
1416 assert!(backend.get_object(&id).await?.is_none());
1417
1418 Ok(())
1419 }
1420
1421 #[tokio::test]
1427 async fn test_tti_bump() -> Result<()> {
1428 let backend = create_test_backend().await?;
1429 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1432 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1433 ..Default::default()
1434 };
1435
1436 let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_secs(60);
1439
1440 let id1 = make_id();
1442 create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1443
1444 let (pre_obj_meta, _) = backend.get_object(&id1).await?.unwrap();
1446 let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1447
1448 let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1450 let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1451 assert!(
1452 post_obj_expiry > pre_obj_expiry,
1453 "bump should extend expiry"
1454 );
1455
1456 let id2 = make_id();
1458 create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1459
1460 let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1462 let pre_expiry = pre_meta.time_expires.unwrap();
1463
1464 let post_meta = backend.get_metadata(&id2).await?.unwrap();
1466 let post_expiry = post_meta.time_expires.unwrap();
1467 assert!(post_expiry > pre_expiry, "bump should extend expiry");
1468
1469 let (_, stream) = backend.get_object(&id2).await?.unwrap();
1471 let payload = stream::read_to_vec(stream).await?;
1472 assert_eq!(payload, b"hello, world");
1473
1474 Ok(())
1475 }
1476
1477 #[tokio::test]
1478 async fn test_tti_no_bump_when_fresh() -> Result<()> {
1479 let backend = create_test_backend().await?;
1480
1481 let id = make_id();
1482 let tti = Duration::from_secs(2 * 24 * 3600); let metadata = Metadata {
1485 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1486 ..Default::default()
1487 };
1488 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1489
1490 let first = backend.get_metadata(&id).await?.unwrap();
1493 let second = backend.get_metadata(&id).await?.unwrap();
1494
1495 assert_eq!(
1496 first.time_expires.unwrap(),
1497 second.time_expires.unwrap(),
1498 "fresh TTI object must not be bumped"
1499 );
1500
1501 Ok(())
1502 }
1503
1504 #[tokio::test]
1507 async fn test_ttl_immediate() -> Result<()> {
1508 let backend = create_test_backend().await?;
1512
1513 let id = make_id();
1514 let metadata = Metadata {
1515 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1516 ..Default::default()
1517 };
1518 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1519
1520 assert!(backend.get_object(&id).await?.is_none());
1521
1522 Ok(())
1523 }
1524
1525 #[tokio::test]
1526 async fn test_tti_immediate() -> Result<()> {
1527 let backend = create_test_backend().await?;
1531
1532 let id = make_id();
1533 let metadata = Metadata {
1534 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1535 ..Default::default()
1536 };
1537 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1538
1539 assert!(backend.get_object(&id).await?.is_none());
1540
1541 Ok(())
1542 }
1543
1544 #[tokio::test]
1553 async fn test_tiered_get() -> Result<()> {
1554 let backend = create_test_backend().await?;
1555
1556 let id = make_id();
1558 assert!(matches!(
1559 backend.get_tiered_object(&id).await?,
1560 TieredGet::NotFound
1561 ));
1562 assert!(matches!(
1563 backend.get_tiered_metadata(&id).await?,
1564 TieredMetadata::NotFound
1565 ));
1566
1567 let id = make_id();
1569 let put_meta = Metadata {
1570 content_type: "text/plain".into(),
1571 custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1572 ..Default::default()
1573 };
1574 create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1575
1576 let TieredGet::Object(obj_meta, obj_stream) = backend.get_tiered_object(&id).await? else {
1577 panic!("expected TieredGet::Object");
1578 };
1579 let obj_payload = stream::read_to_vec(obj_stream).await?;
1580 assert_eq!(obj_payload, b"payload");
1581 assert_eq!(obj_meta.content_type, put_meta.content_type);
1582 assert_eq!(obj_meta.custom, put_meta.custom);
1583
1584 let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1585 panic!("expected TieredMetadata::Object");
1586 };
1587 assert_eq!(head_meta.content_type, put_meta.content_type);
1588 assert_eq!(head_meta.custom, put_meta.custom);
1589
1590 let hv_id = make_id();
1592 let lt_id = ObjectId::random(hv_id.context().clone());
1593 let tombstone = Tombstone {
1594 target: lt_id.clone(),
1595 expiration_policy: ExpirationPolicy::Manual,
1596 };
1597 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1598
1599 match backend.get_tiered_object(&hv_id).await? {
1600 TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1601 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1602 }
1603 match backend.get_tiered_metadata(&hv_id).await? {
1604 TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1605 other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1606 }
1607
1608 Ok(())
1609 }
1610
1611 #[tokio::test]
1617 async fn test_put_non_tombstone() -> Result<()> {
1618 let backend = create_test_backend().await?;
1619
1620 let id = make_id();
1622 let metadata = Metadata::default();
1623 let result = backend
1624 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1625 .await?;
1626 assert_eq!(result, None, "expected None on empty row");
1627 let (_, stream) = backend.get_object(&id).await?.unwrap();
1628 assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1629
1630 let id = make_id();
1632 create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1633 let result = backend
1634 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1635 .await?;
1636 assert_eq!(result, None, "expected None when overwriting object");
1637 let (_, stream) = backend.get_object(&id).await?.unwrap();
1638 assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1639
1640 let hv_id = make_id();
1642 let lt_id = ObjectId::random(hv_id.context().clone());
1643 let tombstone = Tombstone {
1644 target: lt_id.clone(),
1645 expiration_policy: ExpirationPolicy::Manual,
1646 };
1647 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1648 let result = backend
1649 .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1650 .await?;
1651 let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1652 assert_eq!(returned.target, lt_id);
1653 assert!(
1654 matches!(
1655 backend.get_tiered_metadata(&hv_id).await?,
1656 TieredMetadata::Tombstone(_)
1657 ),
1658 "tombstone must still exist after put_non_tombstone"
1659 );
1660
1661 Ok(())
1662 }
1663
1664 #[tokio::test]
1673 async fn test_delete_non_tombstone() -> Result<()> {
1674 let backend = create_test_backend().await?;
1675
1676 let id = make_id();
1678 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1679
1680 let id = make_id();
1682 let metadata = Metadata::default();
1683 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1684 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1685 assert!(backend.get_object(&id).await?.is_none());
1686
1687 let id = make_id();
1689 let tombstone = Tombstone {
1690 target: id.clone(),
1691 expiration_policy: ExpirationPolicy::Manual,
1692 };
1693 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1694 let tombstone = backend
1695 .delete_non_tombstone(&id)
1696 .await?
1697 .expect("expected Some(tombstone)");
1698 assert_eq!(tombstone.target, id, "tombstone target must be returned");
1699 assert!(
1700 matches!(
1701 backend.get_tiered_metadata(&id).await?,
1702 TieredMetadata::Tombstone(_)
1703 ),
1704 "tombstone must still exist after delete_non_tombstone"
1705 );
1706
1707 Ok(())
1708 }
1709
1710 #[tokio::test]
1716 async fn test_cas_create_tombstone() -> Result<()> {
1717 let backend = create_test_backend().await?;
1718
1719 let hv_id = make_id();
1720 let lt_id = ObjectId::random(hv_id.context().clone());
1721 let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600));
1722 let tombstone = Tombstone {
1723 target: lt_id.clone(),
1724 expiration_policy,
1725 };
1726
1727 let committed = backend
1729 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1730 .await?;
1731 assert!(committed, "expected CAS success on empty row");
1732
1733 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1735 panic!("expected TieredMetadata::Tombstone");
1736 };
1737 assert_eq!(t.target, lt_id, "target must round-trip via r column");
1738 assert_eq!(t.expiration_policy, expiration_policy);
1739 match backend.get_tiered_object(&hv_id).await? {
1740 TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1741 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1742 }
1743
1744 assert!(matches!(
1746 backend.get_object(&hv_id).await,
1747 Err(Error::UnexpectedTombstone)
1748 ));
1749 assert!(matches!(
1750 backend.get_metadata(&hv_id).await,
1751 Err(Error::UnexpectedTombstone)
1752 ));
1753
1754 let second = backend
1756 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1757 .await?;
1758 assert!(second, "idempotent retry");
1759
1760 Ok(())
1761 }
1762
1763 #[tokio::test]
1765 async fn test_cas_swap_tombstone() -> Result<()> {
1766 let backend = create_test_backend().await?;
1767
1768 let hv_id = make_id();
1769 let old_lt_id = ObjectId::random(hv_id.context().clone());
1770 let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1771 let new_lt_id = ObjectId::random(hv_id.context().clone());
1772
1773 let tombstone = Tombstone {
1774 target: old_lt_id.clone(),
1775 expiration_policy: ExpirationPolicy::Manual,
1776 };
1777 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1778
1779 let write = TieredWrite::Tombstone(Tombstone {
1781 target: new_lt_id.clone(),
1782 expiration_policy: ExpirationPolicy::Manual,
1783 });
1784 let swapped = backend
1785 .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1786 .await?;
1787 assert!(!swapped, "expected CAS failure due to wrong target");
1788 match backend.get_tiered_metadata(&hv_id).await? {
1789 TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1790 other => panic!("expected tombstone, got {other:?}"),
1791 }
1792
1793 let swapped = backend
1795 .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1796 .await?;
1797 assert!(swapped, "expected CAS success with correct target");
1798 match backend.get_tiered_metadata(&hv_id).await? {
1799 TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1800 other => panic!("expected tombstone, got {other:?}"),
1801 }
1802
1803 let retry = backend
1805 .compare_and_write(&hv_id, Some(&old_lt_id), write)
1806 .await?;
1807 assert!(retry, "idempotent retry");
1808
1809 Ok(())
1810 }
1811
1812 #[tokio::test]
1814 async fn test_cas_swap_inline() -> Result<()> {
1815 let backend = create_test_backend().await?;
1816
1817 let id = make_id();
1818 let lt_id = ObjectId::random(id.context().clone());
1819 let wrong_id = ObjectId::random(id.context().clone());
1820
1821 let tombstone = Tombstone {
1822 target: lt_id.clone(),
1823 expiration_policy: ExpirationPolicy::Manual,
1824 };
1825 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1826
1827 let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1829 let swapped = backend
1830 .compare_and_write(&id, Some(&wrong_id), write)
1831 .await?;
1832 assert!(!swapped, "expected CAS failure with wrong target");
1833 assert!(matches!(
1834 backend.get_tiered_metadata(&id).await?,
1835 TieredMetadata::Tombstone(_)
1836 ));
1837
1838 let payload = Bytes::from_static(b"hello inline");
1840 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1841 let swapped = backend
1842 .compare_and_write(&id, Some(<_id), write.clone())
1843 .await?;
1844 assert!(swapped, "expected CAS success with correct target");
1845 let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1846 panic!("expected inline object after swap");
1847 };
1848 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1849
1850 let retry = backend.compare_and_write(&id, Some(<_id), write).await?;
1852 assert!(retry, "idempotent retry");
1853
1854 Ok(())
1855 }
1856
1857 #[tokio::test]
1859 async fn test_cas_create_object_on_empty_row() -> Result<()> {
1860 let backend = create_test_backend().await?;
1861
1862 let id = make_id();
1863 let payload = Bytes::from_static(b"cas object");
1864 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1865 let committed = backend.compare_and_write(&id, None, write).await?;
1866 assert!(committed, "expected CAS success on empty row");
1867
1868 let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
1869 panic!("expected Object after CAS-create");
1870 };
1871 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1872
1873 Ok(())
1874 }
1875
1876 #[tokio::test]
1878 async fn test_cas_delete() -> Result<()> {
1879 let backend = create_test_backend().await?;
1880
1881 let id = make_id();
1882 let lt_id = ObjectId::random(id.context().clone());
1883 let wrong_id = ObjectId::random(id.context().clone());
1884
1885 let tombstone = Tombstone {
1886 target: lt_id.clone(),
1887 expiration_policy: ExpirationPolicy::Manual,
1888 };
1889 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1890
1891 let deleted = backend
1893 .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1894 .await?;
1895 assert!(!deleted, "expected CAS failure with wrong target");
1896 assert!(matches!(
1897 backend.get_tiered_metadata(&id).await?,
1898 TieredMetadata::Tombstone(_)
1899 ));
1900
1901 let deleted = backend
1903 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1904 .await?;
1905 assert!(deleted, "expected CAS delete success");
1906 assert!(matches!(
1907 backend.get_tiered_metadata(&id).await?,
1908 TieredMetadata::NotFound
1909 ));
1910
1911 let retry = backend
1913 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1914 .await?;
1915 assert!(retry, "idempotent retry");
1916
1917 let id2 = make_id();
1919 let fake_lt_id = ObjectId::random(id2.context().clone());
1920 let metadata = Metadata::default();
1921 create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1922 let deleted = backend
1923 .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1924 .await?;
1925 assert!(deleted, "expected idempotent deletion");
1926
1927 Ok(())
1928 }
1929
1930 #[tokio::test]
1937 async fn test_legacy_tombstone_reads() -> Result<()> {
1938 let backend = create_test_backend().await?;
1939
1940 let id = make_id();
1942 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1943
1944 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1945 panic!("expected tombstone");
1946 };
1947 assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1948 assert!(matches!(
1949 backend.get_tiered_object(&id).await?,
1950 TieredGet::Tombstone(_)
1951 ));
1952
1953 let id = make_id();
1958 let ttl = Duration::from_secs(2 * 24 * 3600);
1959 write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1960
1961 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1962 panic!("expected TieredMetadata::Tombstone");
1963 };
1964 assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
1965
1966 Ok(())
1967 }
1968
1969 #[tokio::test]
1974 async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
1975 let backend = create_test_backend().await?;
1976 let id = make_id();
1977 let path = id.as_storage_path().to_string().into_bytes();
1978
1979 let tti = Duration::from_secs(2 * 24 * 3600); let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60);
1984 write_legacy_tombstone(
1985 &backend,
1986 &id,
1987 ExpirationPolicy::TimeToIdle(tti),
1988 Some(old_deadline),
1989 )
1990 .await?;
1991
1992 let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
1994 panic!("expected tombstone");
1995 };
1996
1997 let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
1999 Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
2000 _ => panic!("expected tombstone row after bump"),
2001 };
2002
2003 assert!(
2004 new_deadline > old_deadline,
2005 "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
2006 );
2007
2008 Ok(())
2009 }
2010
2011 #[tokio::test]
2016 async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
2017 let backend = create_test_backend().await?;
2018
2019 let id = make_id();
2021 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2022 let t_opt = backend
2023 .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
2024 .await?;
2025 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2026
2027 let id = make_id();
2029 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2030 let t_opt = backend.delete_non_tombstone(&id).await?;
2031 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2032
2033 let id = make_id();
2035 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2036 let deleted = backend
2037 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2038 .await?;
2039 assert!(
2040 deleted,
2041 "CAS-delete must succeed on legacy-metadata tombstone"
2042 );
2043 assert!(matches!(
2044 backend.get_tiered_metadata(&id).await?,
2045 TieredMetadata::NotFound
2046 ));
2047
2048 let id = make_id();
2050 write_empty_redirect_tombstone(&backend, &id).await?;
2051 let deleted = backend
2052 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2053 .await?;
2054 assert!(
2055 deleted,
2056 "CAS-delete must succeed on empty-redirect tombstone"
2057 );
2058 assert!(matches!(
2059 backend.get_tiered_metadata(&id).await?,
2060 TieredMetadata::NotFound
2061 ));
2062
2063 Ok(())
2064 }
2065
2066 #[tokio::test]
2068 async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2069 let backend = create_test_backend().await?;
2070 let id = make_id();
2071
2072 write_empty_redirect_tombstone(&backend, &id).await?;
2073 match backend.get_tiered_metadata(&id).await? {
2074 TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2075 other => panic!("expected tombstone, got {other:?}"),
2076 }
2077
2078 Ok(())
2079 }
2080
2081 #[tokio::test]
2086 async fn test_cas_create_tombstone_over_expired() -> Result<()> {
2087 let backend = create_test_backend().await?;
2088
2089 let id = make_id();
2090 let old_lt_id = ObjectId::random(id.context().clone());
2091 let old_tombstone = Tombstone {
2092 target: old_lt_id,
2093 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2094 };
2095 create_tombstone(&backend, &id, &old_tombstone, SystemTime::now()).await?;
2096
2097 let new_lt_id = ObjectId::random(id.context().clone());
2098 let new_tombstone = Tombstone {
2099 target: new_lt_id.clone(),
2100 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)),
2101 };
2102 let committed = backend
2103 .compare_and_write(&id, None, TieredWrite::Tombstone(new_tombstone))
2104 .await?;
2105 assert!(
2106 committed,
2107 "CAS with current=None must succeed over an expired tombstone"
2108 );
2109
2110 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2111 panic!("expected new tombstone to be readable");
2112 };
2113 assert_eq!(t.target, new_lt_id);
2114
2115 Ok(())
2116 }
2117
2118 #[tokio::test]
2121 async fn test_put_non_tombstone_over_expired() -> Result<()> {
2122 let backend = create_test_backend().await?;
2123
2124 let id = make_id();
2125 let lt_id = ObjectId::random(id.context().clone());
2126 let tombstone = Tombstone {
2127 target: lt_id,
2128 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2129 };
2130 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
2131
2132 let result = backend
2133 .put_non_tombstone(&id, &Metadata::default(), Bytes::from_static(b"data"))
2134 .await?;
2135 assert_eq!(
2136 result, None,
2137 "put_non_tombstone must succeed (return None) over an expired tombstone"
2138 );
2139
2140 let (_, stream) = backend.get_object(&id).await?.unwrap();
2141 assert_eq!(&stream::read_to_vec(stream).await?, b"data");
2142
2143 Ok(())
2144 }
2145}