objectstore_service/lib.rs

//! The service layer provides the fundamental storage abstraction,
//! offering durable access to the underlying blobs.
//!
//! It is designed as a library crate to be used by the `server`.
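//!
//! A minimal usage sketch (filesystem backends; the directory is illustrative,
//! and `object_path`/`stream` are an [`ObjectPath`] and a stream provided by the caller):
//!
//! ```ignore
//! let config = StorageConfig::FileSystem { path: Path::new("/var/lib/objectstore") };
//! let service = StorageService::new(config.clone(), config).await?;
//!
//! let key = service.put_object(object_path, &Metadata::default(), stream).await?;
//! let (metadata, stream) = service.get_object(&key).await?.unwrap();
//! ```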
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]

mod backend;
mod path;

use bytes::BytesMut;
use futures_util::{StreamExt, TryStreamExt};
use objectstore_types::Metadata;

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use crate::backend::common::{BackendStream, BoxedBackend};

pub use path::*;

/// The size threshold up to which objects are stored in the "high volume" backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

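/// The backend tier an object is routed to, based on its size and tombstone state.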
enum BackendChoice {
    HighVolume,
    LongTerm,
}

/// High-level asynchronous service for storing and retrieving objects.
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);

#[derive(Debug)]
struct StorageServiceInner {
    high_volume_backend: BoxedBackend,
    long_term_backend: BoxedBackend,
}

/// Configuration to initialize a [`StorageService`].
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Use a local filesystem as the storage backend.
    FileSystem {
        /// The path to the directory where files will be stored.
        path: &'a Path,
    },
    /// Use an S3-compatible storage backend.
    S3Compatible {
        /// Optional endpoint URL for the S3-compatible storage.
        endpoint: &'a str,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use Google Cloud Storage as the storage backend.
    Gcs {
        /// Optional endpoint URL for Google Cloud Storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use BigTable as the storage backend.
    BigTable {
        /// Optional endpoint URL for the BigTable storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The Google Cloud project ID.
        project_id: &'a str,
        /// The BigTable instance name.
        instance_name: &'a str,
        /// The BigTable table name.
        table_name: &'a str,
        /// The number of concurrent connections to BigTable.
        ///
        /// Defaults to 2x the number of worker threads.
        connections: Option<usize>,
    },
}

impl StorageService {
    /// Creates a new `StorageService` with the specified configuration.
    pub async fn new(
        high_volume_config: StorageConfig<'_>,
        long_term_config: StorageConfig<'_>,
    ) -> anyhow::Result<Self> {
        let high_volume_backend = create_backend(high_volume_config).await?;
        let long_term_backend = create_backend(long_term_config).await?;

        let inner = StorageServiceInner {
            high_volume_backend,
            long_term_backend,
        };
        Ok(Self(Arc::new(inner)))
    }

    /// Stores or overwrites an object at the given key.
    pub async fn put_object(
        &self,
        path: ObjectPath,
        metadata: &Metadata,
        mut stream: BackendStream,
    ) -> anyhow::Result<ObjectPath> {
        let start = Instant::now();

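        // Buffer the stream in memory up to the size threshold. If the object
        // fits, it is written to the high-volume backend in one shot; otherwise
        // the buffered prefix is chained back onto the remaining stream and
        // forwarded to the long-term backend.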
        let mut first_chunk = BytesMut::new();
        let mut backend = BackendChoice::HighVolume;
        while let Some(chunk) = stream.try_next().await? {
            first_chunk.extend_from_slice(&chunk);

            if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
                backend = BackendChoice::LongTerm;
                break;
            }
        }

        // There might currently be a tombstone at the given path from a previously stored object.
        let previously_stored_object = self.0.high_volume_backend.get_object(&path).await?;
        if is_tombstoned(&previously_stored_object) {
            // Write the object to the other backend and keep the tombstone in place
            backend = BackendChoice::LongTerm;
        }

        let (backend_choice, backend_ty, stored_size) = match backend {
            BackendChoice::HighVolume => {
                let stored_size = first_chunk.len() as u64;
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();

                self.0
                    .high_volume_backend
                    .put_object(&path, metadata, stream)
                    .await?;
                (
                    "high-volume",
                    self.0.high_volume_backend.name(),
                    stored_size,
                )
            }
            BackendChoice::LongTerm => {
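                // The backend consumes the stream, so track the number of bytes
                // actually written via a shared counter updated as chunks pass
                // through.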
                let stored_size = Arc::new(AtomicU64::new(0));
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
                    .chain(stream)
                    .inspect({
                        let stored_size = Arc::clone(&stored_size);
                        move |res| {
                            if let Ok(chunk) = res {
                                stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
                            }
                        }
                    })
                    .boxed();

                // first write the object
                self.0
                    .long_term_backend
                    .put_object(&path, metadata, stream)
                    .await?;

                let redirect_metadata = Metadata {
                    is_redirect_tombstone: Some(true),
                    expiration_policy: metadata.expiration_policy,
                    ..Default::default()
                };
                let redirect_stream = futures_util::stream::empty().boxed();
                let redirect_request = self.0.high_volume_backend.put_object(
                    &path,
                    &redirect_metadata,
                    redirect_stream,
                );

                // then we write the tombstone
                let redirect_result = redirect_request.await;
                if redirect_result.is_err() {
                    // and clean up on any kind of error
                    self.0.long_term_backend.delete_object(&path).await?;
                }
                redirect_result?;

                (
                    "long-term",
                    self.0.long_term_backend.name(),
                    stored_size.load(Ordering::Acquire),
                )
            }
        };

        merni::distribution!(
            "put.latency"@s: start.elapsed(),
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );
        merni::distribution!(
            "put.size"@b: stored_size,
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );

        Ok(path)
    }

    /// Streams the contents of an object stored at the given key.
    pub async fn get_object(
        &self,
        path: &ObjectPath,
    ) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
        let start = Instant::now();

        let mut backend_choice = "high-volume";
        let mut backend_type = self.0.high_volume_backend.name();
        let mut result = self.0.high_volume_backend.get_object(path).await?;

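        // A redirect tombstone in the high-volume backend means the payload
        // was written to the long-term backend; follow it there.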
        if is_tombstoned(&result) {
            result = self.0.long_term_backend.get_object(path).await?;
            backend_choice = "long-term";
            backend_type = self.0.long_term_backend.name();
        }

        merni::distribution!(
            "get.latency.pre-response"@s: start.elapsed(),
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_type
        );

        if let Some((metadata, _stream)) = &result {
            if let Some(size) = metadata.size {
                merni::distribution!(
                    "get.size"@b: size,
                    "usecase" => path.usecase,
                    "backend_choice" => backend_choice,
                    "backend_type" => backend_type
                );
            } else {
                tracing::warn!(?backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    /// Deletes an object stored at the given key, if it exists.
    pub async fn delete_object(&self, path: &ObjectPath) -> anyhow::Result<()> {
        let start = Instant::now();

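        // Delete the long-term payload before its tombstone: if the payload
        // delete fails, the tombstone still points at it and the delete can
        // simply be retried.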
        if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(path).await? {
            if metadata.is_redirect_tombstone == Some(true) {
                self.0.long_term_backend.delete_object(path).await?;
            }
            self.0.high_volume_backend.delete_object(path).await?;
        }

        merni::distribution!(
            "delete.latency"@s: start.elapsed(),
            "usecase" => path.usecase
        );

        Ok(())
    }
}

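/// Returns whether the stored entry is a redirect tombstone pointing to the long-term backend.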
fn is_tombstoned(result: &Option<(Metadata, BackendStream)>) -> bool {
    matches!(
        result,
        Some((
            Metadata {
                is_redirect_tombstone: Some(true),
                ..
            },
            _
        ))
    )
}

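/// Instantiates the backend described by the given configuration.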
async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
    Ok(match config {
        StorageConfig::FileSystem { path } => {
            Box::new(backend::local_fs::LocalFsBackend::new(path))
        }
        StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
            backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
        ),
        StorageConfig::Gcs { endpoint, bucket } => {
            Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
        }
        StorageConfig::BigTable {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } => Box::new(
            backend::bigtable::BigTableBackend::new(
                endpoint,
                project_id,
                instance_name,
                table_name,
                connections,
            )
            .await?,
        ),
    })
}

#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use super::*;

    fn make_stream(contents: &[u8]) -> BackendStream {
        tokio_stream::once(Ok(contents.to_vec().into())).boxed()
    }

    fn make_path() -> ObjectPath {
        ObjectPath {
            usecase: "testing".into(),
            scope: "testing".into(),
            key: "testing".into(),
        }
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .put_object(make_path(), &Default::default(), make_stream(b"oh hai!"))
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

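    // This test assumes a GCS emulator (as configured in devservices and CI)
    // is reachable on localhost:8087; it will fail if none is running.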
    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket", // aligned with the env var in devservices and CI
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .put_object(make_path(), &Default::default(), make_stream(b"oh hai!"))
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
}