objectstore_service/backend/common.rs
1//! Shared trait definition and types for all backends.
2
3use std::fmt;
4
5use objectstore_types::metadata::{ExpirationPolicy, Metadata};
6use objectstore_types::range::{ByteRange, ContentRange};
7
8use bytes::Bytes;
9
10use crate::error::{Error, Result};
11use crate::id::ObjectId;
12use crate::multipart::{
13 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
14 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
15};
16use crate::stream::{ClientStream, PayloadStream};
17
18/// User agent string used for outgoing requests.
19///
20/// This intentionally has a "sentry" prefix so that it can easily be traced back to us.
21pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERSION"));
22
23/// Backend response for put operations.
24pub type PutResponse = ();
25/// Backend response for get operations.
26pub type GetResponse = Option<(Metadata, Option<ContentRange>, PayloadStream)>;
27/// Backend response for metadata-only get operations.
28pub type MetadataResponse = Option<Metadata>;
29/// Backend response for delete operations.
30pub type DeleteResponse = ();
31
32/// Trait implemented by all storage backends.
33#[async_trait::async_trait]
34pub trait Backend: fmt::Debug + Send + Sync + 'static {
35 /// The backend name, used for diagnostics.
36 fn name(&self) -> &'static str;
37
38 /// Stores an object at the given path with the given metadata.
39 async fn put_object(
40 &self,
41 id: &ObjectId,
42 metadata: &Metadata,
43 stream: ClientStream,
44 ) -> Result<PutResponse>;
45
46 /// Retrieves (part of) an object at the given path, returning its metadata, a description of
47 /// the part being returned, and the payload.
48 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse>;
49
50 /// Retrieves only the metadata for an object, without the payload.
51 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
52 Ok(self
53 .get_object(id, None)
54 .await?
55 .map(|(metadata, _range, _stream)| metadata))
56 }
57
58 /// Deletes the object at the given path.
59 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse>;
60
61 /// Waits for any outstanding background operations to complete before shutdown.
62 ///
63 /// The default implementation is a no-op. Backends that spawn background tasks
64 /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this
65 /// to wait for those tasks to complete.
66 async fn join(&self) {}
67
68 /// Borrows this backend as a [`MultipartUploadBackend`] if supported.
69 ///
70 /// The default returns [`Error::NotImplemented`]. Backends that implement
71 /// [`MultipartUploadBackend`] should override this to return `Ok(self)`.
72 fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
73 Err(Error::NotImplemented)
74 }
75}
76
77/// Trait for backends that support our S3-style multipart upload protocol.
78#[async_trait::async_trait]
79pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static {
80 /// Initiates a new multipart upload at `id` with the given metadata.
81 async fn initiate_multipart(
82 &self,
83 id: &ObjectId,
84 metadata: &Metadata,
85 ) -> Result<InitiateMultipartResponse>;
86
87 /// Uploads a single part of the upload identified by `(id, upload_id)`.
88 async fn upload_part(
89 &self,
90 id: &ObjectId,
91 upload_id: &UploadId,
92 part_number: PartNumber,
93 content_length: u64,
94 content_md5: Option<&str>,
95 body: ClientStream,
96 ) -> Result<UploadPartResponse>;
97
98 /// Lists the parts uploaded so far for `(id, upload_id)`.
99 async fn list_parts(
100 &self,
101 id: &ObjectId,
102 upload_id: &UploadId,
103 max_parts: Option<u32>,
104 part_number_marker: Option<PartNumber>,
105 ) -> Result<ListPartsResponse>;
106
107 /// Aborts the upload identified by `(id, upload_id)`.
108 async fn abort_multipart(
109 &self,
110 id: &ObjectId,
111 upload_id: &UploadId,
112 ) -> Result<AbortMultipartResponse>;
113
114 /// Finalizes the upload identified by `(id, upload_id)` with the given
115 /// ordered list of parts.
116 ///
117 /// Note that this returns `Result<Option<CompleteMultipartError>>`.
118 /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will
119 /// translate to HTTP `200 OK` with an error contained in the response body.
120 /// We need to do it this way to mirror backends that also behave like this (namely S3 and
121 /// GCS).
122 async fn complete_multipart(
123 &self,
124 id: &ObjectId,
125 upload_id: &UploadId,
126 parts: Vec<CompletedPart>,
127 ) -> Result<CompleteMultipartResponse>;
128}
129
130/// Trait for backends that support tombstone-conditional operations.
131///
132/// Only backends suitable for the high-volume tier of
133/// [`TieredStorage`](super::tiered::TieredStorage) implement this trait.
134/// The conditional methods provide atomic operations to avoid overwriting
135/// redirect tombstones.
136#[async_trait::async_trait]
137pub trait HighVolumeBackend: Backend {
138 /// Writes the object only if NO redirect tombstone exists at this key.
139 ///
140 /// Returns `None` after storing the object, or `Some(tombstone)` (skipping
141 /// the write) when a redirect tombstone is present. The returned tombstone
142 /// carries the target LT `ObjectId` so the caller can route without a
143 /// second round trip.
144 ///
145 /// Takes [`Bytes`] instead of a [`ClientStream`] because callers on this
146 /// path have already fully buffered the payload.
147 async fn put_non_tombstone(
148 &self,
149 id: &ObjectId,
150 metadata: &Metadata,
151 payload: Bytes,
152 ) -> Result<Option<Tombstone>>;
153
154 /// Retrieves (part of) an object with explicit tombstone awareness.
155 ///
156 /// Returns [`TieredGet::Tombstone`] instead of synthesizing a tombstone
157 /// object, making the caller's routing logic a compile-time distinction.
158 async fn get_tiered_object(&self, id: &ObjectId, range: Option<ByteRange>)
159 -> Result<TieredGet>;
160
161 /// Retrieves only metadata with explicit tombstone awareness.
162 ///
163 /// Implementations should skip the payload column where possible to avoid
164 /// fetching up to 1 MiB of data just to discover a tombstone.
165 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata>;
166
167 /// Deletes the object only if it is NOT a redirect tombstone.
168 ///
169 /// Returns `None` after deleting the row (or if the row was already absent),
170 /// or `Some(tombstone)` (leaving the row intact) when the object is a
171 /// redirect tombstone. The returned tombstone carries the target LT
172 /// `ObjectId` so the caller can delete from long-term storage directly,
173 /// without a second round trip.
174 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>>;
175
176 /// Atomically mutates the row if the current redirect state matches.
177 ///
178 /// `current` determines the precondition:
179 /// - `None`: succeeds only if no live tombstone exists (row absent, inline,
180 /// or tombstone present but logically expired).
181 /// - `Some(target)`: succeeds only if a tombstone exists whose redirect
182 /// resolves to `target`.
183 ///
184 /// **This operation is idempotent:** if the object is already in the target
185 /// state, it returns `true`. Whether the mutation runs again is up to the
186 /// implementation.
187 ///
188 /// Returns `true` on success or idempotent match, `false` if a conflicting
189 /// state was found (another writer won the race).
190 async fn compare_and_write(
191 &self,
192 id: &ObjectId,
193 current: Option<&ObjectId>,
194 write: TieredWrite,
195 ) -> Result<bool>;
196}
197
198/// Information about a redirect tombstone in the high-volume backend.
199#[derive(Clone, Debug, PartialEq, Eq)]
200pub struct Tombstone {
201 /// The [`ObjectId`] of the object in the long-term backend.
202 ///
203 /// For legacy tombstones with an empty `r` column, the HV backend resolves
204 /// this to the HV `ObjectId` itself before surfacing the tombstone to callers.
205 pub target: ObjectId,
206
207 /// The expiration policy copied from the original object.
208 pub expiration_policy: ExpirationPolicy,
209}
210
211/// Typed response from [`HighVolumeBackend::get_tiered_object`].
212pub enum TieredGet {
213 /// A real object was found.
214 Object(Metadata, Option<ContentRange>, PayloadStream),
215 /// A redirect tombstone was found; the real object lives in the long-term backend.
216 Tombstone(Tombstone),
217 /// No entry exists at this key.
218 NotFound,
219}
220
221impl fmt::Debug for TieredGet {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 TieredGet::Object(metadata, content_range, _stream) => f
225 .debug_tuple("Object")
226 .field(metadata)
227 .field(content_range)
228 .finish_non_exhaustive(),
229 TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(),
230 TieredGet::NotFound => write!(f, "NotFound"),
231 }
232 }
233}
234
235/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`].
236#[derive(Debug)]
237pub enum TieredMetadata {
238 /// Metadata for a real object was found.
239 Object(Metadata),
240 /// A redirect tombstone was found; the real object lives in the long-term backend.
241 Tombstone(Tombstone),
242 /// No entry exists at this key.
243 NotFound,
244}
245
246/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
247#[derive(Clone, Debug)]
248pub enum TieredWrite {
249 /// Write a redirect tombstone.
250 Tombstone(Tombstone),
251 /// Write inline object data.
252 Object(Metadata, Bytes),
253 /// Delete the row entirely.
254 Delete,
255}
256
257impl TieredWrite {
258 /// Returns the tombstone target if this is a tombstone write, or `None` otherwise.
259 pub fn target(&self) -> Option<&ObjectId> {
260 match self {
261 TieredWrite::Tombstone(t) => Some(&t.target),
262 _ => None,
263 }
264 }
265}
266
267/// Creates a reqwest client with required defaults.
268///
269/// Automatic decompression is disabled because backends store pre-compressed
270/// payloads and manage `Content-Encoding` themselves.
271pub(super) fn reqwest_client() -> reqwest::Client {
272 reqwest::Client::builder()
273 .user_agent(USER_AGENT)
274 .hickory_dns(true)
275 .no_zstd()
276 .no_brotli()
277 .no_gzip()
278 .no_deflate()
279 .build()
280 .expect("Client::new()")
281}