Skip to main content

objectstore_service/
service.rs

//! Core storage service and configuration.
//!
//! [`StorageService`] is the main entry point for storing and retrieving
//! objects. Each operation runs in a separate tokio task for panic isolation.
//!
//! See the [crate-level documentation](crate) for full architecture details.
7
8use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12use objectstore_types::range::{ByteRange, ContentRange};
13
14use crate::backend::common::Backend;
15use crate::backend::counting::CountingBackend;
16use crate::concurrency::ConcurrencyLimiter;
17use crate::error::{Error, Result};
18use crate::id::{ObjectContext, ObjectId};
19use crate::multipart::{
20    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
21    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
22};
23use crate::stream::{ClientStream, PayloadStream};
24use crate::streaming::StreamExecutor;
25
/// Service response for [`StorageService::get_object`].
///
/// `None` when the object does not exist (e.g. after it was deleted). Otherwise holds the
/// object's metadata, an optional [`ContentRange`] (presumably set when a byte range was
/// requested — confirm against the backend), and the payload stream.
pub type GetResponse = Option<(Metadata, Option<ContentRange>, PayloadStream)>;
/// Service response for [`StorageService::get_metadata`].
///
/// `None` presumably means the object does not exist — confirm against the backend.
pub type MetadataResponse = Option<Metadata>;
/// Service response for [`StorageService::insert_object`].
///
/// The [`ObjectId`] the object was stored under, including the generated key when the
/// caller did not supply one.
pub type InsertResponse = ObjectId;
/// Service response for [`StorageService::delete_object`].
///
/// Deletion carries no payload; success is signalled by `Ok(())`.
pub type DeleteResponse = ();

/// Default concurrency limit for [`StorageService`].
///
/// Applied by [`StorageService::new`]. This value is used when no explicit limit is set via
/// [`StorageService::with_concurrency_limit`].
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
40
/// Asynchronous storage service wrapping a single [`Backend`].
///
/// `StorageService` is the main entry point for storing and retrieving objects.
/// It delegates all storage operations to the backend supplied at construction,
/// adding task spawning, panic isolation, and a concurrency limit on top.
///
/// The typical backend is [`TieredStorage`](crate::backend::tiered::TieredStorage),
/// which provides size-based routing to high-volume and long-term backends along
/// with redirect tombstone management. Any type implementing [`Backend`] can be used.
///
/// Cloning is cheap: clones share the same backend and the same concurrency limit, so
/// permits held through one clone count against all of them.
///
/// # Lifecycle
///
/// After construction (and, optionally,
/// [`with_concurrency_limit`](StorageService::with_concurrency_limit)), call
/// [`start`](StorageService::start) to start the service's background processes.
///
/// # Run-to-Completion and Panic Isolation
///
/// Each operation runs to completion even if the caller is cancelled (e.g., on
/// client disconnect). This ensures that multi-step operations in the backend
/// are never left partially applied. Post-commit cleanup (e.g. deleting
/// unreferenced long-term blobs) runs in background tasks so callers are not
/// blocked. Call [`join`](StorageService::join) during shutdown to wait for
/// outstanding cleanup. Operations are also isolated from panics in backend
/// code — a failure in one operation does not bring down other in-flight work.
/// See [`Error::Panic`].
///
/// # Concurrency Limit
///
/// A semaphore caps the number of in-flight backend operations. The limit is
/// configured via [`with_concurrency_limit`](StorageService::with_concurrency_limit);
/// without an explicit value the default is [`DEFAULT_CONCURRENCY_LIMIT`].
/// Operations that exceed the limit are rejected immediately with
/// [`Error::AtCapacity`].
#[derive(Clone, Debug)]
pub struct StorageService {
    /// The backend, wrapped in a [`CountingBackend`] by [`StorageService::new`]. It is
    /// shared with every spawned operation and every [`StreamExecutor`].
    inner: Arc<dyn Backend>,
    /// Limiter for in-flight backend tasks, shared by all clones of the service.
    concurrency: ConcurrencyLimiter,
}
79
80impl StorageService {
81    /// Creates a new `StorageService` wrapping the given backend.
82    ///
83    /// The backend is wrapped in a [`CountingBackend`] which increments a COGS usage counter for
84    /// each operation run. Single-object operations served directly by `StorageService` are covered
85    /// as we batched operations served by [`StreamExecutor`]. See
86    /// [`backend::counting`](crate::backend::counting) for details.
87    pub fn new(backend: Box<dyn Backend>) -> Self {
88        Self {
89            inner: Arc::new(CountingBackend::new(backend)),
90            concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
91        }
92    }
93
94    /// Sets the maximum number of concurrent backend operations.
95    ///
96    /// Must be called before [`start`](Self::start). Operations beyond this
97    /// limit are rejected with [`Error::AtCapacity`].
98    pub fn with_concurrency_limit(mut self, max: usize) -> Self {
99        self.concurrency = ConcurrencyLimiter::new(max);
100        self
101    }
102
103    /// Returns the number of backend task slots currently available.
104    pub fn tasks_available(&self) -> usize {
105        self.concurrency.available_permits()
106    }
107
108    /// Returns the number of backend tasks currently running.
109    pub fn tasks_running(&self) -> usize {
110        self.concurrency.used_permits()
111    }
112
113    /// Returns the configured limit for concurrent backend tasks.
114    pub fn tasks_limit(&self) -> usize {
115        self.concurrency.total_permits()
116    }
117
118    /// Prepares to stream multiple operations concurrently against this service.
119    ///
120    /// Operations are executed concurrently up to a window derived from the
121    /// service's current capacity. The permits for that window are reserved
122    /// upfront — if the service is at capacity, this returns
123    /// [`Error::AtCapacity`] immediately before any operations are read.
124    pub fn stream(&self) -> Result<StreamExecutor> {
125        let available = self.tasks_available();
126        let window = (available as f64 * 0.10).ceil() as usize;
127
128        let acquire_result = match window {
129            0 => Err(Error::AtCapacity),
130            _ => self.concurrency.try_acquire_many(window),
131        };
132        let reservation = acquire_result.inspect_err(|_| {
133            objectstore_metrics::count!("service.concurrency.rejected");
134            objectstore_log::warn!("Request rejected: service at capacity");
135        })?;
136
137        Ok(StreamExecutor {
138            backend: Arc::clone(&self.inner),
139            window,
140            reservation,
141        })
142    }
143
144    /// Starts background processes for the storage service.
145    ///
146    /// Currently spawns a task that emits the `service.concurrency.in_use`
147    /// and `service.concurrency.limit` gauges once per second.
148    pub fn start(&self) {
149        let concurrency = self.concurrency.clone();
150        let limit = concurrency.total_permits();
151        tokio::spawn(async move {
152            concurrency
153                .run_emitter(|permits| async move {
154                    objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
155                    objectstore_metrics::gauge!("service.concurrency.limit" = limit);
156                })
157                .await;
158        });
159    }
160
161    /// Spawns a future in a separate task and awaits its result.
162    ///
163    /// Returns [`Error::AtCapacity`] if the concurrency limit is reached,
164    /// [`Error::Panic`] if the spawned task panics (the panic message
165    /// is captured for diagnostics), or [`Error::Dropped`] if the task is
166    /// dropped before sending its result.
167    ///
168    /// Emits `service.task.start` (counter) after acquiring a permit and
169    /// `service.task.duration` (distribution) when the task completes, tagged
170    /// with the given `operation` name and an `outcome` of `"success"` or
171    /// `"error"`.
172    async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
173    where
174        T: Send + 'static,
175        F: Future<Output = Result<T>> + Send + 'static,
176    {
177        let permit = self.concurrency.try_acquire().inspect_err(|_| {
178            objectstore_metrics::count!("service.concurrency.rejected");
179            objectstore_log::warn!("Request rejected: service at capacity");
180        })?;
181
182        crate::concurrency::spawn_metered(operation, permit, f).await
183    }
184
185    /// Creates or overwrites an object.
186    ///
187    /// The object is identified by the components of an [`ObjectId`]. The
188    /// `context` is required, while the `key` can be assigned automatically if
189    /// set to `None`.
190    ///
191    /// # Run-to-completion
192    ///
193    /// Once called, the operation runs to completion even if the returned future
194    /// is dropped (e.g., on client disconnect). This guarantees that partially
195    /// written objects in the backend are never left in an inconsistent state.
196    pub async fn insert_object(
197        &self,
198        context: ObjectContext,
199        key: Option<String>,
200        metadata: Metadata,
201        stream: ClientStream,
202    ) -> Result<InsertResponse> {
203        let id = ObjectId::optional(context, key);
204        let inner = Arc::clone(&self.inner);
205        self.spawn("insert", async move {
206            inner.put_object(&id, &metadata, stream).await?;
207            Ok(id)
208        })
209        .await
210    }
211
212    /// Retrieves only the metadata for an object, without the payload.
213    pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
214        let inner = Arc::clone(&self.inner);
215        self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
216            .await
217    }
218
219    /// Streams (part of) the contents of an object.
220    pub async fn get_object(&self, id: ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
221        let inner = Arc::clone(&self.inner);
222        self.spawn("get", async move { inner.get_object(&id, range).await })
223            .await
224    }
225
226    /// Deletes an object, if it exists.
227    ///
228    /// # Run-to-completion
229    ///
230    /// Once called, the operation runs to completion even if the returned future
231    /// is dropped. This guarantees that multi-step delete sequences in the backend
232    /// are never left partially applied.
233    pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
234        let inner = Arc::clone(&self.inner);
235        self.spawn("delete", async move { inner.delete_object(&id).await })
236            .await
237    }
238
239    /// Waits for all outstanding background operations to complete.
240    ///
241    /// Blocks until any pending background cleanup tasks finish, up to the
242    /// backend's configured timeout. Should be called during graceful shutdown
243    /// after the HTTP server has stopped accepting new requests.
244    pub async fn join(&self) {
245        self.inner.join().await;
246    }
247
248    // --- Multipart upload operations ---
249
250    /// Initiates a new multipart upload.
251    pub async fn initiate_multipart(
252        &self,
253        id: ObjectId,
254        metadata: Metadata,
255    ) -> Result<InitiateMultipartResponse> {
256        self.inner.as_multipart_upload_backend()?; // Fail before clone/spawn if unsupported
257        let inner = self.inner.clone();
258        self.spawn("initiate_multipart", async move {
259            inner
260                .as_multipart_upload_backend()?
261                .initiate_multipart(&id, &metadata)
262                .await
263        })
264        .await
265    }
266
267    /// Uploads a single part.
268    ///
269    /// Note that this requires a `content_length`.
270    /// This grants us the broadest and most seamless compatibility when it comes to backends.
271    /// For example, MinIO rejects `UploadPart` requests without a `Content-Length` on plain PUT
272    /// requests.
273    /// This can be worked around by using AWS SigV4 chunked streaming requests, which we could use
274    /// if one day we'll have a usecase where the client doesn't know the part length upfront.
275    pub async fn upload_part(
276        &self,
277        id: ObjectId,
278        upload_id: UploadId,
279        part_number: PartNumber,
280        content_length: u64,
281        content_md5: Option<String>,
282        body: ClientStream,
283    ) -> Result<UploadPartResponse> {
284        self.inner.as_multipart_upload_backend()?; // Fail before clone/spawn if unsupported
285        let inner = self.inner.clone();
286        self.spawn("upload_part", async move {
287            inner
288                .as_multipart_upload_backend()?
289                .upload_part(
290                    &id,
291                    &upload_id,
292                    part_number,
293                    content_length,
294                    content_md5.as_deref(),
295                    body,
296                )
297                .await
298        })
299        .await
300    }
301
302    /// Lists the parts uploaded so far.
303    pub async fn list_parts(
304        &self,
305        id: ObjectId,
306        upload_id: UploadId,
307        max_parts: Option<u32>,
308        part_number_marker: Option<PartNumber>,
309    ) -> Result<ListPartsResponse> {
310        self.inner.as_multipart_upload_backend()?; // Fail before clone/spawn if unsupported
311        let inner = self.inner.clone();
312        self.spawn("list_parts", async move {
313            inner
314                .as_multipart_upload_backend()?
315                .list_parts(&id, &upload_id, max_parts, part_number_marker)
316                .await
317        })
318        .await
319    }
320
321    /// Aborts a multipart upload.
322    pub async fn abort_multipart(
323        &self,
324        id: ObjectId,
325        upload_id: UploadId,
326    ) -> Result<AbortMultipartResponse> {
327        self.inner.as_multipart_upload_backend()?; // Fail before clone/spawn if unsupported
328        let inner = self.inner.clone();
329        self.spawn("abort_multipart", async move {
330            inner
331                .as_multipart_upload_backend()?
332                .abort_multipart(&id, &upload_id)
333                .await
334        })
335        .await
336    }
337
338    /// Finalizes a multipart upload.
339    pub async fn complete_multipart(
340        &self,
341        id: ObjectId,
342        upload_id: UploadId,
343        parts: Vec<CompletedPart>,
344    ) -> Result<CompleteMultipartResponse> {
345        self.inner.as_multipart_upload_backend()?; // Fail before clone/spawn if unsupported
346        let inner = self.inner.clone();
347        self.spawn("complete_multipart", async move {
348            inner
349                .as_multipart_upload_backend()?
350                .complete_multipart(&id, &upload_id, parts)
351                .await
352        })
353        .await
354    }
355}
356
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::range::ByteRange;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
    use crate::backend::changelog::NoopChangeLog;
    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
    use crate::backend::gcs::{GcsBackend, GcsConfig};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::backend::tiered::TieredStorage;
    use crate::error::Error;
    use crate::stream::{self, ClientStream};

    /// Builds the object context shared by all tests (`testing` usecase, one scope).
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds a service over a fresh in-memory backend with the default concurrency limit.
    fn make_service() -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    #[tokio::test]
    async fn insert_without_key_generates_unique_id() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                None,
                Metadata::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();

        assert!(uuid::Uuid::parse_str(id.key()).is_ok());
    }

    #[tokio::test]
    async fn stores_files() {
        let service = make_service();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Metadata::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// Requires the GCS emulator listening on `localhost:8087` (devservices / CI).
    #[tokio::test]
    async fn works_with_gcs() {
        let config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(), // aligned with the env var in devservices and CI
        };

        let backend = GcsBackend::new(config).await.unwrap();
        let service = StorageService::new(Box::new(backend));

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Metadata::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// Requires the BigTable emulator on `localhost:8086` and the GCS emulator on
    /// `localhost:8087`.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let bigtable_config = BigTableConfig {
            endpoint: Some("localhost:8086".into()),
            project_id: "testing".into(),
            instance_name: "objectstore".into(),
            table_name: "objectstore".into(),
            connections: None,
        };
        let gcs_config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        // A separate GCS backend to directly inspect the long-term storage.
        let gcs_backend = GcsBackend::new(gcs_config.clone()).await.unwrap();

        // Insert a >1 MiB object with a key. This forces the long-term path:
        // the real payload goes to GCS, and a redirect tombstone is written to BigTable.
        let payload_len = 2 * 1024 * 1024;
        let payload = vec![0xAB; payload_len]; // 2 MiB
        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Metadata::default(),
                stream::single(payload),
            )
            .await
            .unwrap();

        // Sanity: the object is readable through the service (follows the tombstone).
        let (_, _, stream) = service.get_object(id.clone(), None).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload_len);

        // Delete through the service layer.
        service.delete_object(id.clone()).await.unwrap();

        // The tombstone in BigTable should be gone, so the service returns None.
        let after_delete = service.get_object(id.clone(), None).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // The real object in GCS must also be gone — no orphan.
        let orphan = gcs_backend.get_object(&id, None).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    // --- Task spawning tests (public API) ---

    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                stream::single("hello world"),
            )
            .await
            .unwrap();

        let (_, _, stream) = service.get_object(id, None).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id, None).await.unwrap();
        assert!(after.is_none());
    }

    /// Hooks that panic on every `get_object`, to exercise panic isolation.
    #[derive(Debug)]
    struct PanicOnGet;

    #[async_trait::async_trait]
    impl Hooks for PanicOnGet {
        async fn get_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
            _range: Option<ByteRange>,
        ) -> Result<GetResponse> {
            panic!("intentional panic in get_object");
        }
    }

    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id, None).await;

        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// Hooks over an [`InMemoryBackend`] that can gate `put_object` and signal writes.
    ///
    /// When `pause` is enabled, each `put_object` call notifies `paused` and then waits
    /// until `resume` is notified before writing. `on_put` is notified after every
    /// successful `put_object`, and after every tombstone or object `compare_and_write`.
    #[derive(Clone, Debug, Default)]
    struct GateOnPut {
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GateOnPut {
        /// Creates hooks that block every `put_object` until `resume` is notified.
        fn with_pause() -> Self {
            Self {
                pause: true,
                ..Default::default()
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            if self.pause {
                self.paused.notify_one();
                self.resume.notified().await;
            }
            inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
            let result = inner.compare_and_write(id, current, write).await?;
            // NOTE(review): this fires even when `result` is `false` (presumably "not
            // applied"), so waiters on `on_put` should still verify the stored state.
            if notify {
                self.on_put.notify_one();
            }
            Ok(result)
        }
    }

    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = Box::new(TestBackend::new(GateOnPut::default()));
        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            stream::single(payload),
        );

        // Start insert through the public API. select! drops the future once the
        // backend signals it has paused, simulating a client disconnect mid-write.
        let paused = Arc::clone(&lt.hooks.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        // The spawned task is now blocked inside put_object, and the caller
        // request (including the oneshot receiver) has been dropped. Unpause so
        // the task can finish writing.
        lt.hooks.resume.notify_one();

        // Wait for the tombstone write to the high-volume backend, which is the
        // last step of the long-term insert path.
        let on_put = Arc::clone(&hv.hooks.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        // Verify the object was fully written despite the caller being dropped.
        // The tombstone in HV points to the revision key in LT.
        let id = ObjectId::new(make_context(), "completion-test".into());
        let tombstone = hv.inner.get(&id).expect_tombstone();
        let lt_id = tombstone.target;
        assert!(lt.inner.contains(&lt_id), "long-term object missing");
    }

    // --- Concurrency limit tests ---

    /// Builds a service with the given limit over a backend whose `put_object` blocks
    /// until `hooks.resume` is notified. Also returns that backend, to drive the gate.
    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
        let backend = TestBackend::new(GateOnPut::with_pause());
        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
        (service, backend)
    }

    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv) = make_limited_service(1);

        // First insert blocks on the gated backend, holding the single permit.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        // Wait for the backend to signal it has paused (permit is held).
        hv.hooks.paused.notified().await;

        // Second insert should be rejected immediately.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        // Unblock the first operation.
        hv.hooks.resume.notify_one();
        first.await.unwrap().unwrap();

        // Now that the permit is released, a new operation should succeed.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let backend = Box::new(InMemoryBackend::new("cap"));
        let service = StorageService::new(backend).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        // Kick off a request that blocks in the backend, holding a permit.
        let svc = service.clone();
        let blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        // Release the backend and wait for the insert to finish, so the test does not
        // end with an operation still parked inside the backend.
        hv.hooks.resume.notify_one();
        blocked.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);

        // First operation panics — the permit must still be released.
        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone(), None).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // Second operation should succeed in acquiring the permit (not AtCapacity).
        let result = service.get_object(id, None).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}