Skip to main content

objectstore_service/
service.rs

1//! Core storage service and configuration.
2//!
3//! [`StorageService`] is the main entry point for storing and retrieving
4//! objects. Each operation runs in a separate tokio task for panic isolation.
5//!
6//! See the [crate-level documentation](crate) for full architecture details.
7
8use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12
13use crate::backend::common::Backend;
14use crate::concurrency::ConcurrencyLimiter;
15use crate::error::{Error, Result};
16use crate::id::{ObjectContext, ObjectId};
17use crate::multipart::{
18    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
19    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
20};
21use crate::stream::{ClientStream, PayloadStream};
22use crate::streaming::StreamExecutor;
23
24/// Service response for [`StorageService::get_object`].
25pub type GetResponse = Option<(Metadata, PayloadStream)>;
26/// Service response for [`StorageService::get_metadata`].
27pub type MetadataResponse = Option<Metadata>;
28/// Service response for [`StorageService::insert_object`].
29pub type InsertResponse = ObjectId;
30/// Service response for [`StorageService::delete_object`].
31pub type DeleteResponse = ();
32
33/// Default concurrency limit for [`StorageService`].
34///
35/// This value is used when no explicit limit is set via
36/// [`StorageService::with_concurrency_limit`].
37pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
38
39/// Asynchronous storage service wrapping a single [`Backend`].
40///
41/// `StorageService` is the main entry point for storing and retrieving objects.
42/// It delegates all storage operations to the backend supplied at construction,
43/// adding task spawning, panic isolation, and a concurrency limit on top.
44///
45/// The typical backend is [`TieredStorage`](crate::backend::tiered::TieredStorage),
46/// which provides size-based routing to high-volume and long-term backends along
47/// with redirect tombstone management. Any type implementing [`Backend`] can be used.
48///
49/// # Lifecycle
50///
51/// After construction, call [`start`](StorageService::start) to start the
52/// service's background processes.
53///
54/// # Run-to-Completion and Panic Isolation
55///
56/// Each operation runs to completion even if the caller is cancelled (e.g., on
57/// client disconnect). This ensures that multi-step operations in the backend
58/// are never left partially applied. Post-commit cleanup (e.g. deleting
59/// unreferenced long-term blobs) runs in background tasks so callers are not
60/// blocked. Call [`join`](StorageService::join) during shutdown to wait for
61/// outstanding cleanup. Operations are also isolated from panics in backend
62/// code — a failure in one operation does not bring down other in-flight work.
63/// See [`Error::Panic`].
64///
65/// # Concurrency Limit
66///
67/// A semaphore caps the number of in-flight backend operations. The limit is
68/// configured via [`with_concurrency_limit`](StorageService::with_concurrency_limit);
69/// without an explicit value the default is [`DEFAULT_CONCURRENCY_LIMIT`].
70/// Operations that exceed the limit are rejected immediately with
71/// [`Error::AtCapacity`].
72#[derive(Clone, Debug)]
73pub struct StorageService {
74    inner: Arc<dyn Backend>,
75    concurrency: ConcurrencyLimiter,
76}
77
78impl StorageService {
79    /// Creates a new `StorageService` wrapping the given backend.
80    pub fn new(backend: Box<dyn Backend>) -> Self {
81        Self {
82            inner: Arc::from(backend),
83            concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
84        }
85    }
86
87    /// Sets the maximum number of concurrent backend operations.
88    ///
89    /// Must be called before [`start`](Self::start). Operations beyond this
90    /// limit are rejected with [`Error::AtCapacity`].
91    pub fn with_concurrency_limit(mut self, max: usize) -> Self {
92        self.concurrency = ConcurrencyLimiter::new(max);
93        self
94    }
95
96    /// Returns the number of backend task slots currently available.
97    pub fn tasks_available(&self) -> usize {
98        self.concurrency.available_permits()
99    }
100
101    /// Returns the number of backend tasks currently running.
102    pub fn tasks_running(&self) -> usize {
103        self.concurrency.used_permits()
104    }
105
106    /// Returns the configured limit for concurrent backend tasks.
107    pub fn tasks_limit(&self) -> usize {
108        self.concurrency.total_permits()
109    }
110
111    /// Prepares to stream multiple operations concurrently against this service.
112    ///
113    /// Operations are executed concurrently up to a window derived from the
114    /// service's current capacity. The permits for that window are reserved
115    /// upfront — if the service is at capacity, this returns
116    /// [`Error::AtCapacity`] immediately before any operations are read.
117    pub fn stream(&self) -> Result<StreamExecutor> {
118        let available = self.tasks_available();
119        let window = (available as f64 * 0.10).ceil() as usize;
120
121        let acquire_result = match window {
122            0 => Err(Error::AtCapacity),
123            _ => self.concurrency.try_acquire_many(window),
124        };
125        let reservation = acquire_result.inspect_err(|_| {
126            objectstore_metrics::count!("service.concurrency.rejected");
127            objectstore_log::warn!("Request rejected: service at capacity");
128        })?;
129
130        Ok(StreamExecutor {
131            backend: Arc::clone(&self.inner),
132            window,
133            reservation,
134        })
135    }
136
137    /// Starts background processes for the storage service.
138    ///
139    /// Currently spawns a task that emits the `service.concurrency.in_use`
140    /// and `service.concurrency.limit` gauges once per second.
141    pub fn start(&self) {
142        let concurrency = self.concurrency.clone();
143        let limit = concurrency.total_permits();
144        tokio::spawn(async move {
145            concurrency
146                .run_emitter(|permits| async move {
147                    objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
148                    objectstore_metrics::gauge!("service.concurrency.limit" = limit);
149                })
150                .await;
151        });
152    }
153
154    /// Spawns a future in a separate task and awaits its result.
155    ///
156    /// Returns [`Error::AtCapacity`] if the concurrency limit is reached,
157    /// [`Error::Panic`] if the spawned task panics (the panic message
158    /// is captured for diagnostics), or [`Error::Dropped`] if the task is
159    /// dropped before sending its result.
160    ///
161    /// Emits `service.task.start` (counter) after acquiring a permit and
162    /// `service.task.duration` (distribution) when the task completes, tagged
163    /// with the given `operation` name and an `outcome` of `"success"` or
164    /// `"error"`.
165    async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
166    where
167        T: Send + 'static,
168        F: Future<Output = Result<T>> + Send + 'static,
169    {
170        let permit = self.concurrency.try_acquire().inspect_err(|_| {
171            objectstore_metrics::count!("service.concurrency.rejected");
172            objectstore_log::warn!("Request rejected: service at capacity");
173        })?;
174
175        crate::concurrency::spawn_metered(operation, permit, f).await
176    }
177
178    /// Creates or overwrites an object.
179    ///
180    /// The object is identified by the components of an [`ObjectId`]. The
181    /// `context` is required, while the `key` can be assigned automatically if
182    /// set to `None`.
183    ///
184    /// # Run-to-completion
185    ///
186    /// Once called, the operation runs to completion even if the returned future
187    /// is dropped (e.g., on client disconnect). This guarantees that partially
188    /// written objects in the backend are never left in an inconsistent state.
189    pub async fn insert_object(
190        &self,
191        context: ObjectContext,
192        key: Option<String>,
193        metadata: Metadata,
194        stream: ClientStream,
195    ) -> Result<InsertResponse> {
196        let id = ObjectId::optional(context, key);
197        let inner = Arc::clone(&self.inner);
198        self.spawn("insert", async move {
199            inner.put_object(&id, &metadata, stream).await?;
200            Ok(id)
201        })
202        .await
203    }
204
205    /// Retrieves only the metadata for an object, without the payload.
206    pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
207        let inner = Arc::clone(&self.inner);
208        self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
209            .await
210    }
211
212    /// Streams the contents of an object.
213    pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
214        let inner = Arc::clone(&self.inner);
215        self.spawn("get", async move { inner.get_object(&id).await })
216            .await
217    }
218
219    /// Deletes an object, if it exists.
220    ///
221    /// # Run-to-completion
222    ///
223    /// Once called, the operation runs to completion even if the returned future
224    /// is dropped. This guarantees that multi-step delete sequences in the backend
225    /// are never left partially applied.
226    pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
227        let inner = Arc::clone(&self.inner);
228        self.spawn("delete", async move { inner.delete_object(&id).await })
229            .await
230    }
231
232    /// Waits for all outstanding background operations to complete.
233    ///
234    /// Blocks until any pending background cleanup tasks finish, up to the
235    /// backend's configured timeout. Should be called during graceful shutdown
236    /// after the HTTP server has stopped accepting new requests.
237    pub async fn join(&self) {
238        self.inner.join().await;
239    }
240
241    // --- Multipart upload operations ---
242
243    /// Initiates a new multipart upload.
244    pub async fn initiate_multipart(
245        &self,
246        id: ObjectId,
247        metadata: Metadata,
248    ) -> Result<InitiateMultipartResponse> {
249        let inner = self.inner.clone().as_multipart_upload_backend()?;
250        self.spawn("initiate_multipart", async move {
251            inner.initiate_multipart(&id, &metadata).await
252        })
253        .await
254    }
255
256    /// Uploads a single part.
257    ///
258    /// Note that this requires a `content_length`.
259    /// This grants us the broadest and most seamless compatibility when it comes to backends.
260    /// For example, MinIO rejects `UploadPart` requests without a `Content-Length` on plain PUT
261    /// requests.
262    /// This can be worked around by using AWS SigV4 chunked streaming requests, which we could use
263    /// if one day we'll have a usecase where the client doesn't know the part length upfront.
264    pub async fn upload_part(
265        &self,
266        id: ObjectId,
267        upload_id: UploadId,
268        part_number: PartNumber,
269        content_length: u64,
270        content_md5: Option<String>,
271        body: ClientStream,
272    ) -> Result<UploadPartResponse> {
273        let inner = self.inner.clone().as_multipart_upload_backend()?;
274        self.spawn("upload_part", async move {
275            inner
276                .upload_part(
277                    &id,
278                    &upload_id,
279                    part_number,
280                    content_length,
281                    content_md5.as_deref(),
282                    body,
283                )
284                .await
285        })
286        .await
287    }
288
289    /// Lists the parts uploaded so far.
290    pub async fn list_parts(
291        &self,
292        id: ObjectId,
293        upload_id: UploadId,
294        max_parts: Option<u32>,
295        part_number_marker: Option<PartNumber>,
296    ) -> Result<ListPartsResponse> {
297        let inner = self.inner.clone().as_multipart_upload_backend()?;
298        self.spawn("list_parts", async move {
299            inner
300                .list_parts(&id, &upload_id, max_parts, part_number_marker)
301                .await
302        })
303        .await
304    }
305
306    /// Aborts a multipart upload.
307    pub async fn abort_multipart(
308        &self,
309        id: ObjectId,
310        upload_id: UploadId,
311    ) -> Result<AbortMultipartResponse> {
312        let inner = self.inner.clone().as_multipart_upload_backend()?;
313        self.spawn("abort_multipart", async move {
314            inner.abort_multipart(&id, &upload_id).await
315        })
316        .await
317    }
318
319    /// Finalizes a multipart upload.
320    pub async fn complete_multipart(
321        &self,
322        id: ObjectId,
323        upload_id: UploadId,
324        parts: Vec<CompletedPart>,
325    ) -> Result<CompleteMultipartResponse> {
326        let inner = self.inner.clone().as_multipart_upload_backend()?;
327        self.spawn("complete_multipart", async move {
328            inner.complete_multipart(&id, &upload_id, parts).await
329        })
330        .await
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use std::sync::Arc;
337    use std::time::Duration;
338
339    use bytes::BytesMut;
340    use futures_util::TryStreamExt;
341    use objectstore_types::metadata::Metadata;
342    use objectstore_types::scope::{Scope, Scopes};
343
344    use super::*;
345    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
346    use crate::backend::changelog::NoopChangeLog;
347    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
348    use crate::backend::gcs::{GcsBackend, GcsConfig};
349    use crate::backend::in_memory::InMemoryBackend;
350    use crate::backend::testing::{Hooks, TestBackend};
351    use crate::backend::tiered::TieredStorage;
352    use crate::error::Error;
353    use crate::stream::{self, ClientStream};
354
355    fn make_context() -> ObjectContext {
356        ObjectContext {
357            usecase: "testing".into(),
358            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
359        }
360    }
361
362    fn make_service() -> StorageService {
363        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
364    }
365
366    #[tokio::test]
367    async fn insert_without_key_generates_unique_id() {
368        let service = make_service();
369
370        let id = service
371            .insert_object(
372                make_context(),
373                None,
374                Default::default(),
375                stream::single("auto-keyed"),
376            )
377            .await
378            .unwrap();
379
380        assert!(uuid::Uuid::parse_str(id.key()).is_ok());
381    }
382
383    #[tokio::test]
384    async fn stores_files() {
385        let service = make_service();
386
387        let key = service
388            .insert_object(
389                make_context(),
390                Some("testing".into()),
391                Default::default(),
392                stream::single("oh hai!"),
393            )
394            .await
395            .unwrap();
396
397        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
398        let file_contents: BytesMut = stream.try_collect().await.unwrap();
399
400        assert_eq!(file_contents.as_ref(), b"oh hai!");
401    }
402
403    #[tokio::test]
404    async fn works_with_gcs() {
405        let config = GcsConfig {
406            endpoint: Some("http://localhost:8087".into()),
407            bucket: "test-bucket".into(), // aligned with the env var in devservices and CI
408        };
409
410        let backend = GcsBackend::new(config).await.unwrap();
411        let service = StorageService::new(Box::new(backend));
412
413        let key = service
414            .insert_object(
415                make_context(),
416                Some("testing".into()),
417                Default::default(),
418                stream::single("oh hai!"),
419            )
420            .await
421            .unwrap();
422
423        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
424        let file_contents: BytesMut = stream.try_collect().await.unwrap();
425
426        assert_eq!(file_contents.as_ref(), b"oh hai!");
427    }
428
429    #[tokio::test]
430    async fn tombstone_redirect_and_delete() {
431        let bigtable_config = BigTableConfig {
432            endpoint: Some("localhost:8086".into()),
433            project_id: "testing".into(),
434            instance_name: "objectstore".into(),
435            table_name: "objectstore".into(),
436            connections: None,
437        };
438        let gcs_config = GcsConfig {
439            endpoint: Some("http://localhost:8087".into()),
440            bucket: "test-bucket".into(),
441        };
442
443        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
444        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
445        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
446        let service = StorageService::new(Box::new(backend));
447
448        // A separate GCS backend to directly inspect the long-term storage.
449        let gcs_backend = GcsBackend::new(gcs_config.clone()).await.unwrap();
450
451        // Insert a >1 MiB object with a key.  This forces the long-term path:
452        // the real payload goes to GCS, and a redirect tombstone is written to BigTable.
453        let payload_len = 2 * 1024 * 1024;
454        let payload = vec![0xAB; payload_len]; // 2 MiB
455        let id = service
456            .insert_object(
457                make_context(),
458                Some("delete-cleanup-test".into()),
459                Default::default(),
460                stream::single(payload),
461            )
462            .await
463            .unwrap();
464
465        // Sanity: the object is readable through the service (follows the tombstone).
466        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
467        let body: BytesMut = stream.try_collect().await.unwrap();
468        assert_eq!(body.len(), payload_len);
469
470        // Delete through the service layer.
471        service.delete_object(id.clone()).await.unwrap();
472
473        // The tombstone in BigTable should be gone, so the service returns None.
474        let after_delete = service.get_object(id.clone()).await.unwrap();
475        assert!(after_delete.is_none(), "tombstone not deleted");
476
477        // The real object in GCS must also be gone — no orphan.
478        let orphan = gcs_backend.get_object(&id).await.unwrap();
479        assert!(orphan.is_none(), "object leaked");
480    }
481
482    // --- Task spawning tests (public API) ---
483
484    #[tokio::test]
485    async fn basic_spawn_insert_and_get() {
486        let service = make_service();
487
488        let id = service
489            .insert_object(
490                make_context(),
491                Some("test-key".into()),
492                Metadata::default(),
493                stream::single("hello world"),
494            )
495            .await
496            .unwrap();
497
498        let (_, stream) = service.get_object(id).await.unwrap().unwrap();
499        let body: BytesMut = stream.try_collect().await.unwrap();
500        assert_eq!(body.as_ref(), b"hello world");
501    }
502
503    #[tokio::test]
504    async fn basic_spawn_metadata_and_delete() {
505        let service = make_service();
506
507        let id = service
508            .insert_object(
509                make_context(),
510                Some("meta-key".into()),
511                Metadata::default(),
512                stream::single("data"),
513            )
514            .await
515            .unwrap();
516
517        let metadata = service.get_metadata(id.clone()).await.unwrap();
518        assert!(metadata.is_some());
519
520        service.delete_object(id.clone()).await.unwrap();
521
522        let after = service.get_object(id).await.unwrap();
523        assert!(after.is_none());
524    }
525
526    #[derive(Debug)]
527    struct PanicOnGet;
528
529    #[async_trait::async_trait]
530    impl Hooks for PanicOnGet {
531        async fn get_object(
532            &self,
533            _inner: &InMemoryBackend,
534            _id: &ObjectId,
535        ) -> Result<GetResponse> {
536            panic!("intentional panic in get_object");
537        }
538    }
539
540    #[tokio::test]
541    async fn panic_in_backend_returns_task_failed() {
542        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));
543
544        let id = ObjectId::new(make_context(), "panic-test".into());
545        let result = service.get_object(id).await;
546
547        let Err(Error::Panic(msg)) = result else {
548            panic!("expected Panic error");
549        };
550        assert!(msg.contains("intentional panic in get_object"), "{msg}");
551    }
552
553    /// In-memory backend with optional synchronization for `put_object`.
554    ///
555    /// When `pause` is enabled, each `put_object` call notifies `paused` and
556    #[derive(Clone, Debug, Default)]
557    struct GateOnPut {
558        pause: bool,
559        paused: Arc<tokio::sync::Notify>,
560        resume: Arc<tokio::sync::Notify>,
561        on_put: Arc<tokio::sync::Notify>,
562    }
563
564    impl GateOnPut {
565        fn with_pause() -> Self {
566            Self {
567                pause: true,
568                ..Default::default()
569            }
570        }
571    }
572
573    #[async_trait::async_trait]
574    impl Hooks for GateOnPut {
575        async fn put_object(
576            &self,
577            inner: &InMemoryBackend,
578            id: &ObjectId,
579            metadata: &Metadata,
580            stream: ClientStream,
581        ) -> Result<PutResponse> {
582            if self.pause {
583                self.paused.notify_one();
584                self.resume.notified().await;
585            }
586            inner.put_object(id, metadata, stream).await?;
587            self.on_put.notify_one();
588            Ok(())
589        }
590
591        async fn compare_and_write(
592            &self,
593            inner: &InMemoryBackend,
594            id: &ObjectId,
595            current: Option<&ObjectId>,
596            write: TieredWrite,
597        ) -> Result<bool> {
598            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
599            let result = inner.compare_and_write(id, current, write).await?;
600            if notify {
601                self.on_put.notify_one();
602            }
603            Ok(result)
604        }
605    }
606
607    #[tokio::test]
608    async fn receiver_drop_does_not_prevent_completion() {
609        let hv = Box::new(TestBackend::new(GateOnPut::default()));
610        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
611        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
612        let service = StorageService::new(Box::new(backend));
613
614        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
615        let request = service.insert_object(
616            make_context(),
617            Some("completion-test".into()),
618            Metadata::default(),
619            stream::single(payload),
620        );
621
622        // Start insert through the public API. select! drops the future once the
623        // backend signals it has paused, simulating a client disconnect mid-write.
624        let paused = Arc::clone(&lt.hooks.paused);
625        tokio::select! {
626            _ = request => panic!("insert should not complete while backend is paused"),
627            _ = paused.notified() => {}
628        }
629
630        // The spawned task is now blocked inside put_object, and the caller
631        // request (including the oneshot receiver) has been dropped. Unpause so
632        // the task can finish writing.
633        lt.hooks.resume.notify_one();
634
635        // Wait for the tombstone write to the high-volume backend, which is the
636        // last step of the long-term insert path.
637        let on_put = Arc::clone(&hv.hooks.on_put);
638        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
639            .await
640            .expect("timed out waiting for tombstone write");
641
642        // Verify the object was fully written despite the caller being dropped.
643        // The tombstone in HV points to the revision key in LT.
644        let id = ObjectId::new(make_context(), "completion-test".into());
645        let tombstone = hv.inner.get(&id).expect_tombstone();
646        let lt_id = tombstone.target;
647        assert!(lt.inner.contains(&lt_id), "long-term object missing");
648    }
649
650    // --- Concurrency limit tests ---
651
652    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
653        let backend = TestBackend::new(GateOnPut::with_pause());
654        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
655        (service, backend)
656    }
657
658    #[tokio::test]
659    async fn at_capacity_rejects() {
660        let (service, hv) = make_limited_service(1);
661
662        // First insert blocks on the gated backend, holding the single permit.
663        let svc = service.clone();
664        let first = tokio::spawn(async move {
665            svc.insert_object(
666                make_context(),
667                Some("first".into()),
668                Metadata::default(),
669                stream::single("data"),
670            )
671            .await
672        });
673
674        // Wait for the backend to signal it has paused (permit is held).
675        hv.hooks.paused.notified().await;
676
677        // Second insert should be rejected immediately.
678        let result = service
679            .insert_object(
680                make_context(),
681                Some("second".into()),
682                Metadata::default(),
683                stream::single("data"),
684            )
685            .await;
686
687        assert!(
688            matches!(result, Err(Error::AtCapacity)),
689            "expected AtCapacity, got {result:?}"
690        );
691
692        // Unblock the first operation.
693        hv.hooks.resume.notify_one();
694        first.await.unwrap().unwrap();
695
696        // Now that the permit is released, a new operation should succeed.
697        service
698            .get_metadata(ObjectId::new(make_context(), "first".into()))
699            .await
700            .unwrap();
701    }
702
703    #[tokio::test]
704    async fn tasks_limit_returns_configured_limit() {
705        let backend = Box::new(InMemoryBackend::new("cap"));
706        let service = StorageService::new(backend).with_concurrency_limit(7);
707        assert_eq!(service.tasks_limit(), 7);
708    }
709
710    #[tokio::test]
711    async fn tasks_running_tracks_in_flight() {
712        let (service, hv) = make_limited_service(5);
713
714        assert_eq!(service.tasks_running(), 0);
715
716        // Kick off a request that blocks in the backend, holding a permit.
717        let svc = service.clone();
718        let _blocked = tokio::spawn(async move {
719            svc.insert_object(
720                make_context(),
721                Some("in-use-test".into()),
722                Metadata::default(),
723                stream::single("data"),
724            )
725            .await
726        });
727
728        hv.hooks.paused.notified().await;
729        assert_eq!(service.tasks_running(), 1);
730
731        hv.hooks.resume.notify_one();
732    }
733
734    #[tokio::test]
735    async fn permits_released_after_panic() {
736        let service =
737            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);
738
739        // First operation panics — the permit must still be released.
740        let id = ObjectId::new(make_context(), "panic-permit".into());
741        let result = service.get_object(id.clone()).await;
742        assert!(matches!(result, Err(Error::Panic(_))));
743
744        // Second operation should succeed in acquiring the permit (not AtCapacity).
745        let result = service.get_object(id).await;
746        assert!(
747            !matches!(result, Err(Error::AtCapacity)),
748            "permit was not released after panic"
749        );
750    }
751}