1use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12
13use crate::backend::common::Backend;
14use crate::concurrency::ConcurrencyLimiter;
15use crate::error::{Error, Result};
16use crate::id::{ObjectContext, ObjectId};
17use crate::stream::{ClientStream, PayloadStream};
18use crate::streaming::StreamExecutor;
19
/// Result of a `get`: the object's metadata plus its payload stream, or
/// `None` when no object exists under the requested id.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Result of a metadata lookup; `None` when the object does not exist.
pub type MetadataResponse = Option<Metadata>;
/// The id an inserted object was stored under (generated when no key given).
pub type InsertResponse = ObjectId;
/// Deletes produce no payload.
pub type DeleteResponse = ();

/// Default cap on concurrently running backend operations; override via
/// `StorageService::with_concurrency_limit`.
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
34
/// Front door for object storage: forwards operations to a [`Backend`] while
/// capping the number of operations allowed to run at once.
#[derive(Clone, Debug)]
pub struct StorageService {
    // Shared backend; `Arc` so spawned tasks can hold their own handle.
    inner: Arc<dyn Backend>,
    // Permit pool that limits and tracks in-flight operations.
    concurrency: ConcurrencyLimiter,
}
73
74impl StorageService {
75 pub fn new(backend: Box<dyn Backend>) -> Self {
77 Self {
78 inner: Arc::from(backend),
79 concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
80 }
81 }
82
83 pub fn with_concurrency_limit(mut self, max: usize) -> Self {
88 self.concurrency = ConcurrencyLimiter::new(max);
89 self
90 }
91
92 pub fn tasks_available(&self) -> usize {
94 self.concurrency.available_permits()
95 }
96
97 pub fn tasks_running(&self) -> usize {
99 self.concurrency.used_permits()
100 }
101
102 pub fn tasks_limit(&self) -> usize {
104 self.concurrency.total_permits()
105 }
106
107 pub fn stream(&self) -> Result<StreamExecutor> {
114 let available = self.tasks_available();
115 let window = (available as f64 * 0.10).ceil() as usize;
116
117 let acquire_result = match window {
118 0 => Err(Error::AtCapacity),
119 _ => self.concurrency.try_acquire_many(window),
120 };
121 let reservation = acquire_result.inspect_err(|_| {
122 objectstore_metrics::count!("service.concurrency.rejected");
123 objectstore_log::warn!("Request rejected: service at capacity");
124 })?;
125
126 Ok(StreamExecutor {
127 backend: Arc::clone(&self.inner),
128 window,
129 reservation,
130 })
131 }
132
133 pub fn start(&self) {
138 let concurrency = self.concurrency.clone();
139 let limit = concurrency.total_permits();
140 tokio::spawn(async move {
141 concurrency
142 .run_emitter(|permits| async move {
143 objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
144 objectstore_metrics::gauge!("service.concurrency.limit" = limit);
145 })
146 .await;
147 });
148 }
149
150 async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
162 where
163 T: Send + 'static,
164 F: Future<Output = Result<T>> + Send + 'static,
165 {
166 let permit = self.concurrency.try_acquire().inspect_err(|_| {
167 objectstore_metrics::count!("service.concurrency.rejected");
168 objectstore_log::warn!("Request rejected: service at capacity");
169 })?;
170
171 crate::concurrency::spawn_metered(operation, permit, f).await
172 }
173
174 pub async fn insert_object(
186 &self,
187 context: ObjectContext,
188 key: Option<String>,
189 metadata: Metadata,
190 stream: ClientStream,
191 ) -> Result<InsertResponse> {
192 let id = ObjectId::optional(context, key);
193 let inner = Arc::clone(&self.inner);
194 self.spawn("insert", async move {
195 inner.put_object(&id, &metadata, stream).await?;
196 Ok(id)
197 })
198 .await
199 }
200
201 pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
203 let inner = Arc::clone(&self.inner);
204 self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
205 .await
206 }
207
208 pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
210 let inner = Arc::clone(&self.inner);
211 self.spawn("get", async move { inner.get_object(&id).await })
212 .await
213 }
214
215 pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
223 let inner = Arc::clone(&self.inner);
224 self.spawn("delete", async move { inner.delete_object(&id).await })
225 .await
226 }
227
228 pub async fn join(&self) {
234 self.inner.join().await;
235 }
236}
237
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
    use crate::backend::changelog::NoopChangeLog;
    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
    use crate::backend::gcs::{GcsBackend, GcsConfig};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::backend::tiered::TieredStorage;
    use crate::error::Error;
    use crate::stream::{self, ClientStream};

    /// Object context shared by all tests in this module.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// A service backed by a fresh in-memory backend.
    fn make_service() -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    #[tokio::test]
    async fn insert_without_key_generates_unique_id() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                None,
                Default::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();

        // A missing key must be replaced with a generated UUID.
        assert!(uuid::Uuid::parse_str(id.key()).is_ok());
    }

    #[tokio::test]
    async fn stores_files() {
        let service = make_service();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    #[tokio::test]
    async fn works_with_gcs() {
        // Requires a local GCS emulator listening on the endpoint below.
        let config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let backend = GcsBackend::new(config).await.unwrap();
        let service = StorageService::new(Box::new(backend));

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        // Requires local BigTable and GCS emulators on the endpoints below.
        let bigtable_config = BigTableConfig {
            endpoint: Some("localhost:8086".into()),
            project_id: "testing".into(),
            instance_name: "objectstore".into(),
            table_name: "objectstore".into(),
            connections: None,
        };
        let gcs_config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        // Direct handle on the long-term store so we can check for leaked
        // objects after the delete.
        let gcs_backend = GcsBackend::new(gcs_config.clone()).await.unwrap();

        // 2 MiB payload so the tiered store redirects to the long-term tier.
        let payload_len = 2 * 1024 * 1024;
        let payload = vec![0xAB; payload_len];

        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Default::default(),
                stream::single(payload),
            )
            .await
            .unwrap();

        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload_len);

        service.delete_object(id.clone()).await.unwrap();

        // The tombstone must be gone…
        let after_delete = service.get_object(id.clone()).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // …and so must the redirected long-term object.
        let orphan = gcs_backend.get_object(&id).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                stream::single("hello world"),
            )
            .await
            .unwrap();

        let (_, stream) = service.get_object(id).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id).await.unwrap();
        assert!(after.is_none());
    }

    /// Hook that panics inside `get_object` to exercise panic recovery.
    #[derive(Debug)]
    struct PanicOnGet;

    #[async_trait::async_trait]
    impl Hooks for PanicOnGet {
        async fn get_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<GetResponse> {
            panic!("intentional panic in get_object");
        }
    }

    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id).await;

        // A backend panic must surface as `Error::Panic`, not crash the task.
        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// Hook that can pause `put_object` until resumed, and signals each
    /// completed write — lets tests control and observe backend progress.
    #[derive(Clone, Debug, Default)]
    struct GateOnPut {
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GateOnPut {
        fn with_pause() -> Self {
            Self {
                pause: true,
                ..Default::default()
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            if self.pause {
                // Announce the pause, then block until the test resumes us.
                self.paused.notify_one();
                self.resume.notified().await;
            }
            inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
            let result = inner.compare_and_write(id, current, write).await?;
            if notify {
                self.on_put.notify_one();
            }
            Ok(result)
        }
    }

    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = Box::new(TestBackend::new(GateOnPut::default()));
        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            stream::single(payload),
        );

        // Drive the request only until the long-term backend reports it is
        // paused, then DROP the request future by leaving the select.
        let paused = Arc::clone(&lt.hooks.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        lt.hooks.resume.notify_one();

        // Even with the caller gone, the write must run to completion: wait
        // for the high-volume tombstone write to be signalled.
        let on_put = Arc::clone(&hv.hooks.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        // The tombstone must point at an object present in the long-term tier.
        let id = ObjectId::new(make_context(), "completion-test".into());
        let tombstone = hv.inner.get(&id).expect_tombstone();
        let lt_id = tombstone.target;
        assert!(lt.inner.contains(&lt_id), "long-term object missing");
    }

    /// Service with `limit` permits over a pausable test backend.
    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
        let backend = TestBackend::new(GateOnPut::with_pause());
        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
        (service, backend)
    }

    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv) = make_limited_service(1);

        // Occupy the single permit with a paused insert.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;

        // A second request must be rejected while the permit is held.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        hv.hooks.resume.notify_one();
        first.await.unwrap().unwrap();

        // After the first request completes, the permit is free again.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let backend = Box::new(InMemoryBackend::new("cap"));
        let service = StorageService::new(backend).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        let svc = service.clone();
        let _blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        hv.hooks.resume.notify_one();
    }

    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);

        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone()).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // A panicking task must still return its permit to the pool.
        let result = service.get_object(id).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}