objectstore_service/
lib.rs

//! The Service layer provides the fundamental storage abstraction,
//! offering durable access to the underlying blobs.
//!
//! It is designed as a library crate to be used by the `server`.
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]

// TODO(ja): Re-organize modules
mod backend;
mod error;
pub use error::{ServiceError, ServiceResult};
pub mod id;

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use bytes::{Bytes, BytesMut};
use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use objectstore_types::Metadata;

use crate::backend::common::BoxedBackend;
use crate::id::{ObjectContext, ObjectId};

/// The size threshold up to which objects are stored in the "high volume" backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

enum BackendChoice {
    HighVolume,
    LongTerm,
}

// TODO(ja): Move into a submodule
/// High-level asynchronous service for storing and retrieving objects.
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);

#[derive(Debug)]
struct StorageServiceInner {
    high_volume_backend: BoxedBackend,
    long_term_backend: BoxedBackend,
}

/// Type alias for data streams used in service APIs.
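///
/// # Examples
///
/// A single-chunk stream can be built from an in-memory buffer. This is a
/// minimal sketch; any `'static` stream of `std::io::Result<Bytes>` works:
///
/// ```ignore
/// use bytes::Bytes;
/// use futures_util::StreamExt;
///
/// let stream: PayloadStream =
///     futures_util::stream::once(async { Ok(Bytes::from_static(b"oh hai!")) }).boxed();
/// ```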
pub type PayloadStream = BoxStream<'static, std::io::Result<Bytes>>;

/// Configuration to initialize a [`StorageService`].
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Use a local filesystem as the storage backend.
    FileSystem {
        /// The path to the directory where files will be stored.
        path: &'a Path,
    },
    /// Use an S3-compatible storage backend.
    S3Compatible {
        /// Optional endpoint URL for the S3-compatible storage.
        endpoint: &'a str,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use Google Cloud Storage as the storage backend.
    Gcs {
        /// Optional endpoint URL for Google Cloud Storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use BigTable as the storage backend.
    BigTable {
        /// Optional endpoint URL for the BigTable storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The Google Cloud project ID.
        project_id: &'a str,
        /// The BigTable instance name.
        instance_name: &'a str,
        /// The BigTable table name.
        table_name: &'a str,
        /// The number of concurrent connections to BigTable.
        ///
        /// Defaults to 2x the number of worker threads.
        connections: Option<usize>,
    },
}

impl StorageService {
    /// Creates a new `StorageService` with the specified configuration.
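    ///
    /// The high-volume and long-term backends are configured independently.
    /// A minimal sketch using a local filesystem for both, as the tests below
    /// do (the path is a hypothetical example value):
    ///
    /// ```ignore
    /// let config = StorageConfig::FileSystem { path: Path::new("/tmp/objectstore") };
    /// let service = StorageService::new(config.clone(), config).await?;
    /// ```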
    pub async fn new(
        high_volume_config: StorageConfig<'_>,
        long_term_config: StorageConfig<'_>,
    ) -> anyhow::Result<Self> {
        let high_volume_backend = create_backend(high_volume_config).await?;
        let long_term_backend = create_backend(long_term_config).await?;

        let inner = StorageServiceInner {
            high_volume_backend,
            long_term_backend,
        };
        Ok(Self(Arc::new(inner)))
    }

    /// Creates or overwrites an object.
    ///
    /// The object is identified by the components of an [`ObjectId`]. The `context` is required,
    /// while the `key` can be assigned automatically if set to `None`.
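    ///
    /// A minimal usage sketch (the key is a hypothetical example value; see
    /// the tests below for a complete call):
    ///
    /// ```ignore
    /// let id = service
    ///     .insert_object(context, Some("my-key".into()), &Metadata::default(), stream)
    ///     .await?;
    /// ```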
    pub async fn insert_object(
        &self,
        context: ObjectContext,
        key: Option<String>,
        metadata: &Metadata,
        mut stream: PayloadStream,
    ) -> ServiceResult<ObjectId> {
        let start = Instant::now();

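        // Buffer the beginning of the stream until it either ends or exceeds
        // the size threshold: small objects go to the high-volume backend,
        // anything larger to the long-term backend.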
        let mut first_chunk = BytesMut::new();
        let mut backend = BackendChoice::HighVolume;
        while let Some(chunk) = stream.try_next().await? {
            first_chunk.extend_from_slice(&chunk);

            if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
                backend = BackendChoice::LongTerm;
                break;
            }
        }

        let has_key = key.is_some();
        let id = ObjectId::optional(context, key);

        // There might currently be a tombstone at the given path from a previously stored object.
        if has_key {
            let previously_stored_object = self.0.high_volume_backend.get_object(&id).await?;
            if is_tombstoned(&previously_stored_object) {
                // Write the object to the other backend and keep the tombstone in place
                backend = BackendChoice::LongTerm;
            }
        }

        let (backend_choice, backend_ty, stored_size) = match backend {
            BackendChoice::HighVolume => {
                let stored_size = first_chunk.len() as u64;
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();

                self.0
                    .high_volume_backend
                    .put_object(&id, metadata, stream)
                    .await?;
                (
                    "high-volume",
                    self.0.high_volume_backend.name(),
                    stored_size,
                )
            }
            BackendChoice::LongTerm => {
                let stored_size = Arc::new(AtomicU64::new(0));
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
                    .chain(stream)
                    .inspect({
                        let stored_size = Arc::clone(&stored_size);
                        move |res| {
                            if let Ok(chunk) = res {
                                stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
                            }
                        }
                    })
                    .boxed();

                // First write the object to the long-term backend,
                self.0
                    .long_term_backend
                    .put_object(&id, metadata, stream)
                    .await?;

                let redirect_metadata = Metadata {
                    is_redirect_tombstone: Some(true),
                    expiration_policy: metadata.expiration_policy,
                    ..Default::default()
                };
                let redirect_stream = futures_util::stream::empty().boxed();
                let redirect_request =
                    self.0
                        .high_volume_backend
                        .put_object(&id, &redirect_metadata, redirect_stream);

                // then write the redirect tombstone to the high-volume backend,
                let redirect_result = redirect_request.await;
                if redirect_result.is_err() {
                    // and clean up on any kind of error.
                    self.0.long_term_backend.delete_object(&id).await?;
                }
                redirect_result?;

                (
                    "long-term",
                    self.0.long_term_backend.name(),
                    stored_size.load(Ordering::Acquire),
                )
            }
        };

        merni::distribution!(
            "put.latency"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );
        merni::distribution!(
            "put.size"@b: stored_size,
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );

        Ok(id)
    }

    /// Streams the contents of an object stored at the given key.
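    ///
    /// Returns `None` if no object exists at the given id. A minimal sketch
    /// of collecting the returned stream (mirroring the tests below):
    ///
    /// ```ignore
    /// if let Some((metadata, stream)) = service.get_object(&id).await? {
    ///     let contents: BytesMut = stream.try_collect().await?;
    /// }
    /// ```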
    pub async fn get_object(
        &self,
        id: &ObjectId,
    ) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
        let start = Instant::now();

        let mut backend_choice = "high-volume";
        let mut backend_type = self.0.high_volume_backend.name();
        let mut result = self.0.high_volume_backend.get_object(id).await?;

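        // A tombstone in the high-volume backend means the payload was
        // redirected to the long-term backend; fetch it from there instead.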
        if is_tombstoned(&result) {
            result = self.0.long_term_backend.get_object(id).await?;
            backend_choice = "long-term";
            backend_type = self.0.long_term_backend.name();
        }

        merni::distribution!(
            "get.latency.pre-response"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_type
        );

        if let Some((metadata, _stream)) = &result {
            if let Some(size) = metadata.size {
                merni::distribution!(
                    "get.size"@b: size,
                    "usecase" => id.usecase(),
                    "backend_choice" => backend_choice,
                    "backend_type" => backend_type
                );
            } else {
                tracing::warn!(?backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    /// Deletes an object stored at the given key, if it exists.
    pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
        let start = Instant::now();

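        // If the high-volume entry is a redirect tombstone, the payload lives
        // in the long-term backend and has to be deleted there as well.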
        if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {
            if metadata.is_redirect_tombstone == Some(true) {
                self.0.long_term_backend.delete_object(id).await?;
            }
            self.0.high_volume_backend.delete_object(id).await?;
        }

        merni::distribution!(
            "delete.latency"@s: start.elapsed(),
            "usecase" => id.usecase()
        );

        Ok(())
    }
}

fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool {
    matches!(
        result,
        Some((
            Metadata {
                is_redirect_tombstone: Some(true),
                ..
            },
            _
        ))
    )
}

async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
    Ok(match config {
        StorageConfig::FileSystem { path } => {
            Box::new(backend::local_fs::LocalFsBackend::new(path))
        }
        StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
            backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
        ),
        StorageConfig::Gcs { endpoint, bucket } => {
            Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
        }
        StorageConfig::BigTable {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } => Box::new(
            backend::bigtable::BigTableBackend::new(
                endpoint,
                project_id,
                instance_name,
                table_name,
                connections,
            )
            .await?,
        ),
    })
}

#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use objectstore_types::scope::{Scope, Scopes};

    use super::*;

    fn make_stream(contents: &[u8]) -> PayloadStream {
        tokio_stream::once(Ok(contents.to_vec().into())).boxed()
    }

    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket", // aligned with the env var in devservices and CI
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
}