1use std::future::Future;
9use std::sync::Arc;
10
11use objectstore_types::metadata::Metadata;
12
13use crate::backend::common::Backend;
14use crate::concurrency::ConcurrencyLimiter;
15use crate::error::{Error, Result};
16use crate::id::{ObjectContext, ObjectId};
17use crate::multipart::{
18 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
19 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
20};
21use crate::stream::{ClientStream, PayloadStream};
22use crate::streaming::StreamExecutor;
23
24pub type GetResponse = Option<(Metadata, PayloadStream)>;
26pub type MetadataResponse = Option<Metadata>;
28pub type InsertResponse = ObjectId;
30pub type DeleteResponse = ();
32
/// Number of concurrency permits a [`StorageService`] is created with by default.
///
/// Each in-flight operation holds one permit; override the limit per service with
/// [`StorageService::with_concurrency_limit`].
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
38
39#[derive(Clone, Debug)]
73pub struct StorageService {
74 inner: Arc<dyn Backend>,
75 concurrency: ConcurrencyLimiter,
76}
77
78impl StorageService {
79 pub fn new(backend: Box<dyn Backend>) -> Self {
81 Self {
82 inner: Arc::from(backend),
83 concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
84 }
85 }
86
87 pub fn with_concurrency_limit(mut self, max: usize) -> Self {
92 self.concurrency = ConcurrencyLimiter::new(max);
93 self
94 }
95
96 pub fn tasks_available(&self) -> usize {
98 self.concurrency.available_permits()
99 }
100
101 pub fn tasks_running(&self) -> usize {
103 self.concurrency.used_permits()
104 }
105
106 pub fn tasks_limit(&self) -> usize {
108 self.concurrency.total_permits()
109 }
110
111 pub fn stream(&self) -> Result<StreamExecutor> {
118 let available = self.tasks_available();
119 let window = (available as f64 * 0.10).ceil() as usize;
120
121 let acquire_result = match window {
122 0 => Err(Error::AtCapacity),
123 _ => self.concurrency.try_acquire_many(window),
124 };
125 let reservation = acquire_result.inspect_err(|_| {
126 objectstore_metrics::count!("service.concurrency.rejected");
127 objectstore_log::warn!("Request rejected: service at capacity");
128 })?;
129
130 Ok(StreamExecutor {
131 backend: Arc::clone(&self.inner),
132 window,
133 reservation,
134 })
135 }
136
137 pub fn start(&self) {
142 let concurrency = self.concurrency.clone();
143 let limit = concurrency.total_permits();
144 tokio::spawn(async move {
145 concurrency
146 .run_emitter(|permits| async move {
147 objectstore_metrics::gauge!("service.concurrency.in_use" = permits);
148 objectstore_metrics::gauge!("service.concurrency.limit" = limit);
149 })
150 .await;
151 });
152 }
153
154 async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
166 where
167 T: Send + 'static,
168 F: Future<Output = Result<T>> + Send + 'static,
169 {
170 let permit = self.concurrency.try_acquire().inspect_err(|_| {
171 objectstore_metrics::count!("service.concurrency.rejected");
172 objectstore_log::warn!("Request rejected: service at capacity");
173 })?;
174
175 crate::concurrency::spawn_metered(operation, permit, f).await
176 }
177
178 pub async fn insert_object(
190 &self,
191 context: ObjectContext,
192 key: Option<String>,
193 metadata: Metadata,
194 stream: ClientStream,
195 ) -> Result<InsertResponse> {
196 let id = ObjectId::optional(context, key);
197 let inner = Arc::clone(&self.inner);
198 self.spawn("insert", async move {
199 inner.put_object(&id, &metadata, stream).await?;
200 Ok(id)
201 })
202 .await
203 }
204
205 pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
207 let inner = Arc::clone(&self.inner);
208 self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
209 .await
210 }
211
212 pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
214 let inner = Arc::clone(&self.inner);
215 self.spawn("get", async move { inner.get_object(&id).await })
216 .await
217 }
218
219 pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
227 let inner = Arc::clone(&self.inner);
228 self.spawn("delete", async move { inner.delete_object(&id).await })
229 .await
230 }
231
232 pub async fn join(&self) {
238 self.inner.join().await;
239 }
240
241 pub async fn initiate_multipart(
245 &self,
246 id: ObjectId,
247 metadata: Metadata,
248 ) -> Result<InitiateMultipartResponse> {
249 let inner = self.inner.clone().as_multipart_upload_backend()?;
250 self.spawn("initiate_multipart", async move {
251 inner.initiate_multipart(&id, &metadata).await
252 })
253 .await
254 }
255
256 pub async fn upload_part(
265 &self,
266 id: ObjectId,
267 upload_id: UploadId,
268 part_number: PartNumber,
269 content_length: u64,
270 content_md5: Option<String>,
271 body: ClientStream,
272 ) -> Result<UploadPartResponse> {
273 let inner = self.inner.clone().as_multipart_upload_backend()?;
274 self.spawn("upload_part", async move {
275 inner
276 .upload_part(
277 &id,
278 &upload_id,
279 part_number,
280 content_length,
281 content_md5.as_deref(),
282 body,
283 )
284 .await
285 })
286 .await
287 }
288
289 pub async fn list_parts(
291 &self,
292 id: ObjectId,
293 upload_id: UploadId,
294 max_parts: Option<u32>,
295 part_number_marker: Option<PartNumber>,
296 ) -> Result<ListPartsResponse> {
297 let inner = self.inner.clone().as_multipart_upload_backend()?;
298 self.spawn("list_parts", async move {
299 inner
300 .list_parts(&id, &upload_id, max_parts, part_number_marker)
301 .await
302 })
303 .await
304 }
305
306 pub async fn abort_multipart(
308 &self,
309 id: ObjectId,
310 upload_id: UploadId,
311 ) -> Result<AbortMultipartResponse> {
312 let inner = self.inner.clone().as_multipart_upload_backend()?;
313 self.spawn("abort_multipart", async move {
314 inner.abort_multipart(&id, &upload_id).await
315 })
316 .await
317 }
318
319 pub async fn complete_multipart(
321 &self,
322 id: ObjectId,
323 upload_id: UploadId,
324 parts: Vec<CompletedPart>,
325 ) -> Result<CompleteMultipartResponse> {
326 let inner = self.inner.clone().as_multipart_upload_backend()?;
327 self.spawn("complete_multipart", async move {
328 inner.complete_multipart(&id, &upload_id, parts).await
329 })
330 .await
331 }
332}
333
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::bigtable::{BigTableBackend, BigTableConfig};
    use crate::backend::changelog::NoopChangeLog;
    use crate::backend::common::{HighVolumeBackend, PutResponse, TieredWrite};
    use crate::backend::gcs::{GcsBackend, GcsConfig};
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::backend::tiered::TieredStorage;
    use crate::error::Error;
    use crate::stream::{self, ClientStream};

    /// Object context shared by all tests: usecase `testing` with a single scope.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Service over a fresh in-memory backend with the default concurrency limit.
    fn make_service() -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    #[tokio::test]
    async fn insert_without_key_generates_unique_id() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                None,
                Default::default(),
                stream::single("auto-keyed"),
            )
            .await
            .unwrap();

        assert!(uuid::Uuid::parse_str(id.key()).is_ok());
    }

    #[tokio::test]
    async fn stores_files() {
        let service = make_service();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    // Expects a GCS-compatible endpoint (presumably an emulator) at localhost:8087.
    #[tokio::test]
    async fn works_with_gcs() {
        let config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let backend = GcsBackend::new(config).await.unwrap();
        let service = StorageService::new(Box::new(backend));

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                stream::single("oh hai!"),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    // Expects BigTable (localhost:8086) and GCS (localhost:8087) endpoints, presumably emulators.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let bigtable_config = BigTableConfig {
            endpoint: Some("localhost:8086".into()),
            project_id: "testing".into(),
            instance_name: "objectstore".into(),
            table_name: "objectstore".into(),
            connections: None,
        };
        let gcs_config = GcsConfig {
            endpoint: Some("http://localhost:8087".into()),
            bucket: "test-bucket".into(),
        };

        let high_volume = Box::new(BigTableBackend::new(bigtable_config).await.unwrap());
        let long_term = Box::new(GcsBackend::new(gcs_config.clone()).await.unwrap());
        let backend = TieredStorage::new(high_volume, long_term, Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        // Independent handle on the long-term bucket, used to inspect it after the delete.
        let gcs_backend = GcsBackend::new(gcs_config).await.unwrap();

        // 2 MiB payload. Presumably large enough for the tiered backend to place it in
        // long-term storage behind a tombstone.
        let payload_len = 2 * 1024 * 1024;
        let payload = vec![0xAB; payload_len];
        let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Default::default(),
                stream::single(payload),
            )
            .await
            .unwrap();

        // Reading through the service follows the redirect and returns the full payload.
        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload_len);

        service.delete_object(id.clone()).await.unwrap();

        let after_delete = service.get_object(id.clone()).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // NOTE(review): `receiver_drop_does_not_prevent_completion` reads the long-term id
        // from `tombstone.target`. If that id differs from `id`, this lookup cannot observe
        // a leaked long-term object — confirm.
        let orphan = gcs_backend.get_object(&id).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }

    #[tokio::test]
    async fn basic_spawn_insert_and_get() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("test-key".into()),
                Metadata::default(),
                stream::single("hello world"),
            )
            .await
            .unwrap();

        let (_, stream) = service.get_object(id).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn basic_spawn_metadata_and_delete() {
        let service = make_service();

        let id = service
            .insert_object(
                make_context(),
                Some("meta-key".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let metadata = service.get_metadata(id.clone()).await.unwrap();
        assert!(metadata.is_some());

        service.delete_object(id.clone()).await.unwrap();

        let after = service.get_object(id).await.unwrap();
        assert!(after.is_none());
    }

    /// Hooks whose `get_object` always panics, used to exercise panic handling.
    #[derive(Debug)]
    struct PanicOnGet;

    #[async_trait::async_trait]
    impl Hooks for PanicOnGet {
        async fn get_object(
            &self,
            _inner: &InMemoryBackend,
            _id: &ObjectId,
        ) -> Result<GetResponse> {
            panic!("intentional panic in get_object");
        }
    }

    #[tokio::test]
    async fn panic_in_backend_returns_task_failed() {
        let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet)));

        let id = ObjectId::new(make_context(), "panic-test".into());
        let result = service.get_object(id).await;

        let Err(Error::Panic(msg)) = result else {
            panic!("expected Panic error");
        };
        assert!(msg.contains("intentional panic in get_object"), "{msg}");
    }

    /// Hooks that signal writes and can optionally hold `put_object` until resumed.
    ///
    /// With `pause` set, `put_object` first notifies `paused` and then waits on `resume`
    /// before delegating to the inner backend.
    ///
    /// `on_put` is notified:
    /// - after every completed `put_object`;
    /// - after every `compare_and_write` of a tombstone or an object.
    ///
    /// The notifiers are `Arc`s, so clones share them.
    #[derive(Clone, Debug, Default)]
    struct GateOnPut {
        pause: bool,
        paused: Arc<tokio::sync::Notify>,
        resume: Arc<tokio::sync::Notify>,
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GateOnPut {
        /// Hooks that block every `put_object` until `resume` is notified.
        fn with_pause() -> Self {
            Self {
                pause: true,
                ..Default::default()
            }
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            if self.pause {
                self.paused.notify_one();
                self.resume.notified().await;
            }
            inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        async fn compare_and_write(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            current: Option<&ObjectId>,
            write: TieredWrite,
        ) -> Result<bool> {
            // Decide before `write` is moved into the inner call.
            let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _));
            let result = inner.compare_and_write(id, current, write).await?;
            if notify {
                self.on_put.notify_one();
            }
            Ok(result)
        }
    }

    #[tokio::test]
    async fn receiver_drop_does_not_prevent_completion() {
        let hv = Box::new(TestBackend::new(GateOnPut::default()));
        let lt = Box::new(TestBackend::new(GateOnPut::with_pause()));
        let backend = TieredStorage::new(hv.clone(), lt.clone(), Box::new(NoopChangeLog));
        let service = StorageService::new(Box::new(backend));

        let payload = vec![0xABu8; 2 * 1024 * 1024];
        let request = service.insert_object(
            make_context(),
            Some("completion-test".into()),
            Metadata::default(),
            stream::single(payload),
        );

        // Drive the request until the long-term write blocks. Then drop it, simulating a
        // caller that goes away mid-upload.
        let paused = Arc::clone(&lt.hooks.paused);
        tokio::select! {
            _ = request => panic!("insert should not complete while backend is paused"),
            _ = paused.notified() => {}
        }

        // Unblock the backend work, which now runs without anyone awaiting it.
        lt.hooks.resume.notify_one();

        // The high-volume write (the tombstone) shows the upload ran to completion.
        let on_put = Arc::clone(&hv.hooks.on_put);
        tokio::time::timeout(Duration::from_secs(5), on_put.notified())
            .await
            .expect("timed out waiting for tombstone write");

        let id = ObjectId::new(make_context(), "completion-test".into());
        let tombstone = hv.inner.get(&id).expect_tombstone();
        let lt_id = tombstone.target;
        assert!(lt.inner.contains(&lt_id), "long-term object missing");
    }

    /// Builds a service limited to `limit` concurrent tasks.
    ///
    /// The backend pauses every `put_object` until resumed. The returned backend clone's
    /// hooks share their notifiers with the service's copy.
    fn make_limited_service(limit: usize) -> (StorageService, TestBackend<GateOnPut>) {
        let backend = TestBackend::new(GateOnPut::with_pause());
        let service = StorageService::new(Box::new(backend.clone())).with_concurrency_limit(limit);
        (service, backend)
    }

    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv) = make_limited_service(1);

        // Occupy the single permit with an insert that blocks inside the backend.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;

        // A second request must be rejected rather than queued.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        hv.hooks.resume.notify_one();
        first.await.unwrap().unwrap();

        // The permit is free again, so a follow-up request succeeds.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn tasks_limit_returns_configured_limit() {
        let backend = Box::new(InMemoryBackend::new("cap"));
        let service = StorageService::new(backend).with_concurrency_limit(7);
        assert_eq!(service.tasks_limit(), 7);
    }

    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        // Hold one permit by blocking an insert inside the backend.
        let svc = service.clone();
        let blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
        });

        hv.hooks.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        // Await the insert so failures in it surface, then confirm the permit was returned.
        hv.hooks.resume.notify_one();
        blocked.await.unwrap().unwrap();
        assert_eq!(service.tasks_running(), 0);
    }

    #[tokio::test]
    async fn permits_released_after_panic() {
        let service =
            StorageService::new(Box::new(TestBackend::new(PanicOnGet))).with_concurrency_limit(1);

        // The first call panics inside the backend task...
        let id = ObjectId::new(make_context(), "panic-permit".into());
        let result = service.get_object(id.clone()).await;
        assert!(matches!(result, Err(Error::Panic(_))));

        // ...and the single permit must still be available for the next call.
        let result = service.get_object(id).await;
        assert!(
            !matches!(result, Err(Error::AtCapacity)),
            "permit was not released after panic"
        );
    }
}