Skip to main content

objectstore_service/backend/
common.rs

//! Shared trait definitions and types for all backends.
2
3use std::fmt;
4use std::sync::Arc;
5
6use objectstore_types::metadata::{ExpirationPolicy, Metadata};
7
8use bytes::Bytes;
9
10use crate::error::{Error, Result};
11use crate::id::ObjectId;
12use crate::multipart::{
13    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
14    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
15};
16use crate::stream::{ClientStream, PayloadStream};
17
18/// User agent string used for outgoing requests.
19///
20/// This intentionally has a "sentry" prefix so that it can easily be traced back to us.
21pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERSION"));
22
23/// Backend response for put operations.
24pub type PutResponse = ();
25/// Backend response for get operations.
26pub type GetResponse = Option<(Metadata, PayloadStream)>;
27/// Backend response for metadata-only get operations.
28pub type MetadataResponse = Option<Metadata>;
29/// Backend response for delete operations.
30pub type DeleteResponse = ();
31
32/// Trait implemented by all storage backends.
33#[async_trait::async_trait]
34pub trait Backend: fmt::Debug + Send + Sync + 'static {
35    /// The backend name, used for diagnostics.
36    fn name(&self) -> &'static str;
37
38    /// Stores an object at the given path with the given metadata.
39    async fn put_object(
40        &self,
41        id: &ObjectId,
42        metadata: &Metadata,
43        stream: ClientStream,
44    ) -> Result<PutResponse>;
45
46    /// Retrieves an object at the given path, returning its metadata and a stream of bytes.
47    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse>;
48
49    /// Retrieves only the metadata for an object, without the payload.
50    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
51        Ok(self
52            .get_object(id)
53            .await?
54            .map(|(metadata, _stream)| metadata))
55    }
56
57    /// Deletes the object at the given path.
58    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse>;
59
60    /// Waits for any outstanding background operations to complete before shutdown.
61    ///
62    /// The default implementation is a no-op. Backends that spawn background tasks
63    /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this
64    /// to wait for those tasks to complete.
65    async fn join(&self) {}
66
67    /// Casts this backend into an [`Arc<dyn MultipartUploadBackend>`] if supported.
68    ///
69    /// The default returns [`Error::NotImplemented`]. Backends that implement
70    /// [`MultipartUploadBackend`] should override this to return `Ok(self)`.
71    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
72        Err(Error::NotImplemented)
73    }
74}
75
76/// Trait for backends that support our S3-style multipart upload protocol.
77#[async_trait::async_trait]
78pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static {
79    /// Initiates a new multipart upload at `id` with the given metadata.
80    async fn initiate_multipart(
81        &self,
82        id: &ObjectId,
83        metadata: &Metadata,
84    ) -> Result<InitiateMultipartResponse>;
85
86    /// Uploads a single part of the upload identified by `(id, upload_id)`.
87    async fn upload_part(
88        &self,
89        id: &ObjectId,
90        upload_id: &UploadId,
91        part_number: PartNumber,
92        content_length: u64,
93        content_md5: Option<&str>,
94        body: ClientStream,
95    ) -> Result<UploadPartResponse>;
96
97    /// Lists the parts uploaded so far for `(id, upload_id)`.
98    async fn list_parts(
99        &self,
100        id: &ObjectId,
101        upload_id: &UploadId,
102        max_parts: Option<u32>,
103        part_number_marker: Option<PartNumber>,
104    ) -> Result<ListPartsResponse>;
105
106    /// Aborts the upload identified by `(id, upload_id)`.
107    async fn abort_multipart(
108        &self,
109        id: &ObjectId,
110        upload_id: &UploadId,
111    ) -> Result<AbortMultipartResponse>;
112
113    /// Finalizes the upload identified by `(id, upload_id)` with the given
114    /// ordered list of parts.
115    ///
116    /// Note that this returns `Result<Option<CompleteMultipartError>>`.
117    /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will
118    /// translate to HTTP `200 OK` with an error contained in the response body.
119    /// We need to do it this way to mirror backends that also behave like this (namely S3 and
120    /// GCS).
121    async fn complete_multipart(
122        &self,
123        id: &ObjectId,
124        upload_id: &UploadId,
125        parts: Vec<CompletedPart>,
126    ) -> Result<CompleteMultipartResponse>;
127}
128
129/// Trait for backends that support tombstone-conditional operations.
130///
131/// Only backends suitable for the high-volume tier of
132/// [`TieredStorage`](super::tiered::TieredStorage) implement this trait.
133/// The conditional methods provide atomic operations to avoid overwriting
134/// redirect tombstones.
135#[async_trait::async_trait]
136pub trait HighVolumeBackend: Backend {
137    /// Writes the object only if NO redirect tombstone exists at this key.
138    ///
139    /// Returns `None` after storing the object, or `Some(tombstone)` (skipping
140    /// the write) when a redirect tombstone is present. The returned tombstone
141    /// carries the target LT `ObjectId` so the caller can route without a
142    /// second round trip.
143    ///
144    /// Takes [`Bytes`] instead of a [`ClientStream`] because callers on this
145    /// path have already fully buffered the payload.
146    async fn put_non_tombstone(
147        &self,
148        id: &ObjectId,
149        metadata: &Metadata,
150        payload: Bytes,
151    ) -> Result<Option<Tombstone>>;
152
153    /// Retrieves an object with explicit tombstone awareness.
154    ///
155    /// Returns [`TieredGet::Tombstone`] instead of synthesizing a tombstone
156    /// object, making the caller's routing logic a compile-time distinction.
157    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet>;
158
159    /// Retrieves only metadata with explicit tombstone awareness.
160    ///
161    /// Implementations should skip the payload column where possible to avoid
162    /// fetching up to 1 MiB of data just to discover a tombstone.
163    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata>;
164
165    /// Deletes the object only if it is NOT a redirect tombstone.
166    ///
167    /// Returns `None` after deleting the row (or if the row was already absent),
168    /// or `Some(tombstone)` (leaving the row intact) when the object is a
169    /// redirect tombstone. The returned tombstone carries the target LT
170    /// `ObjectId` so the caller can delete from long-term storage directly,
171    /// without a second round trip.
172    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>>;
173
174    /// Atomically mutates the row if the current redirect state matches.
175    ///
176    /// `current` determines the precondition:
177    /// - `None`: succeeds only if no live tombstone exists (row absent, inline,
178    ///   or tombstone present but logically expired).
179    /// - `Some(target)`: succeeds only if a tombstone exists whose redirect
180    ///   resolves to `target`.
181    ///
182    /// **This operation is idempotent:** if the object is already in the target
183    /// state, it returns `true`. Whether the mutation runs again is up to the
184    /// implementation.
185    ///
186    /// Returns `true` on success or idempotent match, `false` if a conflicting
187    /// state was found (another writer won the race).
188    async fn compare_and_write(
189        &self,
190        id: &ObjectId,
191        current: Option<&ObjectId>,
192        write: TieredWrite,
193    ) -> Result<bool>;
194}
195
196/// Information about a redirect tombstone in the high-volume backend.
197#[derive(Clone, Debug, PartialEq)]
198pub struct Tombstone {
199    /// The [`ObjectId`] of the object in the long-term backend.
200    ///
201    /// For legacy tombstones with an empty `r` column, the HV backend resolves
202    /// this to the HV `ObjectId` itself before surfacing the tombstone to callers.
203    pub target: ObjectId,
204
205    /// The expiration policy copied from the original object.
206    pub expiration_policy: ExpirationPolicy,
207}
208
209/// Typed response from [`HighVolumeBackend::get_tiered_object`].
210pub enum TieredGet {
211    /// A real object was found.
212    Object(Metadata, PayloadStream),
213    /// A redirect tombstone was found; the real object lives in the long-term backend.
214    Tombstone(Tombstone),
215    /// No entry exists at this key.
216    NotFound,
217}
218
219impl fmt::Debug for TieredGet {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        match self {
222            TieredGet::Object(metadata, _stream) => f
223                .debug_tuple("Object")
224                .field(metadata)
225                .finish_non_exhaustive(),
226            TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(),
227            TieredGet::NotFound => write!(f, "NotFound"),
228        }
229    }
230}
231
232/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`].
233#[derive(Debug)]
234pub enum TieredMetadata {
235    /// Metadata for a real object was found.
236    Object(Metadata),
237    /// A redirect tombstone was found; the real object lives in the long-term backend.
238    Tombstone(Tombstone),
239    /// No entry exists at this key.
240    NotFound,
241}
242
243/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
244#[derive(Clone, Debug)]
245pub enum TieredWrite {
246    /// Write a redirect tombstone.
247    Tombstone(Tombstone),
248    /// Write inline object data.
249    Object(Metadata, Bytes),
250    /// Delete the row entirely.
251    Delete,
252}
253
254impl TieredWrite {
255    /// Returns the tombstone target if this is a tombstone write, or `None` otherwise.
256    pub fn target(&self) -> Option<&ObjectId> {
257        match self {
258            TieredWrite::Tombstone(t) => Some(&t.target),
259            _ => None,
260        }
261    }
262}
263
264/// Creates a reqwest client with required defaults.
265///
266/// Automatic decompression is disabled because backends store pre-compressed
267/// payloads and manage `Content-Encoding` themselves.
268pub(super) fn reqwest_client() -> reqwest::Client {
269    reqwest::Client::builder()
270        .user_agent(USER_AGENT)
271        .hickory_dns(true)
272        .no_zstd()
273        .no_brotli()
274        .no_gzip()
275        .no_deflate()
276        .build()
277        .expect("Client::new()")
278}