objectstore_service/
service.rs

1//! Core storage service and configuration.
2//!
3//! [`StorageService`] is the main entry point for storing and retrieving
4//! objects. Each operation runs in a separate tokio task for panic isolation.
5//!
6//! See the [crate-level documentation](crate) for full architecture details.
7
8use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12
13use crate::backend::common::Backend;
14use crate::concurrency::ConcurrencyLimiter;
15use crate::error::{Error, Result};
16use crate::id::{ObjectContext, ObjectId};
17use crate::stream::{ClientStream, PayloadStream};
18use crate::streaming::StreamExecutor;
19
/// Service response for [`StorageService::get_object`].
///
/// `None` indicates the object does not exist.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Service response for [`StorageService::get_metadata`].
///
/// `None` indicates the object does not exist.
pub type MetadataResponse = Option<Metadata>;
/// Service response for [`StorageService::insert_object`].
pub type InsertResponse = ObjectId;
/// Service response for [`StorageService::delete_object`].
///
/// Deleting a missing object is not an error, so there is nothing to return.
pub type DeleteResponse = ();

/// Default concurrency limit for [`StorageService`].
///
/// This value is used when no explicit limit is set via
/// [`StorageService::with_concurrency_limit`].
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
34
/// Asynchronous storage service wrapping a single [`Backend`].
///
/// `StorageService` is the main entry point for storing and retrieving objects.
/// It delegates all storage operations to the backend supplied at construction,
/// adding task spawning, panic isolation, and a concurrency limit on top.
///
/// The typical backend is [`TieredStorage`](crate::backend::tiered::TieredStorage),
/// which provides size-based routing to high-volume and long-term backends along
/// with redirect tombstone management. Any type implementing [`Backend`] can be used.
///
/// # Lifecycle
///
/// After construction, call [`start`](StorageService::start) to start the
/// service's background processes.
///
/// # Run-to-Completion and Panic Isolation
///
/// Each operation runs to completion even if the caller is cancelled (e.g., on
/// client disconnect). This ensures that multi-step operations in the backend
/// are never left partially applied. Post-commit cleanup (e.g. deleting
/// unreferenced long-term blobs) runs in background tasks so callers are not
/// blocked. Call [`join`](StorageService::join) during shutdown to wait for
/// outstanding cleanup. Operations are also isolated from panics in backend
/// code — a failure in one operation does not bring down other in-flight work.
/// See [`Error::Panic`].
///
/// # Concurrency Limit
///
/// A semaphore caps the number of in-flight backend operations. The limit is
/// configured via [`with_concurrency_limit`](StorageService::with_concurrency_limit);
/// without an explicit value the default is [`DEFAULT_CONCURRENCY_LIMIT`].
/// Operations that exceed the limit are rejected immediately with
/// [`Error::AtCapacity`].
#[derive(Clone, Debug)]
pub struct StorageService {
    // The wrapped backend. `Arc` so each spawned task can hold its own
    // reference and run to completion after the caller's future is dropped.
    inner: Arc<dyn Backend>,
    // Semaphore-style limiter capping in-flight backend operations.
    // Cloned handles share the same underlying permit pool.
    concurrency: ConcurrencyLimiter,
}
73
74impl StorageService {
75    /// Creates a new `StorageService` wrapping the given backend.
76    pub fn new(backend: Box<dyn Backend>) -> Self {
77        Self {
78            inner: Arc::from(backend),
79            concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
80        }
81    }
82
83    /// Sets the maximum number of concurrent backend operations.
84    ///
85    /// Must be called before [`start`](Self::start). Operations beyond this
86    /// limit are rejected with [`Error::AtCapacity`].
87    pub fn with_concurrency_limit(mut self, max: usize) -> Self {
88        self.concurrency = ConcurrencyLimiter::new(max);
89        self
90    }
91
92    /// Returns the number of backend task slots currently available.
93    pub fn tasks_available(&self) -> usize {
94        self.concurrency.available_permits()
95    }
96
97    /// Returns the number of backend tasks currently running.
98    pub fn tasks_running(&self) -> usize {
99        self.concurrency.used_permits()
100    }
101
102    /// Returns the configured limit for concurrent backend tasks.
103    pub fn tasks_limit(&self) -> usize {
104        self.concurrency.total_permits()
105    }
106
107    /// Prepares to stream multiple operations concurrently against this service.
108    ///
109    /// Operations are executed concurrently up to a window derived from the
110    /// service's current capacity. The permits for that window are reserved
111    /// upfront — if the service is at capacity, this returns
112    /// [`Error::AtCapacity`] immediately before any operations are read.
113    pub fn stream(&self) -> Result<StreamExecutor> {
114        let available = self.tasks_available();
115        let window = (available as f64 * 0.10).ceil() as usize;
116
117        let acquire_result = match window {
118            0 => Err(Error::AtCapacity),
119            _ => self.concurrency.try_acquire_many(window),
120        };
121        let reservation = acquire_result.inspect_err(|_| {
122            objectstore_metrics::count!("service.concurrency.rejected");
123            objectstore_log::warn!("Request rejected: service at capacity");
124        })?;
125
126        Ok(StreamExecutor {
127            backend: Arc::clone(&self.inner),
128            window,
129            reservation,
130        })
131    }
132
133    /// Starts background processes for the storage service.
134    ///
135    /// Currently spawns a task that emits the `service.concurrency.in_use`
136    /// and `service.concurrency.limit` gauges once per second.
137    pub fn start(&self) {
138        let concurrency = self.concurrency.clone();
139        let limit = concurrency.total_permits();
140        tokio::spawn(async move {
141            concurrency
142                .run_emitter(|permits| async move {
143                    objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
144                    objectstore_metrics::gauge!("service.concurrency.limit" = limit);
145                })
146                .await;
147        });
148    }
149
150    /// Spawns a future in a separate task and awaits its result.
151    ///
152    /// Returns [`Error::AtCapacity`] if the concurrency limit is reached,
153    /// [`Error::Panic`] if the spawned task panics (the panic message
154    /// is captured for diagnostics), or [`Error::Dropped`] if the task is
155    /// dropped before sending its result.
156    ///
157    /// Emits `service.task.start` (counter) after acquiring a permit and
158    /// `service.task.duration` (distribution) when the task completes, tagged
159    /// with the given `operation` name and an `outcome` of `"success"` or
160    /// `"error"`.
161    async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
162    where
163        T: Send + 'static,
164        F: Future<Output = Result<T>> + Send + 'static,
165    {
166        let permit = self.concurrency.try_acquire().inspect_err(|_| {
167            objectstore_metrics::count!("service.concurrency.rejected");
168            objectstore_log::warn!("Request rejected: service at capacity");
169        })?;
170
171        crate::concurrency::spawn_metered(operation, permit, f).await
172    }
173
174    /// Creates or overwrites an object.
175    ///
176    /// The object is identified by the components of an [`ObjectId`]. The
177    /// `context` is required, while the `key` can be assigned automatically if
178    /// set to `None`.
179    ///
180    /// # Run-to-completion
181    ///
182    /// Once called, the operation runs to completion even if the returned future
183    /// is dropped (e.g., on client disconnect). This guarantees that partially
184    /// written objects in the backend are never left in an inconsistent state.
185    pub async fn insert_object(
186        &self,
187        context: ObjectContext,
188        key: Option<String>,
189        metadata: Metadata,
190        stream: ClientStream,
191    ) -> Result<InsertResponse> {
192        let id = ObjectId::optional(context, key);
193        let inner = Arc::clone(&self.inner);
194        self.spawn("insert", async move {
195            inner.put_object(&id, &metadata, stream).await?;
196            Ok(id)
197        })
198        .await
199    }
200
201    /// Retrieves only the metadata for an object, without the payload.
202    pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
203        let inner = Arc::clone(&self.inner);
204        self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
205            .await
206    }
207
208    /// Streams the contents of an object.
209    pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
210        let inner = Arc::clone(&self.inner);
211        self.spawn("get", async move { inner.get_object(&id).await })
212            .await
213    }
214
215    /// Deletes an object, if it exists.
216    ///
217    /// # Run-to-completion
218    ///
219    /// Once called, the operation runs to completion even if the returned future
220    /// is dropped. This guarantees that multi-step delete sequences in the backend
221    /// are never left partially applied.
222    pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
223        let inner = Arc::clone(&self.inner);
224        self.spawn("delete", async move { inner.delete_object(&id).await })
225            .await
226    }
227
228    /// Waits for all outstanding background operations to complete.
229    ///
230    /// Blocks until any pending background cleanup tasks finish, up to the
231    /// backend's configured timeout. Should be called during graceful shutdown
232    /// after the HTTP server has stopped accepting new requests.
233    pub async fn join(&self) {
234        self.inner.join().await;
235    }
236}
237
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
    use crate::backend::changelog::NoopChangeLog;
    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
    use crate::backend::gcs::{GcsBackend, GcsConfig};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::backend::tiered::TieredStorage;
    use crate::error::Error;
    use crate::stream::{self, ClientStream};

    /// Builds a minimal valid [`ObjectContext`] shared by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds a service backed by a plain in-memory backend (no gating).
    fn make_service() -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    #[tokio::test]
    async fn insert_without_key_generates_unique_id() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                None,
                Default::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();

        // Auto-assigned keys are UUIDs (see `ObjectId::optional` with `None`).
        assert!(uuid::Uuid::parse_str(id.key()).is_ok());
    }

    #[tokio::test]
    async fn stores_files() {
        let service = make_service();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        // Round-trip: collect the payload stream and compare bytes.
        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    // NOTE: requires a local GCS emulator on port 8087 (devservices/CI).
    #[tokio::test]
    async fn works_with_gcs() {
        let config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(), // aligned with the env var in devservices and CI
        };

        let backend = GcsBackend::new(config).await.unwrap();
        let service = StorageService::new(Box::new(backend));

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    // End-to-end tiered-storage test: insert via the long-term path, then
    // verify deletion removes both the tombstone and the real GCS object.
    // NOTE: requires local BigTable (8086) and GCS (8087) emulators.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let bigtable_config = BigTableConfig {
            endpoint: Some("localhost:8086".into()),
            project_id: "testing".into(),
            instance_name: "objectstore".into(),
            table_name: "objectstore".into(),
            connections: None,
        };
        let gcs_config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        // A separate GCS backend to directly inspect the long-term storage.
        let gcs_backend = GcsBackend::new(gcs_config.clone()).await.unwrap();

        // Insert a >1 MiB object with a key.  This forces the long-term path:
        // the real payload goes to GCS, and a redirect tombstone is written to BigTable.
        let payload_len = 2 * 1024 * 1024;
        let payload = vec![0xAB; payload_len]; // 2 MiB
        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Default::default(),
                stream::single(payload),
            )
            .await
            .unwrap();

        // Sanity: the object is readable through the service (follows the tombstone).
        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload_len);

        // Delete through the service layer.
        service.delete_object(id.clone()).await.unwrap();

        // The tombstone in BigTable should be gone, so the service returns None.
        let after_delete = service.get_object(id.clone()).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // The real object in GCS must also be gone — no orphan.
        let orphan = gcs_backend.get_object(&id).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    // --- Task spawning tests (public API) ---

    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                stream::single("hello world"),
            )
            .await
            .unwrap();

        let (_, stream) = service.get_object(id).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id).await.unwrap();
        assert!(after.is_none());
    }

    /// Test hook whose `get_object` always panics, to exercise panic isolation.
    #[derive(Debug)]
    struct PanicOnGet;

    #[async_trait::async_trait]
    impl Hooks for PanicOnGet {
        async fn get_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<GetResponse> {
            panic!("intentional panic in get_object");
        }
    }

    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id).await;

        // The panic is caught inside the spawned task and surfaced as
        // `Error::Panic` carrying the panic message.
        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// In-memory backend with optional synchronization for `put_object`.
    ///
    /// When `pause` is enabled, each `put_object` call notifies `paused` and
    /// then waits on `resume` before delegating to the inner backend.
    /// `on_put` is notified after each completed write (both `put_object`
    /// and tombstone/object `compare_and_write`), letting tests await the
    /// moment a write has landed.
    #[derive(Clone, Debug, Default)]
    struct GateOnPut {
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GateOnPut {
        fn with_pause() -> Self {
            Self {
                pause: true,
                ..Default::default()
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            if self.pause {
                // Signal the test that we are blocked, then wait for it to
                // explicitly unblock us.
                self.paused.notify_one();
                self.resume.notified().await;
            }
            inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            // Only notify for actual payload/tombstone writes, after the
            // inner write has succeeded.
            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
            let result = inner.compare_and_write(id, current, write).await?;
            if notify {
                self.on_put.notify_one();
            }
            Ok(result)
        }
    }

    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = Box::new(TestBackend::new(GateOnPut::default()));
        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            stream::single(payload),
        );

        // Start insert through the public API. select! drops the future once the
        // backend signals it has paused, simulating a client disconnect mid-write.
        let paused = Arc::clone(&lt.hooks.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        // The spawned task is now blocked inside put_object, and the caller
        // request (including the oneshot receiver) has been dropped. Unpause so
        // the task can finish writing.
        lt.hooks.resume.notify_one();

        // Wait for the tombstone write to the high-volume backend, which is the
        // last step of the long-term insert path.
        let on_put = Arc::clone(&hv.hooks.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        // Verify the object was fully written despite the caller being dropped.
        // The tombstone in HV points to the revision key in LT.
        let id = ObjectId::new(make_context(), "completion-test".into());
        let tombstone = hv.inner.get(&id).expect_tombstone();
        let lt_id = tombstone.target;
        assert!(lt.inner.contains(&lt_id), "long-term object missing");
    }

    // --- Concurrency limit tests ---

    /// Service with a gated (always-pausing) backend and the given permit limit.
    /// Returns the backend too so tests can drive `paused`/`resume`.
    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
        let backend = TestBackend::new(GateOnPut::with_pause());
        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
        (service, backend)
    }

    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv) = make_limited_service(1);

        // First insert blocks on the gated backend, holding the single permit.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        // Wait for the backend to signal it has paused (permit is held).
        hv.hooks.paused.notified().await;

        // Second insert should be rejected immediately.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        // Unblock the first operation.
        hv.hooks.resume.notify_one();
        first.await.unwrap().unwrap();

        // Now that the permit is released, a new operation should succeed.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let backend = Box::new(InMemoryBackend::new("cap"));
        let service = StorageService::new(backend).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        // Kick off a request that blocks in the backend, holding a permit.
        let svc = service.clone();
        let _blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        // Unblock so the spawned task can finish before the runtime shuts down.
        hv.hooks.resume.notify_one();
    }

    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);

        // First operation panics — the permit must still be released.
        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone()).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // Second operation should succeed in acquiring the permit (not AtCapacity).
        let result = service.get_object(id).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}