1use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12use objectstore_types::range::{ByteRange, ContentRange};
13
14use crate::backend::common::Backend;
15use crate::backend::counting::CountingBackend;
16use crate::concurrency::ConcurrencyLimiter;
17use crate::error::{Error, Result};
18use crate::id::{ObjectContext, ObjectId};
19use crate::multipart::{
20 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
21 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
22};
23use crate::stream::{ClientStream, PayloadStream};
24use crate::streaming::StreamExecutor;
25
/// Response of [`StorageService::get_object`]: the object's metadata, the served
/// [`ContentRange`] (if any), and the payload stream — or `None` when no object was found.
pub type GetResponse = Option<(Metadata, Option<ContentRange>, PayloadStream)>;
/// Response of [`StorageService::get_metadata`]: the object's metadata.
/// NOTE(review): `None` presumably means the object does not exist — confirm per backend.
pub type MetadataResponse = Option<Metadata>;
/// Response of [`StorageService::insert_object`]: the ID the object was stored under.
pub type InsertResponse = ObjectId;
/// Response of [`StorageService::delete_object`]; a successful delete carries no data.
pub type DeleteResponse = ();

/// Default number of permits in the service's [`ConcurrencyLimiter`], applied by
/// [`StorageService::new`] and overridable via [`StorageService::with_concurrency_limit`].
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
40
/// Entry point for object storage operations.
///
/// Wraps a storage [`Backend`] and bounds how many backend operations may be in flight at once
/// using a [`ConcurrencyLimiter`]. Operations that cannot obtain a permit are rejected instead
/// of queued. Clones share the backend (via [`Arc`]) and the same concurrency budget.
#[derive(Clone, Debug)]
pub struct StorageService {
    /// The storage backend, shared with every spawned operation.
    inner: Arc<dyn Backend>,
    /// Permits limiting the number of concurrently running operations.
    concurrency: ConcurrencyLimiter,
}
79
80impl StorageService {
81 pub fn new(backend: Box<dyn Backend>) -> Self {
88 Self {
89 inner: Arc::new(CountingBackend::new(backend)),
90 concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
91 }
92 }
93
94 pub fn with_concurrency_limit(mut self, max: usize) -> Self {
99 self.concurrency = ConcurrencyLimiter::new(max);
100 self
101 }
102
103 pub fn tasks_available(&self) -> usize {
105 self.concurrency.available_permits()
106 }
107
108 pub fn tasks_running(&self) -> usize {
110 self.concurrency.used_permits()
111 }
112
113 pub fn tasks_limit(&self) -> usize {
115 self.concurrency.total_permits()
116 }
117
118 pub fn stream(&self) -> Result<StreamExecutor> {
125 let available = self.tasks_available();
126 let window = (available as f64 * 0.10).ceil() as usize;
127
128 let acquire_result = match window {
129 0 => Err(Error::AtCapacity),
130 _ => self.concurrency.try_acquire_many(window),
131 };
132 let reservation = acquire_result.inspect_err(|_| {
133 objectstore_metrics::count!("service.concurrency.rejected");
134 objectstore_log::warn!("Request rejected: service at capacity");
135 })?;
136
137 Ok(StreamExecutor {
138 backend: Arc::clone(&self.inner),
139 window,
140 reservation,
141 })
142 }
143
144 pub fn start(&self) {
149 let concurrency = self.concurrency.clone();
150 let limit = concurrency.total_permits();
151 tokio::spawn(async move {
152 concurrency
153 .run_emitter(|permits| async move {
154 objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
155 objectstore_metrics::gauge!("service.concurrency.limit" = limit);
156 })
157 .await;
158 });
159 }
160
161 async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
173 where
174 T: Send + 'static,
175 F: Future<Output = Result<T>> + Send + 'static,
176 {
177 let permit = self.concurrency.try_acquire().inspect_err(|_| {
178 objectstore_metrics::count!("service.concurrency.rejected");
179 objectstore_log::warn!("Request rejected: service at capacity");
180 })?;
181
182 crate::concurrency::spawn_metered(operation, permit, f).await
183 }
184
185 pub async fn insert_object(
197 &self,
198 context: ObjectContext,
199 key: Option<String>,
200 metadata: Metadata,
201 stream: ClientStream,
202 ) -> Result<InsertResponse> {
203 let id = ObjectId::optional(context, key);
204 let inner = Arc::clone(&self.inner);
205 self.spawn("insert", async move {
206 inner.put_object(&id, &metadata, stream).await?;
207 Ok(id)
208 })
209 .await
210 }
211
212 pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
214 let inner = Arc::clone(&self.inner);
215 self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
216 .await
217 }
218
219 pub async fn get_object(&self, id: ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
221 let inner = Arc::clone(&self.inner);
222 self.spawn("get", async move { inner.get_object(&id, range).await })
223 .await
224 }
225
226 pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
234 let inner = Arc::clone(&self.inner);
235 self.spawn("delete", async move { inner.delete_object(&id).await })
236 .await
237 }
238
239 pub async fn join(&self) {
245 self.inner.join().await;
246 }
247
248 pub async fn initiate_multipart(
252 &self,
253 id: ObjectId,
254 metadata: Metadata,
255 ) -> Result<InitiateMultipartResponse> {
256 self.inner.as_multipart_upload_backend()?; let inner = self.inner.clone();
258 self.spawn("initiate_multipart", async move {
259 inner
260 .as_multipart_upload_backend()?
261 .initiate_multipart(&id, &metadata)
262 .await
263 })
264 .await
265 }
266
267 pub async fn upload_part(
276 &self,
277 id: ObjectId,
278 upload_id: UploadId,
279 part_number: PartNumber,
280 content_length: u64,
281 content_md5: Option<String>,
282 body: ClientStream,
283 ) -> Result<UploadPartResponse> {
284 self.inner.as_multipart_upload_backend()?; let inner = self.inner.clone();
286 self.spawn("upload_part", async move {
287 inner
288 .as_multipart_upload_backend()?
289 .upload_part(
290 &id,
291 &upload_id,
292 part_number,
293 content_length,
294 content_md5.as_deref(),
295 body,
296 )
297 .await
298 })
299 .await
300 }
301
302 pub async fn list_parts(
304 &self,
305 id: ObjectId,
306 upload_id: UploadId,
307 max_parts: Option<u32>,
308 part_number_marker: Option<PartNumber>,
309 ) -> Result<ListPartsResponse> {
310 self.inner.as_multipart_upload_backend()?; let inner = self.inner.clone();
312 self.spawn("list_parts", async move {
313 inner
314 .as_multipart_upload_backend()?
315 .list_parts(&id, &upload_id, max_parts, part_number_marker)
316 .await
317 })
318 .await
319 }
320
321 pub async fn abort_multipart(
323 &self,
324 id: ObjectId,
325 upload_id: UploadId,
326 ) -> Result<AbortMultipartResponse> {
327 self.inner.as_multipart_upload_backend()?; let inner = self.inner.clone();
329 self.spawn("abort_multipart", async move {
330 inner
331 .as_multipart_upload_backend()?
332 .abort_multipart(&id, &upload_id)
333 .await
334 })
335 .await
336 }
337
338 pub async fn complete_multipart(
340 &self,
341 id: ObjectId,
342 upload_id: UploadId,
343 parts: Vec<CompletedPart>,
344 ) -> Result<CompleteMultipartResponse> {
345 self.inner.as_multipart_upload_backend()?; let inner = self.inner.clone();
347 self.spawn("complete_multipart", async move {
348 inner
349 .as_multipart_upload_backend()?
350 .complete_multipart(&id, &upload_id, parts)
351 .await
352 })
353 .await
354 }
355}
356
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::range::ByteRange;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
    use crate::backend::changelog::NoopChangeLog;
    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
    use crate::backend::gcs::{GcsBackend, GcsConfig};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::backend::tiered::TieredStorage;
    use crate::error::Error;
    use crate::stream::{self, ClientStream};

    /// Object context shared by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Service over a fresh in-memory backend with the default concurrency limit.
    fn make_service() -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    #[tokio::test]
    async fn insert_without_key_generates_unique_id() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                None,
                Metadata::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();
        assert!(uuid::Uuid::parse_str(id.key()).is_ok());

        // The test name promises uniqueness, so check it: a second keyless insert must not
        // reuse the first key.
        let other = service
            .insert_object(
                make_context(),
                None,
                Metadata::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();
        assert_ne!(id.key(), other.key());
    }

    #[tokio::test]
    async fn stores_files() {
        let service = make_service();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Metadata::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// Needs a GCS-compatible endpoint listening on `localhost:8087`.
    #[tokio::test]
    async fn works_with_gcs() {
        let config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let backend = GcsBackend::new(config).await.unwrap();
        let service = StorageService::new(Box::new(backend));

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Metadata::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// Deleting through tiered storage must remove both the high-volume entry and the
    /// long-term object. Needs Bigtable on `localhost:8086` and GCS on `localhost:8087`.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let bigtable_config = BigTableConfig {
            endpoint: Some("localhost:8086".into()),
            project_id: "testing".into(),
            instance_name: "objectstore".into(),
            table_name: "objectstore".into(),
            connections: None,
        };
        let gcs_config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        // Direct handle on the long-term store, used to detect leaked objects.
        let gcs_backend = GcsBackend::new(gcs_config).await.unwrap();

        // NOTE(review): presumably large enough to be routed to the long-term tier.
        let payload_len = 2 * 1024 * 1024;
        let payload = vec![0xAB; payload_len];
        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Metadata::default(),
                stream::single(payload),
            )
            .await
            .unwrap();

        let (_, _, stream) = service.get_object(id.clone(), None).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload_len);

        service.delete_object(id.clone()).await.unwrap();

        let after_delete = service.get_object(id.clone(), None).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        let orphan = gcs_backend.get_object(&id, None).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                stream::single("hello world"),
            )
            .await
            .unwrap();

        let (_, _, stream) = service.get_object(id, None).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id, None).await.unwrap();
        assert!(after.is_none());
    }

    /// Backend hook that panics on every `get_object` call.
    #[derive(Debug)]
    struct PanicOnGet;

    #[async_trait::async_trait]
    impl Hooks for PanicOnGet {
        async fn get_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
            _range: Option<ByteRange>,
        ) -> Result<GetResponse> {
            panic!("intentional panic in get_object");
        }
    }

    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id, None).await;

        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// Backend hook for controlling and observing writes.
    ///
    /// With `pause` set, `put_object` signals `paused` and then waits on `resume` before
    /// writing. Completed puts, and tombstone/object `compare_and_write`s, signal `on_put`.
    #[derive(Clone, Debug, Default)]
    struct GateOnPut {
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GateOnPut {
        /// A gate whose `put_object` blocks until `resume` is notified.
        fn with_pause() -> Self {
            Self {
                pause: true,
                ..Default::default()
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            if self.pause {
                self.paused.notify_one();
                self.resume.notified().await;
            }
            inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
            let result = inner.compare_and_write(id, current, write).await?;
            if notify {
                self.on_put.notify_one();
            }
            Ok(result)
        }
    }

    /// Dropping the caller's future mid-insert must not cancel the spawned backend work.
    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = Box::new(TestBackend::new(GateOnPut::default()));
        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            stream::single(payload),
        );

        // Drive the request until the long-term put pauses; `select!` then drops `request`.
        let paused = Arc::clone(&lt.hooks.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        lt.hooks.resume.notify_one();

        // The detached task should still finish and write the high-volume tombstone.
        let on_put = Arc::clone(&hv.hooks.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        let id = ObjectId::new(make_context(), "completion-test".into());
        let tombstone = hv.inner.get(&id).expect_tombstone();
        let lt_id = tombstone.target;
        assert!(lt.inner.contains(&lt_id), "long-term object missing");
    }

    /// Service over a pausing test backend with `limit` concurrency permits.
    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
        let backend = TestBackend::new(GateOnPut::with_pause());
        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
        (service, backend)
    }

    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv) = make_limited_service(1);

        // The first insert pauses inside `put_object` while holding the only permit.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;

        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        hv.hooks.resume.notify_one();
        first.await.unwrap().unwrap();

        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let backend = Box::new(InMemoryBackend::new("cap"));
        let service = StorageService::new(backend).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        let svc = service.clone();
        let blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        // Let the paused insert finish instead of leaving it dangling at test teardown.
        hv.hooks.resume.notify_one();
        blocked.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);

        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone(), None).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // With a limit of 1, a leaked permit would make this second call fail with AtCapacity.
        let result = service.get_object(id, None).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}