objectstore_service/
service.rs

1//! Core storage service and configuration.
2//!
3//! [`StorageService`] is the main entry point for storing and retrieving
4//! objects. Each operation runs in a separate tokio task for panic isolation.
5//! See the [crate-level documentation](crate) for the two-tier backend system,
6//! redirect tombstones, and consistency guarantees.
7
8use std::future::Future;
9use std::path::Path;
10use std::sync::Arc;
11
12use objectstore_types::metadata::Metadata;
13
14use crate::PayloadStream;
15use crate::backend::common::BoxedBackend;
16use crate::concurrency::ConcurrencyLimiter;
17use crate::error::{Error, Result};
18use crate::id::{ObjectContext, ObjectId};
19use crate::streaming::StreamExecutor;
20use crate::tiered::TieredStorage;
21
/// Service response for [`StorageService::get_object`].
///
/// `None` if the object does not exist; otherwise the metadata and a stream of
/// the payload bytes.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Service response for [`StorageService::get_metadata`].
///
/// `None` if the object does not exist.
pub type MetadataResponse = Option<Metadata>;
/// Service response for [`StorageService::insert_object`].
///
/// The id under which the object was stored.
pub type InsertResponse = ObjectId;
/// Service response for [`StorageService::delete_object`].
pub type DeleteResponse = ();
30
31/// Configuration to initialize a [`StorageService`].
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Use a local filesystem as the storage backend.
    FileSystem {
        /// The path to the directory where files will be stored.
        path: &'a Path,
    },
    /// Use an S3-compatible storage backend.
    S3Compatible {
        /// Endpoint URL of the S3-compatible storage.
        endpoint: &'a str,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use Google Cloud Storage as storage backend.
    Gcs {
        /// Optional endpoint URL for the GCS storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The name of the bucket to use.
        bucket: &'a str,
    },
    /// Use BigTable as storage backend.
    BigTable {
        /// Optional endpoint URL for the BigTable storage.
        ///
        /// Assumes an emulator without authentication if set.
        endpoint: Option<&'a str>,
        /// The Google Cloud project ID.
        project_id: &'a str,
        /// The BigTable instance name.
        instance_name: &'a str,
        /// The BigTable table name.
        table_name: &'a str,
        /// The number of concurrent connections to BigTable.
        ///
        /// Defaults to 1.
        connections: Option<usize>,
    },
}
73
/// Default concurrency limit for [`StorageService`].
///
/// This value is used when no explicit limit is set via
/// [`StorageService::with_concurrency_limit`]. Operations beyond the limit are
/// rejected with [`Error::AtCapacity`].
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
79
80/// Asynchronous storage service with a two-tier backend system.
81///
82/// `StorageService` is the main entry point for storing and retrieving objects.
83/// It routes objects to a high-volume or long-term backend based on size (see
84/// the [crate-level documentation](crate) for details) and maintains redirect
85/// tombstones so that reads never need to probe both backends.
86///
87/// # Lifecycle
88///
89/// After construction, call [`start`](StorageService::start) to start the
90/// service's background processes.
91///
92/// # Redirect Tombstones
93///
94/// Because the [`ObjectId`] is backend-independent, reads must be able to find
95/// an object without knowing which backend stores it. A naive approach would
96/// check the long-term backend on every read miss in the high-volume backend —
97/// but that is slow and expensive.
98///
99/// Instead, when an object is stored in the long-term backend, the service
100/// writes a **redirect tombstone** in the high-volume backend. A redirect
101/// tombstone is an empty object with
102/// [`is_redirect_tombstone: true`](objectstore_types::metadata::Metadata::is_redirect_tombstone)
103/// in its metadata. It acts as a signpost: "the real data lives in the other
104/// backend."
105///
106/// # Consistency Without Locks
107///
108/// The tombstone system maintains consistency through operation ordering rather
109/// than distributed locks. The invariant is: a redirect tombstone is always the
110/// **last thing written** and the **last thing removed**.
111///
112/// - On **write**, the real object is persisted before the tombstone. If the
113///   tombstone write fails, the real object is rolled back.
114/// - On **delete**, the real object is removed before the tombstone. If the
115///   long-term delete fails, the tombstone remains and the data stays reachable.
116///
117/// This ensures that at every intermediate step, either the data is fully
118/// reachable (tombstone points to data) or fully absent — never an orphan in
119/// either direction.
120///
121/// See the individual methods for per-operation tombstone behavior.
122///
123/// # Run-to-Completion and Panic Isolation
124///
125/// Each operation runs to completion even if the caller is cancelled (e.g., on
126/// client disconnect). This ensures that multi-step operations such as writing
127/// redirect tombstones are never left partially applied. Operations are also
128/// isolated from panics in backend code — a failure in one operation does not
129/// bring down other in-flight work. See [`Error::Panic`].
130///
131/// # Concurrency Limit
132///
133/// A semaphore caps the number of in-flight backend operations. The limit is
134/// configured via [`with_concurrency_limit`](StorageService::with_concurrency_limit);
135/// without an explicit value the default is [`DEFAULT_CONCURRENCY_LIMIT`].
136/// Operations that exceed the limit are rejected immediately with
137/// [`Error::AtCapacity`].
#[derive(Clone, Debug)]
pub struct StorageService {
    /// The two-tier backend pair; shared by all clones of the service.
    inner: Arc<TieredStorage>,
    /// Caps in-flight backend operations; clones of the service share the same
    /// permit pool.
    concurrency: ConcurrencyLimiter,
}
143
144impl StorageService {
145    /// Creates a new `StorageService` with the specified configuration.
146    pub async fn new(
147        high_volume_config: StorageConfig<'_>,
148        long_term_config: StorageConfig<'_>,
149    ) -> anyhow::Result<Self> {
150        let high_volume_backend = create_backend(high_volume_config).await?;
151        let long_term_backend = create_backend(long_term_config).await?;
152        Ok(Self::from_backends(high_volume_backend, long_term_backend))
153    }
154
155    pub(crate) fn from_backends(
156        high_volume_backend: BoxedBackend,
157        long_term_backend: BoxedBackend,
158    ) -> Self {
159        Self {
160            inner: Arc::new(TieredStorage {
161                high_volume_backend,
162                long_term_backend,
163            }),
164            concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
165        }
166    }
167
168    /// Sets the maximum number of concurrent backend operations.
169    ///
170    /// Must be called before [`start`](Self::start). Operations beyond this
171    /// limit are rejected with [`Error::AtCapacity`].
172    pub fn with_concurrency_limit(mut self, max: usize) -> Self {
173        self.concurrency = ConcurrencyLimiter::new(max);
174        self
175    }
176
177    /// Returns the number of backend task slots currently available.
178    pub fn tasks_available(&self) -> usize {
179        self.concurrency.available_permits()
180    }
181
182    /// Returns the number of backend tasks currently running.
183    pub fn tasks_running(&self) -> usize {
184        self.concurrency.used_permits()
185    }
186
187    /// Returns the configured limit for concurrent backend tasks.
188    pub fn tasks_limit(&self) -> usize {
189        self.concurrency.total_permits()
190    }
191
192    /// Prepares to stream multiple operations concurrently against this service.
193    ///
194    /// Operations are executed concurrently up to a window derived from the
195    /// service's current capacity. The permits for that window are reserved
196    /// upfront — if the service is at capacity, this returns
197    /// [`Error::AtCapacity`] immediately before any operations are read.
198    pub fn stream(&self) -> Result<StreamExecutor> {
199        let available = self.tasks_available();
200        let window = (available as f64 * 0.10).ceil() as usize;
201
202        let acquire_result = match window {
203            0 => Err(Error::AtCapacity),
204            _ => self.concurrency.try_acquire_many(window),
205        };
206        let reservation = acquire_result.inspect_err(|_| {
207            objectstore_metrics::counter!("service.concurrency.rejected": 1);
208        })?;
209
210        Ok(StreamExecutor {
211            tiered: Arc::clone(&self.inner),
212            window,
213            reservation,
214        })
215    }
216
217    /// Starts background processes for the storage service.
218    ///
219    /// Currently spawns a task that emits the `service.concurrency.in_use`
220    /// and `service.concurrency.limit` gauges once per second.
221    pub fn start(&self) {
222        let concurrency = self.concurrency.clone();
223        let limit = concurrency.total_permits();
224        tokio::spawn(async move {
225            concurrency
226                .run_emitter(|permits| async move {
227                    objectstore_metrics::gauge!("service.concurrency.in_use": permits);
228                    objectstore_metrics::gauge!("service.concurrency.limit": limit);
229                })
230                .await;
231        });
232    }
233
234    /// Spawns a future in a separate task and awaits its result.
235    ///
236    /// Returns [`Error::AtCapacity`] if the concurrency limit is reached,
237    /// [`Error::Panic`] if the spawned task panics (the panic message
238    /// is captured for diagnostics), or [`Error::Dropped`] if the task is
239    /// dropped before sending its result.
240    ///
241    /// Emits `service.task.start` (counter) after acquiring a permit and
242    /// `service.task.duration` (distribution) when the task completes, tagged
243    /// with the given `operation` name and an `outcome` of `"success"` or
244    /// `"error"`.
245    async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
246    where
247        T: Send + 'static,
248        F: Future<Output = Result<T>> + Send + 'static,
249    {
250        let permit = self.concurrency.try_acquire().inspect_err(|_| {
251            objectstore_metrics::counter!("service.concurrency.rejected": 1);
252        })?;
253
254        crate::concurrency::spawn_metered(operation, permit, f).await
255    }
256
257    /// Creates or overwrites an object.
258    ///
259    /// The object is identified by the components of an [`ObjectId`]. The
260    /// `context` is required, while the `key` can be assigned automatically if
261    /// set to `None`.
262    ///
263    /// # Run-to-completion
264    ///
265    /// Once called, the operation runs to completion even if the returned future
266    /// is dropped (e.g., on client disconnect). This guarantees that partially
267    /// written objects are never left without their redirect tombstone.
268    ///
269    /// # Tombstone handling
270    ///
271    /// If the object has a caller-provided key and a redirect tombstone already
272    /// exists at that key, the new write is routed to the long-term backend
273    /// (preserving the existing tombstone as a redirect to the new data).
274    ///
275    /// For long-term writes, the real object is persisted first, then the
276    /// tombstone. If the tombstone write fails, the real object is rolled back
277    /// to avoid orphans.
278    pub async fn insert_object(
279        &self,
280        context: ObjectContext,
281        key: Option<String>,
282        metadata: Metadata,
283        stream: PayloadStream,
284    ) -> Result<InsertResponse> {
285        let inner = Arc::clone(&self.inner);
286        self.spawn("insert", async move {
287            inner.insert_object(context, key, &metadata, stream).await
288        })
289        .await
290    }
291
292    /// Retrieves only the metadata for an object, without the payload.
293    ///
294    /// # Tombstone handling
295    ///
296    /// Looks up the object in the high-volume backend first. If the result is a
297    /// redirect tombstone, follows the redirect and fetches metadata from the
298    /// long-term backend instead.
299    pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
300        let inner = Arc::clone(&self.inner);
301        self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
302            .await
303    }
304
305    /// Streams the contents of an object.
306    ///
307    /// # Tombstone handling
308    ///
309    /// Looks up the object in the high-volume backend first. If the result is a
310    /// redirect tombstone, follows the redirect and fetches the object from the
311    /// long-term backend instead.
312    pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
313        let inner = Arc::clone(&self.inner);
314        self.spawn("get", async move { inner.get_object(&id).await })
315            .await
316    }
317
318    /// Deletes an object, if it exists.
319    ///
320    /// # Run-to-completion
321    ///
322    /// Once called, the operation runs to completion even if the returned future
323    /// is dropped. This guarantees that the tombstone is only removed after the
324    /// long-term object has been successfully deleted.
325    ///
326    /// # Tombstone handling
327    ///
328    /// Attempts to delete from the high-volume backend, but skips deletion if
329    /// the entry is a redirect tombstone. When a tombstone is found, the
330    /// long-term object is deleted first, then the tombstone. This ordering
331    /// ensures that if the long-term delete fails, the tombstone remains and
332    /// the data is still reachable.
333    pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
334        let inner = Arc::clone(&self.inner);
335        self.spawn("delete", async move { inner.delete_object(&id).await })
336            .await
337    }
338}
339
340async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
341    Ok(match config {
342        StorageConfig::FileSystem { path } => {
343            Box::new(crate::backend::local_fs::LocalFsBackend::new(path))
344        }
345        StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
346            crate::backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
347        ),
348        StorageConfig::Gcs { endpoint, bucket } => {
349            Box::new(crate::backend::gcs::GcsBackend::new(endpoint, bucket).await?)
350        }
351        StorageConfig::BigTable {
352            endpoint,
353            project_id,
354            instance_name,
355            table_name,
356            connections,
357        } => Box::new(
358            crate::backend::bigtable::BigTableBackend::new(
359                endpoint,
360                project_id,
361                instance_name,
362                table_name,
363                connections,
364            )
365            .await?,
366        ),
367    })
368}
369
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend as _;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::error::Error;
    use crate::stream::make_stream;

    /// Builds a minimal [`ObjectContext`] shared by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds a service over two in-memory backends, returning the backends
    /// so tests can inspect stored state directly.
    fn make_service() -> (StorageService, InMemoryBackend, InMemoryBackend) {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()));
        (service, hv, lt)
    }

    // --- Integration tests (real backends) ---

    /// Round-trips a small object through the filesystem backend.
    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// Round-trips an object through GCS.
    ///
    /// NOTE(review): requires a GCS emulator on localhost:8087 — presumably
    /// provided by devservices/CI, as hinted by the bucket comment below.
    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket", // aligned with the env var in devservices and CI
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// End-to-end tombstone lifecycle: large insert goes to long-term storage
    /// with a redirect tombstone, and delete removes both without orphans.
    ///
    /// NOTE(review): requires BigTable (localhost:8086) and GCS (localhost:8087)
    /// emulators.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let high_volume = StorageConfig::BigTable {
            endpoint: Some("localhost:8086"),
            project_id: "testing",
            instance_name: "objectstore",
            table_name: "objectstore",
            connections: None,
        };
        let long_term = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket",
        };
        let service = StorageService::new(high_volume, long_term).await.unwrap();

        // A separate GCS backend to directly inspect the long-term storage.
        let gcs_backend =
            crate::backend::gcs::GcsBackend::new(Some("http://localhost:8087"), "test-bucket")
                .await
                .unwrap();

        // Insert a >1 MiB object with a key.  This forces the long-term path:
        // the real payload goes to GCS, and a redirect tombstone is written to BigTable.
        let payload = vec![0xAB; 2 * 1024 * 1024]; // 2 MiB
        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Default::default(),
                make_stream(&payload),
            )
            .await
            .unwrap();

        // Sanity: the object is readable through the service (follows the tombstone).
        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload.len());

        // Delete through the service layer.
        service.delete_object(id.clone()).await.unwrap();

        // The tombstone in BigTable should be gone, so the service returns None.
        let after_delete = service.get_object(id.clone()).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // The real object in GCS must also be gone — no orphan.
        let orphan = gcs_backend.get_object(&id).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    // --- Task spawning tests (public API) ---

    /// Insert followed by get returns the same payload (in-memory backends).
    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let (service, _hv, _lt) = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                make_stream(b"hello world"),
            )
            .await
            .unwrap();

        let (_, stream) = service.get_object(id).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    /// Metadata is readable after insert, and the object is gone after delete.
    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let (service, _hv, _lt) = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id).await.unwrap();
        assert!(after.is_none());
    }

    /// A backend that panics on `get_object` to verify panic isolation.
    #[derive(Debug)]
    struct PanickingBackend;

    #[async_trait::async_trait]
    impl crate::backend::common::Backend for PanickingBackend {
        fn name(&self) -> &'static str {
            "panicking"
        }

        // Writes succeed silently; only reads panic.
        async fn put_object(
            &self,
            _id: &ObjectId,
            _metadata: &Metadata,
            _stream: PayloadStream,
        ) -> Result<()> {
            Ok(())
        }

        async fn get_object(&self, _id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
            panic!("intentional panic in get_object");
        }

        async fn delete_object(&self, _id: &ObjectId) -> Result<()> {
            Ok(())
        }
    }

    /// A backend panic surfaces as [`Error::Panic`] with the original message,
    /// instead of crashing the service.
    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service =
            StorageService::from_backends(Box::new(PanickingBackend), Box::new(PanickingBackend));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id).await;

        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// In-memory backend with optional synchronization for `put_object`.
    ///
    /// When `pause` is enabled, each `put_object` call notifies `paused` and
    /// then waits on `resume` before proceeding. After the write completes,
    /// `on_put` is always notified regardless of the `pause` setting.
    #[derive(Debug, Clone)]
    struct GatedBackend {
        inner: InMemoryBackend,
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GatedBackend {
        /// Creates an ungated backend; writes proceed immediately.
        fn new(name: &'static str) -> Self {
            Self {
                inner: InMemoryBackend::new(name),
                pause: false,
                paused: Arc::new(tokio::sync::Notify::new()),
                resume: Arc::new(tokio::sync::Notify::new()),
                on_put: Arc::new(tokio::sync::Notify::new()),
            }
        }

        /// Enables the pause/resume gate on `put_object`.
        fn with_pause(mut self) -> Self {
            self.pause = true;
            self
        }
    }

    #[async_trait::async_trait]
    impl crate::backend::common::Backend for GatedBackend {
        fn name(&self) -> &'static str {
            self.inner.name()
        }

        async fn put_object(
            &self,
            id: &ObjectId,
            metadata: &Metadata,
            stream: PayloadStream,
        ) -> Result<()> {
            if self.pause {
                self.paused.notify_one();
                self.resume.notified().await;
            }
            self.inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
            self.inner.get_object(id).await
        }

        async fn delete_object(&self, id: &ObjectId) -> Result<()> {
            self.inner.delete_object(id).await
        }
    }

    /// Dropping the caller-side future mid-write must not abort the operation:
    /// the long-term object and its tombstone are still fully written.
    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = GatedBackend::new("gated-hv");
        let lt = GatedBackend::new("gated-lt").with_pause();
        let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()));

        let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB → long-term path
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            make_stream(&payload),
        );

        // Start insert through the public API. select! drops the future once the
        // backend signals it has paused, simulating a client disconnect mid-write.
        let paused = Arc::clone(&lt.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        // The spawned task is now blocked inside put_object, and the caller
        // request (including the oneshot receiver) has been dropped. Unpause so
        // the task can finish writing.
        lt.resume.notify_one();

        // Wait for the tombstone write to the high-volume backend, which is the
        // last step of the long-term insert path.
        let on_put = Arc::clone(&hv.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        // Verify the object was fully written despite the caller being dropped.
        let id = ObjectId::new(make_context(), "completion-test".into());
        assert!(lt.inner.contains(&id), "long-term object missing");
        let (meta, _) = hv.inner.get_stored(&id).expect("tombstone missing");
        assert!(meta.is_tombstone(), "expected redirect tombstone");
    }

    // --- Concurrency limit tests ---

    /// Builds a service with a paused high-volume backend and the given
    /// concurrency limit, so tests can hold permits deliberately.
    fn make_limited_service(limit: usize) -> (StorageService, GatedBackend, GatedBackend) {
        let hv = GatedBackend::new("limited-hv").with_pause();
        let lt = GatedBackend::new("limited-lt");
        let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()))
            .with_concurrency_limit(limit);
        (service, hv, lt)
    }

    /// With limit 1, a second operation is rejected with `AtCapacity` while the
    /// first holds the permit, and succeeds once the permit is released.
    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv, _lt) = make_limited_service(1);

        // First insert blocks on the gated backend, holding the single permit.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await
        });

        // Wait for the backend to signal it has paused (permit is held).
        hv.paused.notified().await;

        // Second insert should be rejected immediately.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        // Unblock the first operation.
        hv.resume.notify_one();
        first.await.unwrap().unwrap();

        // Now that the permit is released, a new operation should succeed.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    /// `tasks_limit` reflects the value passed to `with_concurrency_limit`.
    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let hv = GatedBackend::new("cap-hv");
        let lt = GatedBackend::new("cap-lt");
        let service =
            StorageService::from_backends(Box::new(hv), Box::new(lt)).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    /// `tasks_running` counts in-flight operations while a permit is held.
    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv, _lt) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        // Kick off a request that blocks in the backend, holding a permit.
        let svc = service.clone();
        let _blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await
        });

        hv.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        hv.resume.notify_one();
    }

    /// A panicking task must return its permit; otherwise a limit-1 service
    /// would report `AtCapacity` forever after the first panic.
    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::from_backends(Box::new(PanickingBackend), Box::new(PanickingBackend))
                .with_concurrency_limit(1);

        // First operation panics — the permit must still be released.
        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone()).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // Second operation should succeed in acquiring the permit (not AtCapacity).
        let result = service.get_object(id).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}