objectstore_service/backend/common.rs
1//! Shared trait definition and types for all backends.
2
3use std::fmt;
4use std::sync::Arc;
5
6use objectstore_types::metadata::{ExpirationPolicy, Metadata};
7
8use bytes::Bytes;
9
10use crate::error::{Error, Result};
11use crate::id::ObjectId;
12use crate::multipart::{
13 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
14 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
15};
16use crate::stream::{ClientStream, PayloadStream};
17
/// User agent string used for outgoing requests.
///
/// This intentionally has a "sentry" prefix so that it can easily be traced back to us.
///
/// The value is assembled at compile time as `sentry-objectstore/<version>`, where
/// `<version>` is read from the `CARGO_PKG_VERSION` environment variable of the build.
pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERSION"));
22
/// Backend response for put operations.
///
/// This is the unit type: a successful put carries no data back to the caller.
pub type PutResponse = ();
/// Backend response for get operations.
///
/// `Some` holds the object's metadata together with a stream of its payload.
/// NOTE(review): `None` presumably means that no object exists for the requested id;
/// confirm against the backend implementations.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Backend response for metadata-only get operations.
///
/// Same shape as [`GetResponse`], minus the payload stream.
pub type MetadataResponse = Option<Metadata>;
/// Backend response for delete operations.
///
/// This is the unit type: a successful delete carries no data back to the caller.
pub type DeleteResponse = ();
31
32/// Trait implemented by all storage backends.
33#[async_trait::async_trait]
34pub trait Backend: fmt::Debug + Send + Sync + 'static {
35 /// The backend name, used for diagnostics.
36 fn name(&self) -> &'static str;
37
38 /// Stores an object at the given path with the given metadata.
39 async fn put_object(
40 &self,
41 id: &ObjectId,
42 metadata: &Metadata,
43 stream: ClientStream,
44 ) -> Result<PutResponse>;
45
46 /// Retrieves an object at the given path, returning its metadata and a stream of bytes.
47 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse>;
48
49 /// Retrieves only the metadata for an object, without the payload.
50 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
51 Ok(self
52 .get_object(id)
53 .await?
54 .map(|(metadata, _stream)| metadata))
55 }
56
57 /// Deletes the object at the given path.
58 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse>;
59
60 /// Waits for any outstanding background operations to complete before shutdown.
61 ///
62 /// The default implementation is a no-op. Backends that spawn background tasks
63 /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this
64 /// to wait for those tasks to complete.
65 async fn join(&self) {}
66
67 /// Casts this backend into an [`Arc<dyn MultipartUploadBackend>`] if supported.
68 ///
69 /// The default returns [`Error::NotImplemented`]. Backends that implement
70 /// [`MultipartUploadBackend`] should override this to return `Ok(self)`.
71 fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
72 Err(Error::NotImplemented)
73 }
74}
75
/// Trait for backends that support our S3-style multipart upload protocol.
///
/// A [`Backend`] exposes this capability through
/// [`Backend::as_multipart_upload_backend`].
#[async_trait::async_trait]
pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static {
    /// Initiates a new multipart upload at `id` with the given metadata.
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse>;

    /// Uploads a single part of the upload identified by `(id, upload_id)`.
    ///
    /// NOTE(review): `content_length` and `content_md5` presumably describe `body`
    /// (its size in bytes and an optional MD5 digest); confirm the expected encoding
    /// against the implementations.
    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        content_length: u64,
        content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse>;

    /// Lists the parts uploaded so far for `(id, upload_id)`.
    ///
    /// NOTE(review): `max_parts` and `part_number_marker` look like S3-style pagination
    /// controls (page size and resume position); confirm against the implementations.
    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse>;

    /// Aborts the upload identified by `(id, upload_id)`.
    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse>;

    /// Finalizes the upload identified by `(id, upload_id)` with the given
    /// ordered list of parts.
    ///
    /// Note that this returns `Result<CompleteMultipartResponse>`, which this note
    /// originally spelled out as `Result<Option<CompleteMultipartError>>`.
    /// NOTE(review): presumably `CompleteMultipartResponse` is an alias for that
    /// option type; confirm in `crate::multipart`.
    /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will
    /// translate to HTTP `200 OK` with an error contained in the response body.
    /// We need to do it this way to mirror backends that also behave like this (namely S3 and
    /// GCS).
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse>;
}
128
/// Trait for backends that support tombstone-conditional operations.
///
/// Only backends suitable for the high-volume (HV) tier of
/// [`TieredStorage`](super::tiered::TieredStorage) implement this trait.
/// The conditional methods provide atomic operations to avoid overwriting
/// redirect tombstones.
///
/// Throughout these docs, "LT" abbreviates the long-term backend that a redirect
/// tombstone points to.
#[async_trait::async_trait]
pub trait HighVolumeBackend: Backend {
    /// Writes the object only if NO redirect tombstone exists at this key.
    ///
    /// Returns `None` after storing the object, or `Some(tombstone)` (skipping
    /// the write) when a redirect tombstone is present. The returned tombstone
    /// carries the target LT `ObjectId` so the caller can route without a
    /// second round trip.
    ///
    /// Takes [`Bytes`] instead of a [`ClientStream`] because callers on this
    /// path have already fully buffered the payload.
    async fn put_non_tombstone(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        payload: Bytes,
    ) -> Result<Option<Tombstone>>;

    /// Retrieves an object with explicit tombstone awareness.
    ///
    /// Returns [`TieredGet::Tombstone`] instead of synthesizing a tombstone
    /// object, making the caller's routing logic a compile-time distinction.
    /// A missing entry is reported as [`TieredGet::NotFound`].
    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet>;

    /// Retrieves only metadata with explicit tombstone awareness.
    ///
    /// Implementations should skip the payload column where possible to avoid
    /// fetching up to 1 MiB of data just to discover a tombstone.
    /// A missing entry is reported as [`TieredMetadata::NotFound`].
    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata>;

    /// Deletes the object only if it is NOT a redirect tombstone.
    ///
    /// Returns `None` after deleting the row (or if the row was already absent),
    /// or `Some(tombstone)` (leaving the row intact) when the object is a
    /// redirect tombstone. The returned tombstone carries the target LT
    /// `ObjectId` so the caller can delete from long-term storage directly,
    /// without a second round trip.
    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>>;

    /// Atomically mutates the row if the current redirect state matches.
    ///
    /// `current` determines the precondition:
    /// - `None`: succeeds only if no live tombstone exists (row absent, inline,
    ///   or tombstone present but logically expired).
    /// - `Some(target)`: succeeds only if a tombstone exists whose redirect
    ///   resolves to `target`.
    ///
    /// `write` selects the mutation to apply once the precondition holds; see
    /// [`TieredWrite`] for the available operations.
    ///
    /// **This operation is idempotent:** if the object is already in the target
    /// state, it returns `true`. Whether the mutation runs again is up to the
    /// implementation.
    ///
    /// Returns `true` on success or idempotent match, `false` if a conflicting
    /// state was found (another writer won the race).
    async fn compare_and_write(
        &self,
        id: &ObjectId,
        current: Option<&ObjectId>,
        write: TieredWrite,
    ) -> Result<bool>;
}
195
/// Information about a redirect tombstone in the high-volume backend.
///
/// Values of this type are returned by the tombstone-aware methods of
/// [`HighVolumeBackend`] and can be written through [`TieredWrite::Tombstone`].
#[derive(Clone, Debug, PartialEq)]
pub struct Tombstone {
    /// The [`ObjectId`] of the object in the long-term backend.
    ///
    /// For legacy tombstones with an empty `r` column, the HV backend resolves
    /// this to the HV `ObjectId` itself before surfacing the tombstone to callers.
    pub target: ObjectId,

    /// The expiration policy copied from the original object.
    pub expiration_policy: ExpirationPolicy,
}
208
/// Typed response from [`HighVolumeBackend::get_tiered_object`].
///
/// `Debug` is implemented by hand for this type; that implementation leaves the
/// payload stream out of the output.
pub enum TieredGet {
    /// A real object was found; holds its metadata and a stream of its payload.
    Object(Metadata, PayloadStream),
    /// A redirect tombstone was found; the real object lives in the long-term backend.
    Tombstone(Tombstone),
    /// No entry exists at this key.
    NotFound,
}
218
219impl fmt::Debug for TieredGet {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 match self {
222 TieredGet::Object(metadata, _stream) => f
223 .debug_tuple("Object")
224 .field(metadata)
225 .finish_non_exhaustive(),
226 TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(),
227 TieredGet::NotFound => write!(f, "NotFound"),
228 }
229 }
230}
231
/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`].
///
/// Has the same three cases as [`TieredGet`], but the `Object` variant carries no
/// payload stream.
#[derive(Debug)]
pub enum TieredMetadata {
    /// Metadata for a real object was found.
    Object(Metadata),
    /// A redirect tombstone was found; the real object lives in the long-term backend.
    Tombstone(Tombstone),
    /// No entry exists at this key.
    NotFound,
}
242
/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
///
/// It is passed by value as the `write` argument of that method.
#[derive(Clone, Debug)]
pub enum TieredWrite {
    /// Write a redirect tombstone.
    Tombstone(Tombstone),
    /// Write inline object data: the object's metadata plus its buffered payload.
    Object(Metadata, Bytes),
    /// Delete the row entirely.
    Delete,
}
253
254impl TieredWrite {
255 /// Returns the tombstone target if this is a tombstone write, or `None` otherwise.
256 pub fn target(&self) -> Option<&ObjectId> {
257 match self {
258 TieredWrite::Tombstone(t) => Some(&t.target),
259 _ => None,
260 }
261 }
262}
263
264/// Creates a reqwest client with required defaults.
265///
266/// Automatic decompression is disabled because backends store pre-compressed
267/// payloads and manage `Content-Encoding` themselves.
268pub(super) fn reqwest_client() -> reqwest::Client {
269 reqwest::Client::builder()
270 .user_agent(USER_AGENT)
271 .hickory_dns(true)
272 .no_zstd()
273 .no_brotli()
274 .no_gzip()
275 .no_deflate()
276 .build()
277 .expect("Client::new()")
278}