1use std::fmt;
32use std::future::Future;
33use std::sync::Arc;
34use std::time::{Duration, SystemTime};
35
36use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell};
37use bigtable_rs::google::bigtable::v2::{self, mutation};
38use bytes::Bytes;
39use futures_util::TryStreamExt;
40use objectstore_types::metadata::{ExpirationPolicy, Metadata};
41use objectstore_types::range::{ByteRange, ContentRange};
42use serde::{Deserialize, Serialize};
43use tonic::Code;
44
45use crate::backend::common::{
46 Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
47 TieredGet, TieredMetadata, TieredWrite, Tombstone,
48};
49use crate::error::{Error, Result};
50use crate::gcp_auth::PrefetchingTokenProvider;
51use crate::id::ObjectId;
52use crate::stream::{ChunkedBytes, ClientStream};
53
/// Settings needed to open a connection to a Bigtable table.
///
/// Consumed by [`BigTableBackend::new`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BigTableConfig {
    /// Optional endpoint. When set, [`BigTableBackend::new`] connects through
    /// `BigTableConnection::new_with_emulator` and no token provider is created.
    pub endpoint: Option<String>,

    /// Project identifier; forms the `projects/{project_id}` part of the instance path.
    pub project_id: String,

    /// Instance name; forms the `instances/{instance_name}` part of the instance path.
    pub instance_name: String,

    /// Name of the table all requests are issued against.
    pub table_name: String,

    /// Forwarded to `new_with_managed_transport`, defaulting to `1` when unset; presumably the
    /// number of connections to open (TODO confirm). Not read when `endpoint` is set.
    pub connections: Option<usize>,
}
127
/// Timeout handed to the Bigtable connection constructors in `BigTableBackend::new`.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Last argument of `new_with_managed_transport`; going by the name, the maximum lifetime of a
/// channel before it is replaced — TODO confirm against the `bigtable_rs` signature.
const MAX_CHANNEL_AGE: Option<Duration> = Some(Duration::from_mins(50));
/// Slack applied when deciding whether to refresh a time-to-idle row: a row is only rewritten if
/// its stored expiry is earlier than `now + tti - TTI_DEBOUNCE` (see `RowData::needs_tti_bump`).
const TTI_DEBOUNCE: Duration = Duration::from_hours(24);
/// Scopes requested from the GCP token provider.
const TOKEN_SCOPES: &[&str] = &["https://www.googleapis.com/auth/bigtable.data"];

/// Number of retries `retry` performs after the initial attempt before giving up.
const REQUEST_RETRY_COUNT: usize = 2;
/// Number of check-and-mutate attempts made by the conditional put/delete loops.
const CAS_RETRY_COUNT: usize = 3;

/// Column qualifier holding the object payload.
const COLUMN_PAYLOAD: &[u8] = b"p";
/// Column qualifier holding JSON-serialized object metadata (and legacy tombstone markers).
const COLUMN_METADATA: &[u8] = b"m";
/// Column qualifier holding the storage path a tombstone redirects to.
const COLUMN_REDIRECT: &[u8] = b"r";
/// Column qualifier holding JSON-serialized `TombstoneMeta`.
const COLUMN_TOMBSTONE_META: &[u8] = b"t";
/// Column-qualifier regex selecting the metadata, redirect and tombstone-meta columns, i.e.
/// everything except the payload.
const FILTER_META: &[u8] = b"^[mrt]$";

/// Column family used for rows with a TTL/TTI policy; their cell timestamps carry the expiry
/// deadline (see `object_mutations`).
const FAMILY_GC: &str = "fg";
/// Column family used for rows with `ExpirationPolicy::Manual`; cells are written with
/// timestamp `-1`.
const FAMILY_MANUAL: &str = "fm";
166
/// Storage backend that keeps objects and redirect tombstones in a single Bigtable table.
pub struct BigTableBackend {
    /// Connection from which a client is obtained for every request.
    bigtable: BigTableConnection,

    /// `projects/{project_id}/instances/{instance_name}`; only used for debug output here.
    instance_path: String,
    /// Fully qualified table name as returned by `get_full_table_name`; sent with every request.
    table_path: String,
    /// Table name as configured; only used for debug output here.
    table_name: String,
}
175
176impl fmt::Debug for BigTableBackend {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.debug_struct("BigTableBackend")
179 .field("instance_path", &self.instance_path)
180 .field("table_path", &self.table_path)
181 .field("table_name", &self.table_name)
182 .finish_non_exhaustive()
183 }
184}
185
186fn column_filter(column: &[u8]) -> v2::RowFilter {
188 v2::RowFilter {
189 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
190 [b"^", column, b"$"].concat(),
191 )),
192 }
193}
194
195fn legacy_tombstone_filter() -> v2::RowFilter {
200 v2::RowFilter {
201 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
202 filters: vec![
203 column_filter(COLUMN_METADATA),
204 v2::RowFilter {
205 filter: Some(v2::row_filter::Filter::ValueRegexFilter(
206 b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
207 )),
208 },
209 ],
210 })),
211 }
212}
213
214fn live_row_filter(inner: v2::RowFilter) -> v2::RowFilter {
216 let now_micros = SystemTime::now()
217 .duration_since(SystemTime::UNIX_EPOCH)
218 .unwrap_or_default()
219 .as_millis() as i64
220 * 1000;
221
222 v2::RowFilter {
223 filter: Some(v2::row_filter::Filter::Interleave(
224 v2::row_filter::Interleave {
225 filters: vec![
226 v2::RowFilter {
228 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
229 filters: vec![
230 v2::RowFilter {
231 filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
232 format!("^{FAMILY_MANUAL}$"),
233 )),
234 },
235 inner.clone(),
236 ],
237 })),
238 },
239 v2::RowFilter {
241 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
242 filters: vec![
243 v2::RowFilter {
244 filter: Some(v2::row_filter::Filter::FamilyNameRegexFilter(
245 format!("^{FAMILY_GC}$"),
246 )),
247 },
248 v2::RowFilter {
249 filter: Some(v2::row_filter::Filter::TimestampRangeFilter(
250 v2::TimestampRange {
251 start_timestamp_micros: now_micros,
252 end_timestamp_micros: 0,
253 },
254 )),
255 },
256 inner,
257 ],
258 })),
259 },
260 ],
261 },
262 )),
263 }
264}
265
266fn tombstone_filter() -> v2::RowFilter {
274 let filter = v2::RowFilter {
275 filter: Some(v2::row_filter::Filter::Interleave(
276 v2::row_filter::Interleave {
277 filters: vec![column_filter(COLUMN_REDIRECT), legacy_tombstone_filter()],
278 },
279 )),
280 };
281 live_row_filter(filter)
282}
283
284fn tombstone_predicate() -> MutatePredicate {
290 MutatePredicate::Exclude(tombstone_filter())
291}
292
293fn exact_value_regex(value: &str) -> Vec<u8> {
298 format!("^{}$", regex::escape(value)).into_bytes()
299}
300
301fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
319 let target_path = exact_value_regex(&target.as_storage_path().to_string());
320
321 let exact_match = v2::RowFilter {
322 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
323 filters: vec![
324 column_filter(COLUMN_REDIRECT),
325 v2::RowFilter {
326 filter: Some(v2::row_filter::Filter::ValueRegexFilter(target_path)),
327 },
328 ],
329 })),
330 };
331
332 if target != own_id {
333 return live_row_filter(exact_match);
334 }
335
336 let empty_redirect_match = v2::RowFilter {
337 filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
338 filters: vec![
339 column_filter(COLUMN_REDIRECT),
340 v2::RowFilter {
341 filter: Some(v2::row_filter::Filter::ValueRegexFilter(b"^$".to_vec())),
342 },
343 ],
344 })),
345 };
346
347 let filter = v2::RowFilter {
351 filter: Some(v2::row_filter::Filter::Interleave(
352 v2::row_filter::Interleave {
353 filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()],
354 },
355 )),
356 };
357 live_row_filter(filter)
358}
359
360fn update_predicate(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
367 MutatePredicate::Include(v2::RowFilter {
368 filter: Some(v2::row_filter::Filter::Interleave(
369 v2::row_filter::Interleave {
370 filters: vec![
371 redirect_target_filter(old, own_id),
372 redirect_target_filter(new, own_id),
373 ],
374 },
375 )),
376 })
377}
378
379fn optional_target_predicate(target: &ObjectId, own_id: &ObjectId) -> MutatePredicate {
391 MutatePredicate::Exclude(v2::RowFilter {
392 filter: Some(v2::row_filter::Filter::Condition(Box::new(
393 v2::row_filter::Condition {
394 predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
395 true_filter: Some(Box::new(v2::RowFilter {
396 filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
397 })),
398 false_filter: Some(Box::new(tombstone_filter())),
399 },
400 ))),
401 })
402}
403
/// Condition attached to a check-and-mutate request (see `BigTableBackend::check_and_mutate`).
#[derive(Clone, Debug)]
enum MutatePredicate {
    /// Apply the mutations only if the filter yields output for the row.
    Include(v2::RowFilter),
    /// Apply the mutations only if the filter yields no output for the row.
    Exclude(v2::RowFilter),
}
419
420fn metadata_filter() -> v2::RowFilter {
425 v2::RowFilter {
426 filter: Some(v2::row_filter::Filter::ColumnQualifierRegexFilter(
427 FILTER_META.to_owned(),
428 )),
429 }
430}
431
432fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
433 v2::Mutation {
434 mutation: Some(mutation),
435 }
436}
437
438fn delete_row_mutation() -> v2::Mutation {
440 mutation(mutation::Mutation::DeleteFromRow(
441 mutation::DeleteFromRow {},
442 ))
443}
444
445fn object_mutations(
451 mut metadata: Metadata,
452 payload: Vec<u8>,
453 now: SystemTime,
454) -> Result<[v2::Mutation; 3]> {
455 let (family, timestamp_micros) = match metadata.expiration_policy {
456 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
457 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
458 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
459 };
460
461 metadata.size = Some(payload.len());
463
464 let metadata_bytes = serde_json::to_vec(&metadata)
465 .map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
466
467 Ok([
468 delete_row_mutation(),
470 mutation(mutation::Mutation::SetCell(mutation::SetCell {
471 family_name: family.to_owned(),
472 column_qualifier: COLUMN_PAYLOAD.to_owned(),
473 timestamp_micros,
474 value: payload,
475 })),
476 mutation(mutation::Mutation::SetCell(mutation::SetCell {
477 family_name: family.to_owned(),
478 column_qualifier: COLUMN_METADATA.to_owned(),
479 timestamp_micros,
480 value: metadata_bytes,
481 })),
482 ])
483}
484
/// Metadata stored alongside a tombstone, serialized as JSON into the tombstone-meta column.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TombstoneMeta {
    /// Expiration policy of the tombstone. Omitted from the JSON when manual and defaulted
    /// when missing on read.
    #[serde(default, skip_serializing_if = "ExpirationPolicy::is_manual")]
    expiration_policy: ExpirationPolicy,
}
497
498fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
504 let (family, timestamp_micros) = match tombstone.expiration_policy {
505 ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
506 ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
507 ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
508 };
509
510 let tombstone_meta = TombstoneMeta {
511 expiration_policy: tombstone.expiration_policy,
512 };
513
514 Ok([
515 delete_row_mutation(),
516 mutation(mutation::Mutation::SetCell(mutation::SetCell {
517 family_name: family.to_owned(),
518 column_qualifier: COLUMN_REDIRECT.to_owned(),
519 timestamp_micros,
520 value: tombstone.target.as_storage_path().to_string().into_bytes(),
521 })),
522 mutation(mutation::Mutation::SetCell(mutation::SetCell {
523 family_name: family.to_owned(),
524 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
525 timestamp_micros,
526 value: serde_json::to_vec(&tombstone_meta)
527 .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
528 })),
529 ])
530}
531
/// Subset of the metadata-column JSON that is inspected to detect legacy tombstones, which
/// were marked inside the metadata column instead of using a redirect column.
#[derive(Debug, Deserialize)]
struct LegacyTombstoneMeta {
    /// `true` when the metadata cell marks the row as a tombstone; defaults to `false` when
    /// the key is absent.
    #[serde(default)]
    is_redirect_tombstone: bool,

    /// Expiration policy recorded with the legacy tombstone; defaulted when absent.
    #[serde(default)]
    expiration_policy: ExpirationPolicy,
}
550
/// Decoded contents of a single row, as produced by `RowData::from_cells`.
enum RowData {
    /// The row holds an object.
    Object {
        /// Metadata from the metadata column (default metadata if that cell was not read),
        /// with `time_expires` taken from the cell timestamps.
        metadata: Metadata,
        /// Bytes of the payload column; empty if that cell was not read.
        payload: Vec<u8>,
    },
    /// The row holds a redirect tombstone.
    Tombstone {
        /// Raw bytes of the redirect column; empty for legacy tombstones.
        target: Vec<u8>,
        /// Tombstone metadata (default if no tombstone-meta cell was read).
        meta: TombstoneMeta,
        /// Expiry derived from the timestamp of a GC-family cell, if any.
        time_expires: Option<SystemTime>,
    },
}
565
566impl RowData {
567 fn from_cells(cells: Vec<RowCell>) -> Result<Self> {
574 let mut metadata_opt: Option<Metadata> = None;
575 let mut tombstone_meta_opt: Option<TombstoneMeta> = None;
576 let mut redirect_detected = false;
577 let mut redirect_target = Vec::new();
578 let mut expire_at = None;
579 let mut payload = Vec::new();
580
581 for cell in cells {
582 if cell.family_name == FAMILY_GC {
587 expire_at = micros_to_time(cell.timestamp_micros);
588 }
589
590 match cell.qualifier.as_slice() {
591 COLUMN_REDIRECT => {
592 redirect_detected = true;
593 redirect_target = cell.value;
594 }
595 COLUMN_PAYLOAD => {
596 payload = cell.value;
597 }
598 COLUMN_TOMBSTONE_META => {
599 tombstone_meta_opt =
600 Some(serde_json::from_slice(&cell.value).map_err(|cause| {
601 Error::serde("failed to deserialize tombstone meta", cause)
602 })?);
603 }
604 COLUMN_METADATA => {
605 if let Ok(legacy_meta) =
606 serde_json::from_slice::<LegacyTombstoneMeta>(&cell.value)
607 && legacy_meta.is_redirect_tombstone
608 {
609 redirect_detected = true;
610 objectstore_metrics::count!("bigtable.legacy_tombstone_read");
611 tombstone_meta_opt = Some(TombstoneMeta {
612 expiration_policy: legacy_meta.expiration_policy,
613 });
614 } else {
615 metadata_opt =
616 Some(serde_json::from_slice(&cell.value).map_err(|cause| {
617 Error::serde("failed to deserialize metadata", cause)
618 })?);
619 }
620 }
621 _ => {}
622 }
623 }
624
625 Ok(if redirect_detected {
626 RowData::Tombstone {
627 target: redirect_target,
628 meta: tombstone_meta_opt.unwrap_or_default(),
629 time_expires: expire_at,
630 }
631 } else {
632 let mut metadata = metadata_opt.unwrap_or_default();
634 metadata.time_expires = expire_at;
635 RowData::Object { metadata, payload }
636 })
637 }
638
639 fn expiration_policy(&self) -> ExpirationPolicy {
641 match self {
642 RowData::Object { metadata, .. } => metadata.expiration_policy,
643 RowData::Tombstone { meta, .. } => meta.expiration_policy,
644 }
645 }
646
647 fn time_expires(&self) -> Option<SystemTime> {
649 match self {
650 RowData::Object { metadata, .. } => metadata.time_expires,
651 RowData::Tombstone { time_expires, .. } => *time_expires,
652 }
653 }
654
655 fn expires_before(&self, time: SystemTime) -> bool {
659 self.expiration_policy().is_timeout() && self.time_expires().is_some_and(|ts| ts < time)
660 }
661
662 fn needs_tti_bump(&self) -> bool {
664 matches!(
665 self.expiration_policy(),
666 ExpirationPolicy::TimeToIdle(tti) if self.expires_before(SystemTime::now() + tti - TTI_DEBOUNCE)
667 )
668 }
669}
670
671fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Result<ObjectId> {
677 if redirect_path.is_empty() {
678 objectstore_metrics::count!("bigtable.empty_redirect_read");
679 Ok(tombstone_id.clone())
680 } else {
681 let redirect_str = std::str::from_utf8(redirect_path)
682 .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
683 ObjectId::from_storage_path(redirect_str)
684 .ok_or_else(|| Error::generic("corrupt redirect path"))
685 }
686}
687
688impl BigTableBackend {
689 pub async fn new(config: BigTableConfig) -> anyhow::Result<Self> {
694 let BigTableConfig {
695 endpoint,
696 project_id,
697 instance_name,
698 table_name,
699 connections,
700 } = config;
701
702 let bigtable = if let Some(ref endpoint) = endpoint {
703 BigTableConnection::new_with_emulator(
704 endpoint,
705 &project_id,
706 &instance_name,
707 false, Some(CONNECT_TIMEOUT),
709 )?
710 } else {
711 let token_provider = PrefetchingTokenProvider::gcp_auth(TOKEN_SCOPES).await?;
712 BigTableConnection::new_with_managed_transport(
713 &project_id,
714 &instance_name,
715 false, Some(CONNECT_TIMEOUT),
717 Arc::new(token_provider),
718 connections.unwrap_or(1),
719 true, None, MAX_CHANNEL_AGE,
722 )
723 .await?
724 };
725
726 let client = bigtable.client();
727
728 Ok(Self {
729 bigtable,
730 instance_path: format!("projects/{project_id}/instances/{instance_name}"),
731 table_path: client.get_full_table_name(&table_name),
732 table_name,
733 })
734 }
735
736 async fn read_row(
740 &self,
741 path: &[u8],
742 filter: Option<v2::RowFilter>,
743 action: &'static str,
744 ) -> Result<Option<RowData>> {
745 let request = v2::ReadRowsRequest {
746 table_name: self.table_path.clone(),
747 rows: Some(v2::RowSet {
748 row_keys: vec![path.to_owned()],
749 row_ranges: vec![],
750 }),
751 filter,
752 rows_limit: 1,
753 ..Default::default()
754 };
755
756 let response = retry(action, || async {
757 self.bigtable.client().read_rows(request.clone()).await
758 })
759 .await?;
760 debug_assert!(response.len() <= 1, "Expected at most one row");
761
762 let Some((_, cells)) = response.into_iter().next() else {
763 objectstore_log::debug!("Object not found");
764 return Ok(None);
765 };
766
767 let row = RowData::from_cells(cells)?;
768 Ok(if row.expires_before(SystemTime::now()) {
769 None
770 } else {
771 Some(row)
772 })
773 }
774
775 async fn mutate(
776 &self,
777 path: Vec<u8>,
778 mutations: impl Into<Vec<v2::Mutation>>,
779 action: &'static str,
780 ) -> Result<v2::MutateRowResponse> {
781 let request = v2::MutateRowRequest {
782 table_name: self.table_path.clone(),
783 row_key: path,
784 mutations: mutations.into(),
785 ..Default::default()
786 };
787
788 let response = retry(action, || async {
789 self.bigtable.client().mutate_row(request.clone()).await
790 })
791 .await?;
792
793 Ok(response.into_inner())
794 }
795
796 async fn put_row(
797 &self,
798 path: Vec<u8>,
799 metadata: &Metadata,
800 payload: Vec<u8>,
801 action: &'static str,
802 ) -> Result<v2::MutateRowResponse> {
803 let mutations = object_mutations(metadata.clone(), payload, SystemTime::now())?;
804 self.mutate(path, mutations, action).await
805 }
806
807 async fn put_tombstone_row(
808 &self,
809 path: Vec<u8>,
810 tombstone: &Tombstone,
811 action: &'static str,
812 ) -> Result<v2::MutateRowResponse> {
813 let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
814 self.mutate(path, mutations, action).await
815 }
816
817 async fn bump_tti(&self, path: Vec<u8>, row: &RowData, loaded: bool, hv_id: &ObjectId) {
821 let expiration_policy = row.expiration_policy();
822
823 match row {
824 RowData::Tombstone { target, .. } => {
825 let target = match parse_redirect_target(target, hv_id) {
826 Ok(target) => target,
827 Err(e) => {
828 objectstore_log::error!(!!&e, "invalid redirect target in tombstone row");
829 return;
830 }
831 };
832
833 let tombstone = Tombstone {
834 target,
835 expiration_policy,
836 };
837 let _ = self.put_tombstone_row(path, &tombstone, "tti-bump").await;
838 }
839 RowData::Object { metadata, payload } if loaded => {
840 let _ = self
841 .put_row(path, metadata, payload.clone(), "tti-bump")
842 .await;
843 }
844 RowData::Object { metadata, .. } => {
845 let payload_read = self
846 .read_row(&path, Some(column_filter(COLUMN_PAYLOAD)), "tti-bump")
847 .await;
848
849 if let Ok(Some(RowData::Object { payload, .. })) = payload_read {
850 let _ = self.put_row(path, metadata, payload, "tti-bump").await;
851 }
852 }
853 }
854 }
855
856 async fn check_and_mutate(
858 &self,
859 row_key: Vec<u8>,
860 predicate: MutatePredicate,
861 mutations: impl Into<Vec<v2::Mutation>>,
862 context: &'static str,
863 ) -> Result<bool> {
864 let (filter, true_mutations, false_mutations, success_on_match) = match predicate {
865 MutatePredicate::Include(f) => (f, mutations.into(), vec![], true),
866 MutatePredicate::Exclude(f) => (f, vec![], mutations.into(), false),
867 };
868
869 let request = v2::CheckAndMutateRowRequest {
870 table_name: self.table_path.clone(),
871 row_key,
872 predicate_filter: Some(filter),
873 true_mutations,
874 false_mutations,
875 ..Default::default()
876 };
877
878 let future = retry(context, || async {
879 self.bigtable
880 .client()
881 .check_and_mutate_row(request.clone())
882 .await
883 });
884
885 Ok(future.await?.predicate_matched == success_on_match)
886 }
887}
888
889#[async_trait::async_trait]
890impl Backend for BigTableBackend {
891 fn name(&self) -> &'static str {
892 "bigtable"
893 }
894
895 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
896 async fn put_object(
897 &self,
898 id: &ObjectId,
899 metadata: &Metadata,
900 mut stream: ClientStream,
901 ) -> Result<PutResponse> {
902 objectstore_log::debug!("Writing to Bigtable backend");
903 let path = id.as_storage_path().to_string().into_bytes();
904
905 let mut payload = ChunkedBytes::new(0);
906 while let Some(chunk) = stream.try_next().await? {
907 payload.push(chunk);
908 }
909
910 self.put_row(path, metadata, payload.into_bytes().into(), "put")
911 .await?;
912 Ok(())
913 }
914
915 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
916 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
917 match self.get_tiered_object(id, range).await? {
918 TieredGet::Object(metadata, content_range, payload) => {
919 Ok(Some((metadata, content_range, payload)))
920 }
921 TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
922 TieredGet::NotFound => Ok(None),
923 }
924 }
925
926 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
927 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
928 match self.get_tiered_metadata(id).await? {
929 TieredMetadata::Object(metadata) => Ok(Some(metadata)),
930 TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
931 TieredMetadata::NotFound => Ok(None),
932 }
933 }
934
935 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
936 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
937 objectstore_log::debug!("Deleting from Bigtable backend");
938
939 let path = id.as_storage_path().to_string().into_bytes();
940 self.mutate(path, [delete_row_mutation()], "delete").await?;
941
942 Ok(())
943 }
944}
945
946#[async_trait::async_trait]
947impl HighVolumeBackend for BigTableBackend {
948 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
949 async fn put_non_tombstone(
950 &self,
951 id: &ObjectId,
952 metadata: &Metadata,
953 payload: Bytes,
954 ) -> Result<Option<Tombstone>> {
955 objectstore_log::debug!("Conditional put to Bigtable backend");
956
957 let path = id.as_storage_path().to_string().into_bytes();
958 let mutations = object_mutations(metadata.clone(), payload.to_vec(), SystemTime::now())?;
959
960 for _ in 0..CAS_RETRY_COUNT {
961 let write_succeeded = self
962 .check_and_mutate(
963 path.clone(),
964 tombstone_predicate(),
965 mutations.clone(),
966 "put_non_tombstone",
967 )
968 .await?;
969
970 if write_succeeded {
971 return Ok(None);
972 }
973
974 let row = self
976 .read_row(&path, Some(metadata_filter()), "put_non_tombstone")
977 .await?;
978
979 match row {
980 Some(RowData::Tombstone { target, meta, .. }) => {
981 return Ok(Some(Tombstone {
982 target: parse_redirect_target(&target, id)?,
983 expiration_policy: meta.expiration_policy,
984 }));
985 }
986 Some(RowData::Object { .. }) => continue,
988 None => continue,
990 }
991 }
992
993 Err(Error::generic("BigTable: race loop in put_non_tombstone"))
994 }
995
996 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
997 async fn get_tiered_object(
998 &self,
999 id: &ObjectId,
1000 range: Option<ByteRange>,
1001 ) -> Result<TieredGet> {
1002 objectstore_log::debug!("Reading from Bigtable backend");
1003 let path = id.as_storage_path().to_string().into_bytes();
1004
1005 let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else {
1006 return Ok(TieredGet::NotFound);
1007 };
1008
1009 if row.needs_tti_bump() {
1010 self.bump_tti(path.clone(), &row, true, id).await;
1011 }
1012
1013 Ok(match row {
1014 RowData::Tombstone { meta, target, .. } => TieredGet::Tombstone(Tombstone {
1015 target: parse_redirect_target(&target, id)?,
1016 expiration_policy: meta.expiration_policy,
1017 }),
1018 RowData::Object { metadata, payload } => {
1019 let mut metadata = metadata;
1020 let payload = Bytes::from(payload);
1021 if metadata.size.is_none() {
1022 metadata.size = Some(payload.len());
1024 }
1025
1026 let (content_range, payload) = apply_range(payload, range)?;
1027 TieredGet::Object(metadata, content_range, crate::stream::single(payload))
1028 }
1029 })
1030 }
1031
1032 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1033 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
1034 objectstore_log::debug!("Reading metadata from Bigtable backend");
1035 let path = id.as_storage_path().to_string().into_bytes();
1036
1037 let row_opt = self
1041 .read_row(&path, Some(metadata_filter()), "get_tiered_metadata")
1042 .await?;
1043 let Some(row) = row_opt else {
1044 return Ok(TieredMetadata::NotFound);
1045 };
1046
1047 if row.needs_tti_bump() {
1048 self.bump_tti(path.clone(), &row, false, id).await;
1049 }
1050
1051 Ok(match row {
1052 RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone {
1053 target: parse_redirect_target(&target, id)?,
1054 expiration_policy: meta.expiration_policy,
1055 }),
1056 RowData::Object { metadata, .. } => TieredMetadata::Object(metadata),
1057 })
1058 }
1059
1060 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1061 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
1062 objectstore_log::debug!("Conditional delete from Bigtable backend");
1063
1064 let path = id.as_storage_path().to_string().into_bytes();
1065
1066 for _ in 0..CAS_RETRY_COUNT {
1067 let write_succeeded = self
1068 .check_and_mutate(
1069 path.clone(),
1070 tombstone_predicate(),
1071 [delete_row_mutation()],
1072 "delete_non_tombstone",
1073 )
1074 .await?;
1075
1076 if write_succeeded {
1077 return Ok(None);
1078 }
1079
1080 let row = self
1082 .read_row(&path, Some(metadata_filter()), "delete_non_tombstone")
1083 .await?;
1084
1085 match row {
1086 Some(RowData::Tombstone { target, meta, .. }) => {
1087 return Ok(Some(Tombstone {
1088 target: parse_redirect_target(&target, id)?,
1089 expiration_policy: meta.expiration_policy,
1090 }));
1091 }
1092 Some(RowData::Object { .. }) => continue,
1094 None => return Ok(None),
1096 }
1097 }
1098
1099 Err(Error::generic(
1100 "BigTable: race loop in delete_non_tombstone",
1101 ))
1102 }
1103
1104 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
1105 async fn compare_and_write(
1106 &self,
1107 id: &ObjectId,
1108 current: Option<&ObjectId>,
1109 write: TieredWrite,
1110 ) -> Result<bool> {
1111 objectstore_log::debug!("CAS put to Bigtable backend");
1112
1113 let path = id.as_storage_path().to_string().into_bytes();
1114 let now = SystemTime::now();
1115
1116 let predicate = match (current, write.target()) {
1117 (Some(old), Some(new)) => update_predicate(old, new, id),
1118 (Some(target), None) => optional_target_predicate(target, id),
1119 (None, Some(target)) => optional_target_predicate(target, id),
1120 (None, None) => tombstone_predicate(),
1121 };
1122
1123 let mutations = match write {
1124 TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
1125 TieredWrite::Object(m, p) => object_mutations(m, p.to_vec(), now)?.into(),
1126 TieredWrite::Delete => vec![delete_row_mutation()],
1127 };
1128
1129 self.check_and_mutate(path, predicate, mutations, "compare_and_write")
1130 .await
1131 }
1132}
1133
1134fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
1140 let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
1141 context: format!(
1142 "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
1143 humantime::format_rfc3339_seconds(from),
1144 ttl.as_secs()
1145 ),
1146 cause: None,
1147 })?;
1148 let millis = deadline
1149 .duration_since(SystemTime::UNIX_EPOCH)
1150 .map_err(|e| Error::Generic {
1151 context: format!(
1152 "unable to get duration since UNIX_EPOCH for SystemTime {}",
1153 humantime::format_rfc3339_seconds(deadline)
1154 ),
1155 cause: Some(Box::new(e)),
1156 })?
1157 .as_millis();
1158 (millis * 1000).try_into().map_err(|e| Error::Generic {
1159 context: format!("failed to convert {millis}ms to i64 microseconds"),
1160 cause: Some(Box::new(e)),
1161 })
1162}
1163
/// Converts microseconds since the Unix epoch into a `SystemTime`.
///
/// Returns `None` for negative values and for instants that `SystemTime` cannot represent.
fn micros_to_time(micros: i64) -> Option<SystemTime> {
    match u64::try_from(micros) {
        Ok(unsigned) => SystemTime::UNIX_EPOCH.checked_add(Duration::from_micros(unsigned)),
        Err(_) => None,
    }
}
1170
1171async fn retry<T, F>(context: &'static str, f: impl Fn() -> F) -> Result<T>
1173where
1174 F: Future<Output = Result<T, BigTableError>> + Send,
1175{
1176 let mut retry_count = 0usize;
1177
1178 loop {
1179 match f().await {
1180 Ok(res) => return Ok(res),
1181 Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
1182 objectstore_metrics::count!("bigtable.failures", action = context);
1183 return Err(Error::Generic {
1184 context: format!("Bigtable: `{context}` failed"),
1185 cause: Some(Box::new(e)),
1186 });
1187 }
1188 Err(e) => {
1189 retry_count += 1;
1190 objectstore_metrics::count!("bigtable.retries", action = context);
1191 objectstore_log::warn!(!!&e, retry_count, context, "Retrying request");
1192 }
1193 }
1194 }
1195}
1196
1197fn is_retryable(error: &BigTableError) -> bool {
1198 match error {
1199 BigTableError::GCPAuthError(_) => true,
1201 BigTableError::TransportError(_) => true,
1203 BigTableError::IoError(_) => true,
1205 BigTableError::TimeoutError(_) => true,
1206
1207 BigTableError::RpcError(status) => match status.code() {
1209 Code::Unavailable => true,
1211 Code::Cancelled => true,
1213 Code::DeadlineExceeded => true,
1214 Code::Unauthenticated => true,
1216 Code::Aborted => true,
1218 Code::Internal => true,
1219 Code::FailedPrecondition => true,
1220 Code::Unknown => true,
1221 _ => false,
1222 },
1223 _ => false,
1224 }
1225}
1226
1227fn apply_range(payload: Bytes, range: Option<ByteRange>) -> Result<(Option<ContentRange>, Bytes)> {
1233 let Some(byte_range) = range else {
1234 return Ok((None, payload));
1235 };
1236
1237 let total = payload.len() as u64;
1238 let content_range = byte_range
1239 .resolve(total)
1240 .ok_or(Error::RangeNotSatisfiable { total })?;
1241
1242 let sliced = payload.slice(content_range.start as usize..content_range.end as usize + 1);
1243 Ok((Some(content_range), sliced))
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248 use std::collections::BTreeMap;
1249
1250 use anyhow::Result;
1251 use objectstore_types::scope::{Scope, Scopes};
1252
1253 use super::*;
1254 use crate::id::ObjectContext;
1255 use crate::stream;
1256
1257 async fn create_test_backend() -> Result<BigTableBackend> {
1263 BigTableBackend::new(BigTableConfig {
1264 endpoint: Some("localhost:8086".into()),
1265 project_id: "testing".into(),
1266 instance_name: "objectstore".into(),
1267 table_name: "objectstore".into(),
1268 connections: None,
1269 })
1270 .await
1271 }
1272
1273 fn make_id() -> ObjectId {
1274 ObjectId::random(ObjectContext {
1275 usecase: "testing".into(),
1276 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
1277 })
1278 }
1279
1280 async fn create_object(
1281 backend: &BigTableBackend,
1282 id: &ObjectId,
1283 metadata: &Metadata,
1284 payload: &[u8],
1285 now: SystemTime,
1286 ) -> Result<()> {
1287 let path = id.as_storage_path().to_string().into_bytes();
1288 let mutations = object_mutations(metadata.clone(), payload.to_vec(), now)?;
1289 backend.mutate(path, mutations, "test-setup").await?;
1290 Ok(())
1291 }
1292
1293 async fn create_tombstone(
1294 backend: &BigTableBackend,
1295 id: &ObjectId,
1296 tombstone: &Tombstone,
1297 now: SystemTime,
1298 ) -> Result<()> {
1299 let path = id.as_storage_path().to_string().into_bytes();
1300 let mutations = tombstone_mutations(tombstone, now)?;
1301 backend.mutate(path, mutations, "test-setup").await?;
1302 Ok(())
1303 }
1304
1305 async fn write_legacy_tombstone(
1307 backend: &BigTableBackend,
1308 id: &ObjectId,
1309 expiration_policy: ExpirationPolicy,
1310 time_expires: Option<SystemTime>,
1311 ) -> Result<()> {
1312 let meta = if expiration_policy.is_manual() {
1313 r#"{"is_redirect_tombstone":true}"#.to_owned()
1314 } else {
1315 let policy_json = serde_json::to_string(&expiration_policy).unwrap();
1316 format!(r#"{{"is_redirect_tombstone":true,"expiration_policy":{policy_json}}}"#)
1317 };
1318
1319 let (family, timestamp_micros) = if expiration_policy.is_manual() {
1320 (FAMILY_MANUAL, -1)
1321 } else {
1322 let t =
1323 time_expires.unwrap_or(SystemTime::now() + expiration_policy.expires_in().unwrap());
1324 let timestamp = t
1325 .duration_since(SystemTime::UNIX_EPOCH)
1326 .unwrap()
1327 .as_millis();
1328 (FAMILY_GC, timestamp as i64 * 1000)
1329 };
1330
1331 let path = id.as_storage_path().to_string().into_bytes();
1332 let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
1333 family_name: family.to_owned(),
1334 column_qualifier: COLUMN_METADATA.to_owned(),
1335 timestamp_micros,
1336 value: meta.into_bytes(),
1337 }))];
1338
1339 backend.mutate(path, mutations, "test-setup").await?;
1340
1341 Ok(())
1342 }
1343
1344 async fn write_empty_redirect_tombstone(
1347 backend: &BigTableBackend,
1348 id: &ObjectId,
1349 ) -> Result<()> {
1350 let path = id.as_storage_path().to_string().into_bytes();
1351 let mutations = [
1352 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1353 family_name: FAMILY_MANUAL.to_owned(),
1354 column_qualifier: COLUMN_REDIRECT.to_owned(),
1355 timestamp_micros: -1,
1356 value: b"".to_vec(), })),
1358 mutation(mutation::Mutation::SetCell(mutation::SetCell {
1359 family_name: FAMILY_MANUAL.to_owned(),
1360 column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
1361 timestamp_micros: -1,
1362 value: b"{}".to_vec(),
1363 })),
1364 ];
1365
1366 backend.mutate(path, mutations, "test-setup").await?;
1367
1368 Ok(())
1369 }
1370
1371 #[tokio::test]
1375 async fn test_roundtrip() -> Result<()> {
1376 let backend = create_test_backend().await?;
1377
1378 let id = make_id();
1379 let metadata = Metadata {
1380 content_type: "text/plain".into(),
1381 time_created: Some(SystemTime::now()),
1382 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1383 ..Default::default()
1384 };
1385
1386 backend
1387 .put_object(&id, &metadata, stream::single("hello, world"))
1388 .await?;
1389
1390 let (obj_meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1391 let payload = stream::read_to_vec(stream).await?;
1392 assert_eq!(payload, b"hello, world");
1393 assert_eq!(obj_meta.content_type, metadata.content_type);
1394 assert_eq!(obj_meta.custom, metadata.custom);
1395
1396 let head_meta = backend.get_metadata(&id).await?.unwrap();
1397 assert_eq!(head_meta.content_type, metadata.content_type);
1398 assert_eq!(head_meta.custom, metadata.custom);
1399
1400 Ok(())
1401 }
1402
1403 #[tokio::test]
1405 async fn test_nonexistent() -> Result<()> {
1406 let backend = create_test_backend().await?;
1407
1408 let id = make_id();
1409 assert!(backend.get_object(&id, None).await?.is_none());
1410 assert!(backend.get_metadata(&id).await?.is_none());
1411 backend.delete_object(&id).await?;
1412
1413 Ok(())
1414 }
1415
1416 #[tokio::test]
1417 async fn test_overwrite() -> Result<()> {
1418 let backend = create_test_backend().await?;
1419
1420 let id = make_id();
1421 let first_metadata = Metadata {
1422 custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
1423 ..Default::default()
1424 };
1425 create_object(&backend, &id, &first_metadata, b"hello", SystemTime::now()).await?;
1426
1427 let second_metadata = Metadata {
1428 custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
1429 ..Default::default()
1430 };
1431 backend
1432 .put_object(&id, &second_metadata, stream::single("world"))
1433 .await?;
1434
1435 let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap();
1436 let payload = stream::read_to_vec(stream).await?;
1437 assert_eq!(payload, b"world");
1438 assert_eq!(meta.custom, second_metadata.custom);
1439
1440 Ok(())
1441 }
1442
1443 #[tokio::test]
1444 async fn test_read_after_delete() -> Result<()> {
1445 let backend = create_test_backend().await?;
1446
1447 let id = make_id();
1448 let metadata = Metadata::default();
1449 create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?;
1450 backend.delete_object(&id).await?;
1451
1452 assert!(backend.get_object(&id, None).await?.is_none());
1453
1454 Ok(())
1455 }
1456
1457 #[tokio::test]
1463 async fn test_tti_bump() -> Result<()> {
1464 let backend = create_test_backend().await?;
1465 let tti = Duration::from_hours(2 * 24);
1467 let metadata = Metadata {
1468 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1469 ..Default::default()
1470 };
1471
1472 let past_now = SystemTime::now() - TTI_DEBOUNCE - Duration::from_mins(1);
1475
1476 let id1 = make_id();
1478 create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?;
1479
1480 let (pre_obj_meta, _, _) = backend.get_object(&id1, None).await?.unwrap();
1482 let pre_obj_expiry = pre_obj_meta.time_expires.unwrap();
1483
1484 let post_obj_meta = backend.get_metadata(&id1).await?.unwrap();
1486 let post_obj_expiry = post_obj_meta.time_expires.unwrap();
1487 assert!(
1488 post_obj_expiry > pre_obj_expiry,
1489 "bump should extend expiry"
1490 );
1491
1492 let id2 = make_id();
1494 create_object(&backend, &id2, &metadata, b"hello, world", past_now).await?;
1495
1496 let pre_meta = backend.get_metadata(&id2).await?.unwrap();
1498 let pre_expiry = pre_meta.time_expires.unwrap();
1499
1500 let post_meta = backend.get_metadata(&id2).await?.unwrap();
1502 let post_expiry = post_meta.time_expires.unwrap();
1503 assert!(post_expiry > pre_expiry, "bump should extend expiry");
1504
1505 let (_, _, stream) = backend.get_object(&id2, None).await?.unwrap();
1507 let payload = stream::read_to_vec(stream).await?;
1508 assert_eq!(payload, b"hello, world");
1509
1510 Ok(())
1511 }
1512
1513 #[tokio::test]
1514 async fn test_tti_no_bump_when_fresh() -> Result<()> {
1515 let backend = create_test_backend().await?;
1516
1517 let id = make_id();
1518 let tti = Duration::from_hours(2 * 24);
1520 let metadata = Metadata {
1521 expiration_policy: ExpirationPolicy::TimeToIdle(tti),
1522 ..Default::default()
1523 };
1524 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1525
1526 let first = backend.get_metadata(&id).await?.unwrap();
1529 let second = backend.get_metadata(&id).await?.unwrap();
1530
1531 assert_eq!(
1532 first.time_expires.unwrap(),
1533 second.time_expires.unwrap(),
1534 "fresh TTI object must not be bumped"
1535 );
1536
1537 Ok(())
1538 }
1539
1540 #[tokio::test]
1543 async fn test_ttl_immediate() -> Result<()> {
1544 let backend = create_test_backend().await?;
1548
1549 let id = make_id();
1550 let metadata = Metadata {
1551 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
1552 ..Default::default()
1553 };
1554 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1555
1556 assert!(backend.get_object(&id, None).await?.is_none());
1557
1558 Ok(())
1559 }
1560
1561 #[tokio::test]
1562 async fn test_tti_immediate() -> Result<()> {
1563 let backend = create_test_backend().await?;
1567
1568 let id = make_id();
1569 let metadata = Metadata {
1570 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
1571 ..Default::default()
1572 };
1573 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1574
1575 assert!(backend.get_object(&id, None).await?.is_none());
1576
1577 Ok(())
1578 }
1579
1580 #[tokio::test]
1589 async fn test_tiered_get() -> Result<()> {
1590 let backend = create_test_backend().await?;
1591
1592 let id = make_id();
1594 assert!(matches!(
1595 backend.get_tiered_object(&id, None).await?,
1596 TieredGet::NotFound
1597 ));
1598 assert!(matches!(
1599 backend.get_tiered_metadata(&id).await?,
1600 TieredMetadata::NotFound
1601 ));
1602
1603 let id = make_id();
1605 let put_meta = Metadata {
1606 content_type: "text/plain".into(),
1607 custom: BTreeMap::from_iter([("k".into(), "v".into())]),
1608 ..Default::default()
1609 };
1610 create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?;
1611
1612 let TieredGet::Object(obj_meta, _, obj_stream) =
1613 backend.get_tiered_object(&id, None).await?
1614 else {
1615 panic!("expected TieredGet::Object");
1616 };
1617 let obj_payload = stream::read_to_vec(obj_stream).await?;
1618 assert_eq!(obj_payload, b"payload");
1619 assert_eq!(obj_meta.content_type, put_meta.content_type);
1620 assert_eq!(obj_meta.custom, put_meta.custom);
1621
1622 let TieredMetadata::Object(head_meta) = backend.get_tiered_metadata(&id).await? else {
1623 panic!("expected TieredMetadata::Object");
1624 };
1625 assert_eq!(head_meta.content_type, put_meta.content_type);
1626 assert_eq!(head_meta.custom, put_meta.custom);
1627
1628 let hv_id = make_id();
1630 let lt_id = ObjectId::random(hv_id.context().clone());
1631 let tombstone = Tombstone {
1632 target: lt_id.clone(),
1633 expiration_policy: ExpirationPolicy::Manual,
1634 };
1635 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1636
1637 match backend.get_tiered_object(&hv_id, None).await? {
1638 TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id),
1639 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1640 }
1641 match backend.get_tiered_metadata(&hv_id).await? {
1642 TieredMetadata::Tombstone(meta_t) => assert_eq!(meta_t.target, lt_id,),
1643 other => panic!("expected TieredMetadata::Tombstone, got {other:?}"),
1644 }
1645
1646 Ok(())
1647 }
1648
1649 #[tokio::test]
1655 async fn test_put_non_tombstone() -> Result<()> {
1656 let backend = create_test_backend().await?;
1657
1658 let id = make_id();
1660 let metadata = Metadata::default();
1661 let result = backend
1662 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first"))
1663 .await?;
1664 assert_eq!(result, None, "expected None on empty row");
1665 let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1666 assert_eq!(&stream::read_to_vec(stream).await?, b"first");
1667
1668 let id = make_id();
1670 create_object(&backend, &id, &metadata, b"old", SystemTime::now()).await?;
1671 let result = backend
1672 .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new"))
1673 .await?;
1674 assert_eq!(result, None, "expected None when overwriting object");
1675 let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
1676 assert_eq!(&stream::read_to_vec(stream).await?, b"new");
1677
1678 let hv_id = make_id();
1680 let lt_id = ObjectId::random(hv_id.context().clone());
1681 let tombstone = Tombstone {
1682 target: lt_id.clone(),
1683 expiration_policy: ExpirationPolicy::Manual,
1684 };
1685 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1686 let result = backend
1687 .put_non_tombstone(&hv_id, &metadata, Bytes::new())
1688 .await?;
1689 let returned = result.expect("expected Some(Tombstone) when row is a tombstone");
1690 assert_eq!(returned.target, lt_id);
1691 assert!(
1692 matches!(
1693 backend.get_tiered_metadata(&hv_id).await?,
1694 TieredMetadata::Tombstone(_)
1695 ),
1696 "tombstone must still exist after put_non_tombstone"
1697 );
1698
1699 Ok(())
1700 }
1701
1702 #[tokio::test]
1711 async fn test_delete_non_tombstone() -> Result<()> {
1712 let backend = create_test_backend().await?;
1713
1714 let id = make_id();
1716 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1717
1718 let id = make_id();
1720 let metadata = Metadata::default();
1721 create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?;
1722 assert_eq!(backend.delete_non_tombstone(&id).await?, None);
1723 assert!(backend.get_object(&id, None).await?.is_none());
1724
1725 let id = make_id();
1727 let tombstone = Tombstone {
1728 target: id.clone(),
1729 expiration_policy: ExpirationPolicy::Manual,
1730 };
1731 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1732 let tombstone = backend
1733 .delete_non_tombstone(&id)
1734 .await?
1735 .expect("expected Some(tombstone)");
1736 assert_eq!(tombstone.target, id, "tombstone target must be returned");
1737 assert!(
1738 matches!(
1739 backend.get_tiered_metadata(&id).await?,
1740 TieredMetadata::Tombstone(_)
1741 ),
1742 "tombstone must still exist after delete_non_tombstone"
1743 );
1744
1745 Ok(())
1746 }
1747
1748 #[tokio::test]
1754 async fn test_cas_create_tombstone() -> Result<()> {
1755 let backend = create_test_backend().await?;
1756
1757 let hv_id = make_id();
1758 let lt_id = ObjectId::random(hv_id.context().clone());
1759 let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_hours(1));
1760 let tombstone = Tombstone {
1761 target: lt_id.clone(),
1762 expiration_policy,
1763 };
1764
1765 let committed = backend
1767 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone.clone()))
1768 .await?;
1769 assert!(committed, "expected CAS success on empty row");
1770
1771 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else {
1773 panic!("expected TieredMetadata::Tombstone");
1774 };
1775 assert_eq!(t.target, lt_id, "target must round-trip via r column");
1776 assert_eq!(t.expiration_policy, expiration_policy);
1777 match backend.get_tiered_object(&hv_id, None).await? {
1778 TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"),
1779 other => panic!("expected TieredGet::Tombstone, got {other:?}"),
1780 }
1781
1782 assert!(matches!(
1784 backend.get_object(&hv_id, None).await,
1785 Err(Error::UnexpectedTombstone)
1786 ));
1787 assert!(matches!(
1788 backend.get_metadata(&hv_id).await,
1789 Err(Error::UnexpectedTombstone)
1790 ));
1791
1792 let second = backend
1794 .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
1795 .await?;
1796 assert!(second, "idempotent retry");
1797
1798 Ok(())
1799 }
1800
1801 #[tokio::test]
1803 async fn test_cas_swap_tombstone() -> Result<()> {
1804 let backend = create_test_backend().await?;
1805
1806 let hv_id = make_id();
1807 let old_lt_id = ObjectId::random(hv_id.context().clone());
1808 let wrong_lt_id = ObjectId::random(hv_id.context().clone());
1809 let new_lt_id = ObjectId::random(hv_id.context().clone());
1810
1811 let tombstone = Tombstone {
1812 target: old_lt_id.clone(),
1813 expiration_policy: ExpirationPolicy::Manual,
1814 };
1815 create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?;
1816
1817 let write = TieredWrite::Tombstone(Tombstone {
1819 target: new_lt_id.clone(),
1820 expiration_policy: ExpirationPolicy::Manual,
1821 });
1822 let swapped = backend
1823 .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
1824 .await?;
1825 assert!(!swapped, "expected CAS failure due to wrong target");
1826 match backend.get_tiered_metadata(&hv_id).await? {
1827 TieredMetadata::Tombstone(t) => assert_eq!(t.target, old_lt_id),
1828 other => panic!("expected tombstone, got {other:?}"),
1829 }
1830
1831 let swapped = backend
1833 .compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
1834 .await?;
1835 assert!(swapped, "expected CAS success with correct target");
1836 match backend.get_tiered_metadata(&hv_id).await? {
1837 TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
1838 other => panic!("expected tombstone, got {other:?}"),
1839 }
1840
1841 let retry = backend
1843 .compare_and_write(&hv_id, Some(&old_lt_id), write)
1844 .await?;
1845 assert!(retry, "idempotent retry");
1846
1847 Ok(())
1848 }
1849
1850 #[tokio::test]
1852 async fn test_cas_swap_inline() -> Result<()> {
1853 let backend = create_test_backend().await?;
1854
1855 let id = make_id();
1856 let lt_id = ObjectId::random(id.context().clone());
1857 let wrong_id = ObjectId::random(id.context().clone());
1858
1859 let tombstone = Tombstone {
1860 target: lt_id.clone(),
1861 expiration_policy: ExpirationPolicy::Manual,
1862 };
1863 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1864
1865 let write = TieredWrite::Object(Metadata::default(), Bytes::new());
1867 let swapped = backend
1868 .compare_and_write(&id, Some(&wrong_id), write)
1869 .await?;
1870 assert!(!swapped, "expected CAS failure with wrong target");
1871 assert!(matches!(
1872 backend.get_tiered_metadata(&id).await?,
1873 TieredMetadata::Tombstone(_)
1874 ));
1875
1876 let payload = Bytes::from_static(b"hello inline");
1878 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1879 let swapped = backend
1880 .compare_and_write(&id, Some(<_id), write.clone())
1881 .await?;
1882 assert!(swapped, "expected CAS success with correct target");
1883 let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? else {
1884 panic!("expected inline object after swap");
1885 };
1886 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1887
1888 let retry = backend.compare_and_write(&id, Some(<_id), write).await?;
1890 assert!(retry, "idempotent retry");
1891
1892 Ok(())
1893 }
1894
1895 #[tokio::test]
1897 async fn test_cas_create_object_on_empty_row() -> Result<()> {
1898 let backend = create_test_backend().await?;
1899
1900 let id = make_id();
1901 let payload = Bytes::from_static(b"cas object");
1902 let write = TieredWrite::Object(Metadata::default(), payload.clone());
1903 let committed = backend.compare_and_write(&id, None, write).await?;
1904 assert!(committed, "expected CAS success on empty row");
1905
1906 let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? else {
1907 panic!("expected Object after CAS-create");
1908 };
1909 assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());
1910
1911 Ok(())
1912 }
1913
1914 #[tokio::test]
1916 async fn test_cas_delete() -> Result<()> {
1917 let backend = create_test_backend().await?;
1918
1919 let id = make_id();
1920 let lt_id = ObjectId::random(id.context().clone());
1921 let wrong_id = ObjectId::random(id.context().clone());
1922
1923 let tombstone = Tombstone {
1924 target: lt_id.clone(),
1925 expiration_policy: ExpirationPolicy::Manual,
1926 };
1927 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
1928
1929 let deleted = backend
1931 .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete)
1932 .await?;
1933 assert!(!deleted, "expected CAS failure with wrong target");
1934 assert!(matches!(
1935 backend.get_tiered_metadata(&id).await?,
1936 TieredMetadata::Tombstone(_)
1937 ));
1938
1939 let deleted = backend
1941 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1942 .await?;
1943 assert!(deleted, "expected CAS delete success");
1944 assert!(matches!(
1945 backend.get_tiered_metadata(&id).await?,
1946 TieredMetadata::NotFound
1947 ));
1948
1949 let retry = backend
1951 .compare_and_write(&id, Some(<_id), TieredWrite::Delete)
1952 .await?;
1953 assert!(retry, "idempotent retry");
1954
1955 let id2 = make_id();
1957 let fake_lt_id = ObjectId::random(id2.context().clone());
1958 let metadata = Metadata::default();
1959 create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
1960 let deleted = backend
1961 .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
1962 .await?;
1963 assert!(deleted, "expected idempotent deletion");
1964
1965 Ok(())
1966 }
1967
1968 #[tokio::test]
1975 async fn test_legacy_tombstone_reads() -> Result<()> {
1976 let backend = create_test_backend().await?;
1977
1978 let id = make_id();
1980 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
1981
1982 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
1983 panic!("expected tombstone");
1984 };
1985 assert_eq!(t.expiration_policy, ExpirationPolicy::Manual);
1986 assert!(matches!(
1987 backend.get_tiered_object(&id, None).await?,
1988 TieredGet::Tombstone(_)
1989 ));
1990
1991 let id = make_id();
1996 let ttl = Duration::from_hours(2 * 24);
1997 write_legacy_tombstone(&backend, &id, ExpirationPolicy::TimeToLive(ttl), None).await?;
1998
1999 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2000 panic!("expected TieredMetadata::Tombstone");
2001 };
2002 assert_eq!(t.expiration_policy, ExpirationPolicy::TimeToLive(ttl));
2003
2004 Ok(())
2005 }
2006
2007 #[tokio::test]
2012 async fn test_legacy_tombstone_tti_upgrade() -> Result<()> {
2013 let backend = create_test_backend().await?;
2014 let id = make_id();
2015 let path = id.as_storage_path().to_string().into_bytes();
2016
2017 let tti = Duration::from_hours(2 * 24); let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_mins(1);
2022 write_legacy_tombstone(
2023 &backend,
2024 &id,
2025 ExpirationPolicy::TimeToIdle(tti),
2026 Some(old_deadline),
2027 )
2028 .await?;
2029
2030 let TieredMetadata::Tombstone(_) = backend.get_tiered_metadata(&id).await? else {
2032 panic!("expected tombstone");
2033 };
2034
2035 let new_deadline = match backend.read_row(&path, None, "test-verify").await? {
2037 Some(RowData::Tombstone { time_expires, .. }) => time_expires.unwrap(),
2038 _ => panic!("expected tombstone row after bump"),
2039 };
2040
2041 assert!(
2042 new_deadline > old_deadline,
2043 "TTI bump should extend tombstone expiry: {old_deadline:?} -> {new_deadline:?}"
2044 );
2045
2046 Ok(())
2047 }
2048
2049 #[tokio::test]
2054 async fn test_legacy_tombstone_conditional_ops() -> Result<()> {
2055 let backend = create_test_backend().await?;
2056
2057 let id = make_id();
2059 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2060 let t_opt = backend
2061 .put_non_tombstone(&id, &Metadata::default(), Bytes::new())
2062 .await?;
2063 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2064
2065 let id = make_id();
2067 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2068 let t_opt = backend.delete_non_tombstone(&id).await?;
2069 assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id));
2070
2071 let id = make_id();
2073 write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?;
2074 let deleted = backend
2075 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2076 .await?;
2077 assert!(
2078 deleted,
2079 "CAS-delete must succeed on legacy-metadata tombstone"
2080 );
2081 assert!(matches!(
2082 backend.get_tiered_metadata(&id).await?,
2083 TieredMetadata::NotFound
2084 ));
2085
2086 let id = make_id();
2088 write_empty_redirect_tombstone(&backend, &id).await?;
2089 let deleted = backend
2090 .compare_and_write(&id, Some(&id), TieredWrite::Delete)
2091 .await?;
2092 assert!(
2093 deleted,
2094 "CAS-delete must succeed on empty-redirect tombstone"
2095 );
2096 assert!(matches!(
2097 backend.get_tiered_metadata(&id).await?,
2098 TieredMetadata::NotFound
2099 ));
2100
2101 Ok(())
2102 }
2103
2104 #[tokio::test]
2106 async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> {
2107 let backend = create_test_backend().await?;
2108 let id = make_id();
2109
2110 write_empty_redirect_tombstone(&backend, &id).await?;
2111 match backend.get_tiered_metadata(&id).await? {
2112 TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must fall back to hv_id"),
2113 other => panic!("expected tombstone, got {other:?}"),
2114 }
2115
2116 Ok(())
2117 }
2118
2119 #[tokio::test]
2124 async fn test_cas_create_tombstone_over_expired() -> Result<()> {
2125 let backend = create_test_backend().await?;
2126
2127 let id = make_id();
2128 let old_lt_id = ObjectId::random(id.context().clone());
2129 let old_tombstone = Tombstone {
2130 target: old_lt_id,
2131 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2132 };
2133 create_tombstone(&backend, &id, &old_tombstone, SystemTime::now()).await?;
2134
2135 let new_lt_id = ObjectId::random(id.context().clone());
2136 let new_tombstone = Tombstone {
2137 target: new_lt_id.clone(),
2138 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_hours(1)),
2139 };
2140 let committed = backend
2141 .compare_and_write(&id, None, TieredWrite::Tombstone(new_tombstone))
2142 .await?;
2143 assert!(
2144 committed,
2145 "CAS with current=None must succeed over an expired tombstone"
2146 );
2147
2148 let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else {
2149 panic!("expected new tombstone to be readable");
2150 };
2151 assert_eq!(t.target, new_lt_id);
2152
2153 Ok(())
2154 }
2155
2156 #[tokio::test]
2159 async fn test_put_non_tombstone_over_expired() -> Result<()> {
2160 let backend = create_test_backend().await?;
2161
2162 let id = make_id();
2163 let lt_id = ObjectId::random(id.context().clone());
2164 let tombstone = Tombstone {
2165 target: lt_id,
2166 expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
2167 };
2168 create_tombstone(&backend, &id, &tombstone, SystemTime::now()).await?;
2169
2170 let result = backend
2171 .put_non_tombstone(&id, &Metadata::default(), Bytes::from_static(b"data"))
2172 .await?;
2173 assert_eq!(
2174 result, None,
2175 "put_non_tombstone must succeed (return None) over an expired tombstone"
2176 );
2177
2178 let (_, _, stream) = backend.get_object(&id, None).await?.unwrap();
2179 assert_eq!(&stream::read_to_vec(stream).await?, b"data");
2180
2181 Ok(())
2182 }
2183
2184 async fn put_range_test_object(backend: &BigTableBackend) -> Result<ObjectId> {
2187 let id = make_id();
2188 let metadata = Metadata {
2189 content_type: "text/plain".into(),
2190 ..Default::default()
2191 };
2192 let payload = b"Hello, range requests!";
2193 backend
2194 .put_object(&id, &metadata, stream::single(payload.as_slice()))
2195 .await?;
2196 Ok(id)
2197 }
2198
2199 #[tokio::test]
2200 async fn get_object_range_bounded() -> Result<()> {
2201 let backend = create_test_backend().await?;
2202 let id = put_range_test_object(&backend).await?;
2203
2204 let (_, content_range, stream) = backend
2205 .get_object(&id, Some(ByteRange::Bounded(7, 11)))
2206 .await?
2207 .unwrap();
2208 let data = stream::read_to_vec(stream).await?;
2209 assert_eq!(&data, b"range");
2210
2211 let content_range = content_range.unwrap();
2212 assert_eq!(content_range.start, 7);
2213 assert_eq!(content_range.end, 11);
2214 assert_eq!(content_range.total, 22);
2215
2216 Ok(())
2217 }
2218
2219 #[tokio::test]
2220 async fn get_object_range_from() -> Result<()> {
2221 let backend = create_test_backend().await?;
2222 let id = put_range_test_object(&backend).await?;
2223
2224 let (_, content_range, stream) = backend
2225 .get_object(&id, Some(ByteRange::From(7)))
2226 .await?
2227 .unwrap();
2228 let data = stream::read_to_vec(stream).await?;
2229 assert_eq!(&data, b"range requests!");
2230
2231 let content_range = content_range.unwrap();
2232 assert_eq!(content_range.start, 7);
2233 assert_eq!(content_range.end, 21);
2234 assert_eq!(content_range.total, 22);
2235
2236 Ok(())
2237 }
2238
2239 #[tokio::test]
2240 async fn get_object_range_last() -> Result<()> {
2241 let backend = create_test_backend().await?;
2242 let id = put_range_test_object(&backend).await?;
2243
2244 let (_, content_range, stream) = backend
2245 .get_object(&id, Some(ByteRange::Last(9)))
2246 .await?
2247 .unwrap();
2248 let data = stream::read_to_vec(stream).await?;
2249 assert_eq!(&data, b"requests!");
2250
2251 let content_range = content_range.unwrap();
2252 assert_eq!(content_range.start, 13);
2253 assert_eq!(content_range.end, 21);
2254 assert_eq!(content_range.total, 22);
2255
2256 Ok(())
2257 }
2258
2259 #[tokio::test]
2260 async fn get_object_range_unsatisfiable() -> Result<()> {
2261 let backend = create_test_backend().await?;
2262 let id = put_range_test_object(&backend).await?;
2263
2264 match backend.get_object(&id, Some(ByteRange::From(100))).await {
2265 Err(Error::RangeNotSatisfiable { total }) => assert_eq!(total, 22),
2266 Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"),
2267 Err(e) => panic!("expected RangeNotSatisfiable, got {e:?}"),
2268 }
2269
2270 Ok(())
2271 }
2272
2273 #[tokio::test]
2274 async fn get_object_no_range_returns_full_payload() -> Result<()> {
2275 let backend = create_test_backend().await?;
2276 let id = put_range_test_object(&backend).await?;
2277
2278 let (_, content_range, stream) = backend.get_object(&id, None).await?.unwrap();
2279 let data = stream::read_to_vec(stream).await?;
2280 assert_eq!(&data, b"Hello, range requests!");
2281 assert!(content_range.is_none());
2282
2283 Ok(())
2284 }
2285}