objectstore_service/
lib.rs

//! The Service layer provides the fundamental storage abstraction,
//! offering durable access to underlying blobs.
//!
//! It is designed as a library crate to be used by the `server`.
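//!
//! # Example
//!
//! A minimal usage sketch of bringing up the service with both tiers on the
//! local filesystem (the path is illustrative):
//!
//! ```no_run
//! use objectstore_service::{StorageConfig, StorageService};
//!
//! # async fn example() -> anyhow::Result<()> {
//! let config = StorageConfig::FileSystem {
//!     path: std::path::Path::new("/tmp/objects"),
//! };
//! let service = StorageService::new(config.clone(), config).await?;
//! # Ok(())
//! # }
//! ```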
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]

// TODO(ja): Re-organize modules
mod backend;
mod error;
pub use error::{ServiceError, ServiceResult};
pub mod id;

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use bytes::{Bytes, BytesMut};
use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use objectstore_types::Metadata;

use crate::backend::common::BoxedBackend;
use crate::id::{ObjectContext, ObjectId};

/// The size threshold up to which objects are stored in the "high volume" backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

enum BackendChoice {
    HighVolume,
    LongTerm,
}
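
// Objects whose size stays at or below `BACKEND_SIZE_THRESHOLD` are kept in
// the high-volume backend. Larger objects go to the long-term backend, with a
// zero-length "redirect tombstone" left in the high-volume backend so that
// reads and deletes can discover where the payload actually lives.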

/// Service response for get operations.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Service response for insert operations.
pub type InsertResponse = ObjectId;
/// Service response for delete operations.
pub type DeleteResponse = ();

// TODO(ja): Move into a submodule
/// High-level asynchronous service for storing and retrieving objects.
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);

#[derive(Debug)]
struct StorageServiceInner {
    high_volume_backend: BoxedBackend,
    long_term_backend: BoxedBackend,
}

/// Type alias for data streams used in service APIs.
pub type PayloadStream = BoxStream<'static, std::io::Result<Bytes>>;

/// Configuration to initialize a [`StorageService`].
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Use a local filesystem as the storage backend.
    FileSystem {
        /// The path to the directory where files will be stored.
        path: &'a Path,
    },
    /// Use an S3-compatible storage backend.
    S3Compatible {
        /// The endpoint URL of the S3-compatible storage.
        endpoint: &'a str,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use Google Cloud Storage as the storage backend.
    Gcs {
        /// Optional endpoint URL for Google Cloud Storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use BigTable as the storage backend.
    BigTable {
        /// Optional endpoint URL for the BigTable storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The Google Cloud project ID.
        project_id: &'a str,
        /// The BigTable instance name.
        instance_name: &'a str,
        /// The BigTable table name.
        table_name: &'a str,
        /// The number of concurrent connections to BigTable.
        ///
        /// Defaults to 2x the number of worker threads.
        connections: Option<usize>,
    },
}

impl StorageService {
    /// Creates a new `StorageService` with the specified backend configurations.
    pub async fn new(
        high_volume_config: StorageConfig<'_>,
        long_term_config: StorageConfig<'_>,
    ) -> anyhow::Result<Self> {
        let high_volume_backend = create_backend(high_volume_config).await?;
        let long_term_backend = create_backend(long_term_config).await?;

        let inner = StorageServiceInner {
            high_volume_backend,
            long_term_backend,
        };
        Ok(Self(Arc::new(inner)))
    }

    /// Creates or overwrites an object.
    ///
    /// The object is identified by the components of an [`ObjectId`]. The `context` is required,
    /// while the `key` can be assigned automatically if set to `None`.
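    ///
    /// A minimal sketch of an insert (the context values and key are
    /// illustrative):
    ///
    /// ```no_run
    /// # use objectstore_service::{PayloadStream, StorageService};
    /// # use objectstore_service::id::ObjectContext;
    /// # use objectstore_types::scope::{Scope, Scopes};
    /// # async fn example(service: StorageService) -> objectstore_service::ServiceResult<()> {
    /// use futures_util::StreamExt;
    ///
    /// let context = ObjectContext {
    ///     usecase: "testing".into(),
    ///     scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
    /// };
    /// let payload: PayloadStream =
    ///     futures_util::stream::once(async { Ok(bytes::Bytes::from_static(b"oh hai!")) }).boxed();
    /// let id = service
    ///     .insert_object(context, Some("testing".into()), &Default::default(), payload)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```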
    pub async fn insert_object(
        &self,
        context: ObjectContext,
        key: Option<String>,
        metadata: &Metadata,
        mut stream: PayloadStream,
    ) -> ServiceResult<InsertResponse> {
        let start = Instant::now();

        let mut first_chunk = BytesMut::new();
        let mut backend = BackendChoice::HighVolume;
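        // Buffer the stream until it either ends (the payload is small enough
        // for the high-volume backend) or exceeds `BACKEND_SIZE_THRESHOLD`
        // (the remainder will be spilled to the long-term backend).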
        while let Some(chunk) = stream.try_next().await? {
            first_chunk.extend_from_slice(&chunk);

            if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
                backend = BackendChoice::LongTerm;
                break;
            }
        }

        let has_key = key.is_some();
        let id = ObjectId::optional(context, key);

        // There might currently be a tombstone at the given path from a previously stored object.
        if has_key {
            let previously_stored_object = self.0.high_volume_backend.get_object(&id).await?;
            if is_tombstoned(&previously_stored_object) {
                // Write the object to the other backend and keep the tombstone in place.
                backend = BackendChoice::LongTerm;
            }
        }

        let (backend_choice, backend_ty, stored_size) = match backend {
            BackendChoice::HighVolume => {
                let stored_size = first_chunk.len() as u64;
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();

                self.0
                    .high_volume_backend
                    .put_object(&id, metadata, stream)
                    .await?;
                (
                    "high-volume",
                    self.0.high_volume_backend.name(),
                    stored_size,
                )
            }
            BackendChoice::LongTerm => {
                let stored_size = Arc::new(AtomicU64::new(0));
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
                    .chain(stream)
                    .inspect({
                        let stored_size = Arc::clone(&stored_size);
                        move |res| {
                            if let Ok(chunk) = res {
                                stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
                            }
                        }
                    })
                    .boxed();

                // First, write the object itself.
                self.0
                    .long_term_backend
                    .put_object(&id, metadata, stream)
                    .await?;

                let redirect_metadata = Metadata {
                    is_redirect_tombstone: Some(true),
                    expiration_policy: metadata.expiration_policy,
                    ..Default::default()
                };
                let redirect_stream = futures_util::stream::empty().boxed();
                let redirect_request =
                    self.0
                        .high_volume_backend
                        .put_object(&id, &redirect_metadata, redirect_stream);

                // Then write the redirect tombstone,
                let redirect_result = redirect_request.await;
                if redirect_result.is_err() {
                    // cleaning up the stored object on any kind of error.
                    self.0.long_term_backend.delete_object(&id).await?;
                }
                redirect_result?;

                (
                    "long-term",
                    self.0.long_term_backend.name(),
                    stored_size.load(Ordering::Acquire),
                )
            }
        };

        merni::distribution!(
            "put.latency"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );
        merni::distribution!(
            "put.size"@b: stored_size,
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );

        Ok(id)
    }

    /// Streams the contents of an object stored at the given key.
    pub async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
        let start = Instant::now();

        let mut backend_choice = "high-volume";
        let mut backend_type = self.0.high_volume_backend.name();
        let mut result = self.0.high_volume_backend.get_object(id).await?;

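        // A redirect tombstone means the payload was spilled to the long-term
        // backend when it was stored; follow the redirect.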
        if is_tombstoned(&result) {
            result = self.0.long_term_backend.get_object(id).await?;
            backend_choice = "long-term";
            backend_type = self.0.long_term_backend.name();
        }

        merni::distribution!(
            "get.latency.pre-response"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_type
        );

        if let Some((metadata, _stream)) = &result {
            if let Some(size) = metadata.size {
                merni::distribution!(
                    "get.size"@b: size,
                    "usecase" => id.usecase(),
                    "backend_choice" => backend_choice,
                    "backend_type" => backend_type
                );
            } else {
                tracing::warn!(?backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    /// Deletes an object stored at the given key, if it exists.
    pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
        let start = Instant::now();

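        // A redirect tombstone in the high-volume backend means the payload
        // lives in the long-term backend; delete it there first, then remove
        // the tombstone itself.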
        if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {
            if metadata.is_redirect_tombstone == Some(true) {
                self.0.long_term_backend.delete_object(id).await?;
            }
            self.0.high_volume_backend.delete_object(id).await?;
        }

        merni::distribution!(
            "delete.latency"@s: start.elapsed(),
            "usecase" => id.usecase()
        );

        Ok(())
    }
}

fn is_tombstoned(result: &GetResponse) -> bool {
    matches!(
        result,
        Some((
            Metadata {
                is_redirect_tombstone: Some(true),
                ..
            },
            _
        ))
    )
}

async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
    Ok(match config {
        StorageConfig::FileSystem { path } => {
            Box::new(backend::local_fs::LocalFsBackend::new(path))
        }
        StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
            backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
        ),
        StorageConfig::Gcs { endpoint, bucket } => {
            Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
        }
        StorageConfig::BigTable {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } => Box::new(
            backend::bigtable::BigTableBackend::new(
                endpoint,
                project_id,
                instance_name,
                table_name,
                connections,
            )
            .await?,
        ),
    })
}

#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use objectstore_types::scope::{Scope, Scopes};

    use super::*;

    fn make_stream(contents: &[u8]) -> PayloadStream {
        tokio_stream::once(Ok(contents.to_vec().into())).boxed()
    }

    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
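
    // A sketch exercising the large-object path: a payload above
    // `BACKEND_SIZE_THRESHOLD` is spilled to the long-term backend and leaves
    // a redirect tombstone in the high-volume backend, so reads must follow
    // the redirect transparently. Uses two separate temp directories so the
    // two filesystem backends do not overwrite each other's entries.
    #[tokio::test]
    async fn stores_large_files() {
        let high_volume = tempfile::tempdir().unwrap();
        let long_term = tempfile::tempdir().unwrap();
        let service = StorageService::new(
            StorageConfig::FileSystem {
                path: high_volume.path(),
            },
            StorageConfig::FileSystem {
                path: long_term.path(),
            },
        )
        .await
        .unwrap();

        let contents = vec![0xAB_u8; BACKEND_SIZE_THRESHOLD + 1];
        let key = service
            .insert_object(
                make_context(),
                Some("large".into()),
                &Default::default(),
                make_stream(&contents),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), contents.as_slice());
    }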

    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket", // aligned with the env var in devservices and CI
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
}