Skip to main content

objectstore_service/backend/
common.rs

//! Shared trait definitions and types for all backends.
2
3use std::fmt;
4
5use objectstore_types::metadata::{ExpirationPolicy, Metadata};
6use objectstore_types::range::{ByteRange, ContentRange};
7
8use bytes::Bytes;
9
10use crate::error::{Error, Result};
11use crate::id::ObjectId;
12use crate::multipart::{
13    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
14    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
15};
16use crate::stream::{ClientStream, PayloadStream};
17
18/// User agent string used for outgoing requests.
19///
20/// This intentionally has a "sentry" prefix so that it can easily be traced back to us.
21pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERSION"));
22
23/// Backend response for put operations.
24pub type PutResponse = ();
25/// Backend response for get operations.
26pub type GetResponse = Option<(Metadata, Option<ContentRange>, PayloadStream)>;
27/// Backend response for metadata-only get operations.
28pub type MetadataResponse = Option<Metadata>;
29/// Backend response for delete operations.
30pub type DeleteResponse = ();
31
32/// Trait implemented by all storage backends.
33#[async_trait::async_trait]
34pub trait Backend: fmt::Debug + Send + Sync + 'static {
35    /// The backend name, used for diagnostics.
36    fn name(&self) -> &'static str;
37
38    /// Stores an object at the given path with the given metadata.
39    async fn put_object(
40        &self,
41        id: &ObjectId,
42        metadata: &Metadata,
43        stream: ClientStream,
44    ) -> Result<PutResponse>;
45
46    /// Retrieves (part of) an object at the given path, returning its metadata, a description of
47    /// the part being returned, and the payload.
48    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse>;
49
50    /// Retrieves only the metadata for an object, without the payload.
51    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
52        Ok(self
53            .get_object(id, None)
54            .await?
55            .map(|(metadata, _range, _stream)| metadata))
56    }
57
58    /// Deletes the object at the given path.
59    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse>;
60
61    /// Waits for any outstanding background operations to complete before shutdown.
62    ///
63    /// The default implementation is a no-op. Backends that spawn background tasks
64    /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this
65    /// to wait for those tasks to complete.
66    async fn join(&self) {}
67
68    /// Borrows this backend as a [`MultipartUploadBackend`] if supported.
69    ///
70    /// The default returns [`Error::NotImplemented`]. Backends that implement
71    /// [`MultipartUploadBackend`] should override this to return `Ok(self)`.
72    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
73        Err(Error::NotImplemented)
74    }
75}
76
77/// Trait for backends that support our S3-style multipart upload protocol.
78#[async_trait::async_trait]
79pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static {
80    /// Initiates a new multipart upload at `id` with the given metadata.
81    async fn initiate_multipart(
82        &self,
83        id: &ObjectId,
84        metadata: &Metadata,
85    ) -> Result<InitiateMultipartResponse>;
86
87    /// Uploads a single part of the upload identified by `(id, upload_id)`.
88    async fn upload_part(
89        &self,
90        id: &ObjectId,
91        upload_id: &UploadId,
92        part_number: PartNumber,
93        content_length: u64,
94        content_md5: Option<&str>,
95        body: ClientStream,
96    ) -> Result<UploadPartResponse>;
97
98    /// Lists the parts uploaded so far for `(id, upload_id)`.
99    async fn list_parts(
100        &self,
101        id: &ObjectId,
102        upload_id: &UploadId,
103        max_parts: Option<u32>,
104        part_number_marker: Option<PartNumber>,
105    ) -> Result<ListPartsResponse>;
106
107    /// Aborts the upload identified by `(id, upload_id)`.
108    async fn abort_multipart(
109        &self,
110        id: &ObjectId,
111        upload_id: &UploadId,
112    ) -> Result<AbortMultipartResponse>;
113
114    /// Finalizes the upload identified by `(id, upload_id)` with the given
115    /// ordered list of parts.
116    ///
117    /// Note that this returns `Result<Option<CompleteMultipartError>>`.
118    /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will
119    /// translate to HTTP `200 OK` with an error contained in the response body.
120    /// We need to do it this way to mirror backends that also behave like this (namely S3 and
121    /// GCS).
122    async fn complete_multipart(
123        &self,
124        id: &ObjectId,
125        upload_id: &UploadId,
126        parts: Vec<CompletedPart>,
127    ) -> Result<CompleteMultipartResponse>;
128}
129
130/// Trait for backends that support tombstone-conditional operations.
131///
132/// Only backends suitable for the high-volume tier of
133/// [`TieredStorage`](super::tiered::TieredStorage) implement this trait.
134/// The conditional methods provide atomic operations to avoid overwriting
135/// redirect tombstones.
136#[async_trait::async_trait]
137pub trait HighVolumeBackend: Backend {
138    /// Writes the object only if NO redirect tombstone exists at this key.
139    ///
140    /// Returns `None` after storing the object, or `Some(tombstone)` (skipping
141    /// the write) when a redirect tombstone is present. The returned tombstone
142    /// carries the target LT `ObjectId` so the caller can route without a
143    /// second round trip.
144    ///
145    /// Takes [`Bytes`] instead of a [`ClientStream`] because callers on this
146    /// path have already fully buffered the payload.
147    async fn put_non_tombstone(
148        &self,
149        id: &ObjectId,
150        metadata: &Metadata,
151        payload: Bytes,
152    ) -> Result<Option<Tombstone>>;
153
154    /// Retrieves (part of) an object with explicit tombstone awareness.
155    ///
156    /// Returns [`TieredGet::Tombstone`] instead of synthesizing a tombstone
157    /// object, making the caller's routing logic a compile-time distinction.
158    async fn get_tiered_object(&self, id: &ObjectId, range: Option<ByteRange>)
159    -> Result<TieredGet>;
160
161    /// Retrieves only metadata with explicit tombstone awareness.
162    ///
163    /// Implementations should skip the payload column where possible to avoid
164    /// fetching up to 1 MiB of data just to discover a tombstone.
165    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata>;
166
167    /// Deletes the object only if it is NOT a redirect tombstone.
168    ///
169    /// Returns `None` after deleting the row (or if the row was already absent),
170    /// or `Some(tombstone)` (leaving the row intact) when the object is a
171    /// redirect tombstone. The returned tombstone carries the target LT
172    /// `ObjectId` so the caller can delete from long-term storage directly,
173    /// without a second round trip.
174    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>>;
175
176    /// Atomically mutates the row if the current redirect state matches.
177    ///
178    /// `current` determines the precondition:
179    /// - `None`: succeeds only if no live tombstone exists (row absent, inline,
180    ///   or tombstone present but logically expired).
181    /// - `Some(target)`: succeeds only if a tombstone exists whose redirect
182    ///   resolves to `target`.
183    ///
184    /// **This operation is idempotent:** if the object is already in the target
185    /// state, it returns `true`. Whether the mutation runs again is up to the
186    /// implementation.
187    ///
188    /// Returns `true` on success or idempotent match, `false` if a conflicting
189    /// state was found (another writer won the race).
190    async fn compare_and_write(
191        &self,
192        id: &ObjectId,
193        current: Option<&ObjectId>,
194        write: TieredWrite,
195    ) -> Result<bool>;
196}
197
198/// Information about a redirect tombstone in the high-volume backend.
199#[derive(Clone, Debug, PartialEq, Eq)]
200pub struct Tombstone {
201    /// The [`ObjectId`] of the object in the long-term backend.
202    ///
203    /// For legacy tombstones with an empty `r` column, the HV backend resolves
204    /// this to the HV `ObjectId` itself before surfacing the tombstone to callers.
205    pub target: ObjectId,
206
207    /// The expiration policy copied from the original object.
208    pub expiration_policy: ExpirationPolicy,
209}
210
211/// Typed response from [`HighVolumeBackend::get_tiered_object`].
212pub enum TieredGet {
213    /// A real object was found.
214    Object(Metadata, Option<ContentRange>, PayloadStream),
215    /// A redirect tombstone was found; the real object lives in the long-term backend.
216    Tombstone(Tombstone),
217    /// No entry exists at this key.
218    NotFound,
219}
220
221impl fmt::Debug for TieredGet {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            TieredGet::Object(metadata, content_range, _stream) => f
225                .debug_tuple("Object")
226                .field(metadata)
227                .field(content_range)
228                .finish_non_exhaustive(),
229            TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(),
230            TieredGet::NotFound => write!(f, "NotFound"),
231        }
232    }
233}
234
235/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`].
236#[derive(Debug)]
237pub enum TieredMetadata {
238    /// Metadata for a real object was found.
239    Object(Metadata),
240    /// A redirect tombstone was found; the real object lives in the long-term backend.
241    Tombstone(Tombstone),
242    /// No entry exists at this key.
243    NotFound,
244}
245
246/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
247#[derive(Clone, Debug)]
248pub enum TieredWrite {
249    /// Write a redirect tombstone.
250    Tombstone(Tombstone),
251    /// Write inline object data.
252    Object(Metadata, Bytes),
253    /// Delete the row entirely.
254    Delete,
255}
256
257impl TieredWrite {
258    /// Returns the tombstone target if this is a tombstone write, or `None` otherwise.
259    pub fn target(&self) -> Option<&ObjectId> {
260        match self {
261            TieredWrite::Tombstone(t) => Some(&t.target),
262            _ => None,
263        }
264    }
265}
266
267/// Creates a reqwest client with required defaults.
268///
269/// Automatic decompression is disabled because backends store pre-compressed
270/// payloads and manage `Content-Encoding` themselves.
271pub(super) fn reqwest_client() -> reqwest::Client {
272    reqwest::Client::builder()
273        .user_agent(USER_AGENT)
274        .hickory_dns(true)
275        .no_zstd()
276        .no_brotli()
277        .no_gzip()
278        .no_deflate()
279        .build()
280        .expect("Client::new()")
281}