1use std::future::Future;
9use std::path::Path;
10use std::sync::Arc;
11
12use objectstore_types::metadata::Metadata;
13
14use crate::PayloadStream;
15use crate::backend::common::BoxedBackend;
16use crate::concurrency::ConcurrencyLimiter;
17use crate::error::{Error, Result};
18use crate::id::{ObjectContext, ObjectId};
19use crate::streaming::StreamExecutor;
20use crate::tiered::TieredStorage;
21
/// Result of a successful `get_object`: `None` when the object does not exist.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// Result of a successful `get_metadata`: `None` when the object does not exist.
pub type MetadataResponse = Option<Metadata>;
/// Identifier returned by a successful `insert_object`.
pub type InsertResponse = ObjectId;
/// A successful delete carries no payload.
pub type DeleteResponse = ();
30
/// Configuration for a single storage backend tier.
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Local filesystem storage rooted at `path`.
    FileSystem {
        path: &'a Path,
    },
    /// S3-compatible object storage, accessed without an auth token
    /// (see `S3CompatibleBackend::without_token`).
    S3Compatible {
        endpoint: &'a str,
        bucket: &'a str,
    },
    /// Google Cloud Storage. `endpoint` overrides the default API endpoint
    /// (used by tests to point at a local emulator).
    Gcs {
        endpoint: Option<&'a str>,
        bucket: &'a str,
    },
    /// Google Bigtable.
    BigTable {
        endpoint: Option<&'a str>,
        project_id: &'a str,
        instance_name: &'a str,
        table_name: &'a str,
        // Connection count; the backend picks its own default when `None`.
        connections: Option<usize>,
    },
}
73
/// Default maximum number of concurrently running storage tasks.
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500;
79
/// Handle to tiered object storage with a concurrency limit on operations.
///
/// NOTE(review): clones share the same `TieredStorage` via `Arc`; the limiter
/// is also cloned — presumably `ConcurrencyLimiter` is internally shared so
/// all clones draw from one permit pool (`start()` suggests so) — confirm.
#[derive(Clone, Debug)]
pub struct StorageService {
    inner: Arc<TieredStorage>,
    concurrency: ConcurrencyLimiter,
}
143
144impl StorageService {
145 pub async fn new(
147 high_volume_config: StorageConfig<'_>,
148 long_term_config: StorageConfig<'_>,
149 ) -> anyhow::Result<Self> {
150 let high_volume_backend = create_backend(high_volume_config).await?;
151 let long_term_backend = create_backend(long_term_config).await?;
152 Ok(Self::from_backends(high_volume_backend, long_term_backend))
153 }
154
155 pub(crate) fn from_backends(
156 high_volume_backend: BoxedBackend,
157 long_term_backend: BoxedBackend,
158 ) -> Self {
159 Self {
160 inner: Arc::new(TieredStorage {
161 high_volume_backend,
162 long_term_backend,
163 }),
164 concurrency: ConcurrencyLimiter::new(DEFAULT_CONCURRENCY_LIMIT),
165 }
166 }
167
168 pub fn with_concurrency_limit(mut self, max: usize) -> Self {
173 self.concurrency = ConcurrencyLimiter::new(max);
174 self
175 }
176
177 pub fn tasks_available(&self) -> usize {
179 self.concurrency.available_permits()
180 }
181
182 pub fn tasks_running(&self) -> usize {
184 self.concurrency.used_permits()
185 }
186
187 pub fn tasks_limit(&self) -> usize {
189 self.concurrency.total_permits()
190 }
191
192 pub fn stream(&self) -> Result<StreamExecutor> {
199 let available = self.tasks_available();
200 let window = (available as f64 * 0.10).ceil() as usize;
201
202 let acquire_result = match window {
203 0 => Err(Error::AtCapacity),
204 _ => self.concurrency.try_acquire_many(window),
205 };
206 let reservation = acquire_result.inspect_err(|_| {
207 objectstore_metrics::counter!("service.concurrency.rejected": 1);
208 })?;
209
210 Ok(StreamExecutor {
211 tiered: Arc::clone(&self.inner),
212 window,
213 reservation,
214 })
215 }
216
217 pub fn start(&self) {
222 let concurrency = self.concurrency.clone();
223 let limit = concurrency.total_permits();
224 tokio::spawn(async move {
225 concurrency
226 .run_emitter(|permits| async move {
227 objectstore_metrics::gauge!("service.concurrency.in_use": permits);
228 objectstore_metrics::gauge!("service.concurrency.limit": limit);
229 })
230 .await;
231 });
232 }
233
234 async fn spawn<T, F>(&self, operation: &'static str, f: F) -> Result<T>
246 where
247 T: Send + 'static,
248 F: Future<Output = Result<T>> + Send + 'static,
249 {
250 let permit = self.concurrency.try_acquire().inspect_err(|_| {
251 objectstore_metrics::counter!("service.concurrency.rejected": 1);
252 })?;
253
254 crate::concurrency::spawn_metered(operation, permit, f).await
255 }
256
257 pub async fn insert_object(
279 &self,
280 context: ObjectContext,
281 key: Option<String>,
282 metadata: Metadata,
283 stream: PayloadStream,
284 ) -> Result<InsertResponse> {
285 let inner = Arc::clone(&self.inner);
286 self.spawn("insert", async move {
287 inner.insert_object(context, key, &metadata, stream).await
288 })
289 .await
290 }
291
292 pub async fn get_metadata(&self, id: ObjectId) -> Result<MetadataResponse> {
300 let inner = Arc::clone(&self.inner);
301 self.spawn("get_metadata", async move { inner.get_metadata(&id).await })
302 .await
303 }
304
305 pub async fn get_object(&self, id: ObjectId) -> Result<GetResponse> {
313 let inner = Arc::clone(&self.inner);
314 self.spawn("get", async move { inner.get_object(&id).await })
315 .await
316 }
317
318 pub async fn delete_object(&self, id: ObjectId) -> Result<DeleteResponse> {
334 let inner = Arc::clone(&self.inner);
335 self.spawn("delete", async move { inner.delete_object(&id).await })
336 .await
337 }
338}
339
340async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
341 Ok(match config {
342 StorageConfig::FileSystem { path } => {
343 Box::new(crate::backend::local_fs::LocalFsBackend::new(path))
344 }
345 StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
346 crate::backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
347 ),
348 StorageConfig::Gcs { endpoint, bucket } => {
349 Box::new(crate::backend::gcs::GcsBackend::new(endpoint, bucket).await?)
350 }
351 StorageConfig::BigTable {
352 endpoint,
353 project_id,
354 instance_name,
355 table_name,
356 connections,
357 } => Box::new(
358 crate::backend::bigtable::BigTableBackend::new(
359 endpoint,
360 project_id,
361 instance_name,
362 table_name,
363 connections,
364 )
365 .await?,
366 ),
367 })
368}
369
370#[cfg(test)]
371mod tests {
372 use std::sync::Arc;
373 use std::time::Duration;
374
375 use bytes::BytesMut;
376 use futures_util::TryStreamExt;
377 use objectstore_types::metadata::Metadata;
378 use objectstore_types::scope::{Scope, Scopes};
379
380 use super::*;
381 use crate::backend::common::Backend as _;
382 use crate::backend::in_memory::InMemoryBackend;
383 use crate::error::Error;
384 use crate::stream::make_stream;
385
386 fn make_context() -> ObjectContext {
387 ObjectContext {
388 usecase: "testing".into(),
389 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
390 }
391 }
392
393 fn make_service() -> (StorageService, InMemoryBackend, InMemoryBackend) {
394 let hv = InMemoryBackend::new("in-memory-hv");
395 let lt = InMemoryBackend::new("in-memory-lt");
396 let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()));
397 (service, hv, lt)
398 }
399
400 #[tokio::test]
403 async fn stores_files() {
404 let tempdir = tempfile::tempdir().unwrap();
405 let config = StorageConfig::FileSystem {
406 path: tempdir.path(),
407 };
408 let service = StorageService::new(config.clone(), config).await.unwrap();
409
410 let key = service
411 .insert_object(
412 make_context(),
413 Some("testing".into()),
414 Default::default(),
415 make_stream(b"oh hai!"),
416 )
417 .await
418 .unwrap();
419
420 let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
421 let file_contents: BytesMut = stream.try_collect().await.unwrap();
422
423 assert_eq!(file_contents.as_ref(), b"oh hai!");
424 }
425
    /// Round-trips an object when both tiers are backed by GCS.
    ///
    /// NOTE(review): integration test — requires a GCS emulator listening on
    /// http://localhost:8087 with bucket "test-bucket" (presumably started by
    /// the dev environment; confirm how CI provides it).
    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket", };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                Default::default(),
                make_stream(b"oh hai!"),
            )
            .await
            .unwrap();

        // Read the object back and verify the payload survived the round trip.
        let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
449
    /// End-to-end delete semantics across tiers: after a delete, neither the
    /// high-volume tombstone nor the long-term object remains observable.
    ///
    /// NOTE(review): integration test — requires a BigTable emulator on
    /// localhost:8086 and a GCS emulator on localhost:8087; confirm how the
    /// test environment provides them.
    #[tokio::test]
    async fn tombstone_redirect_and_delete() {
        let high_volume = StorageConfig::BigTable {
            endpoint: Some("localhost:8086"),
            project_id: "testing",
            instance_name: "objectstore",
            table_name: "objectstore",
            connections: None,
        };
        let long_term = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket",
        };
        let service = StorageService::new(high_volume, long_term).await.unwrap();

        // Direct GCS handle, bypassing the service, to check for leaked objects.
        let gcs_backend =
            crate::backend::gcs::GcsBackend::new(Some("http://localhost:8087"), "test-bucket")
                .await
                .unwrap();

        // 2 MiB payload — large enough to be routed to the long-term tier.
        let payload = vec![0xAB; 2 * 1024 * 1024]; let id = service
            .insert_object(
                make_context(),
                Some("delete-cleanup-test".into()),
                Default::default(),
                make_stream(&payload),
            )
            .await
            .unwrap();

        // Sanity check: object is readable before the delete.
        let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap();
        let body: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(body.len(), payload.len());

        service.delete_object(id.clone()).await.unwrap();

        // The high-volume tombstone must be gone, not just the redirect target.
        let after_delete = service.get_object(id.clone()).await.unwrap();
        assert!(after_delete.is_none(), "tombstone not deleted");

        // The long-term copy must have been cleaned up as well.
        let orphan = gcs_backend.get_object(&id).await.unwrap();
        assert!(orphan.is_none(), "object leaked");
    }
500
501 #[tokio::test]
504 async fn basic_spawn_insert_and_get() {
505 let (service, _hv, _lt) = make_service();
506
507 let id = service
508 .insert_object(
509 make_context(),
510 Some("test-key".into()),
511 Metadata::default(),
512 make_stream(b"hello world"),
513 )
514 .await
515 .unwrap();
516
517 let (_, stream) = service.get_object(id).await.unwrap().unwrap();
518 let body: BytesMut = stream.try_collect().await.unwrap();
519 assert_eq!(body.as_ref(), b"hello world");
520 }
521
522 #[tokio::test]
523 async fn basic_spawn_metadata_and_delete() {
524 let (service, _hv, _lt) = make_service();
525
526 let id = service
527 .insert_object(
528 make_context(),
529 Some("meta-key".into()),
530 Metadata::default(),
531 make_stream(b"data"),
532 )
533 .await
534 .unwrap();
535
536 let metadata = service.get_metadata(id.clone()).await.unwrap();
537 assert!(metadata.is_some());
538
539 service.delete_object(id.clone()).await.unwrap();
540
541 let after = service.get_object(id).await.unwrap();
542 assert!(after.is_none());
543 }
544
    /// Test double whose `get_object` always panics, used to verify that a
    /// panic inside a backend surfaces as an error instead of taking down the
    /// service.
    #[derive(Debug)]
    struct PanickingBackend;

    #[async_trait::async_trait]
    impl crate::backend::common::Backend for PanickingBackend {
        fn name(&self) -> &'static str {
            "panicking"
        }

        // Writes succeed so the insert path is not the failing one.
        async fn put_object(
            &self,
            _id: &ObjectId,
            _metadata: &Metadata,
            _stream: PayloadStream,
        ) -> Result<()> {
            Ok(())
        }

        // Deliberate panic; the service is expected to map it to Error::Panic.
        async fn get_object(&self, _id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
            panic!("intentional panic in get_object");
        }

        async fn delete_object(&self, _id: &ObjectId) -> Result<()> {
            Ok(())
        }
    }
572
573 #[tokio::test]
574 async fn panic_in_backend_returns_task_failed() {
575 let service =
576 StorageService::from_backends(Box::new(PanickingBackend), Box::new(PanickingBackend));
577
578 let id = ObjectId::new(make_context(), "panic-test".into());
579 let result = service.get_object(id).await;
580
581 let Err(Error::Panic(msg)) = result else {
582 panic!("expected Panic error");
583 };
584 assert!(msg.contains("intentional panic in get_object"), "{msg}");
585 }
586
    /// In-memory backend wrapper exposing synchronization points so tests can
    /// observe and control the timing of `put_object` calls.
    #[derive(Debug, Clone)]
    struct GatedBackend {
        inner: InMemoryBackend,
        // When true, `put_object` blocks until `resume` is notified.
        pause: bool,
        // Notified once a paused `put_object` has started waiting.
        paused: Arc<tokio::sync::Notify>,
        // Notify this to release a paused `put_object`.
        resume: Arc<tokio::sync::Notify>,
        // Notified after every completed `put_object`.
        on_put: Arc<tokio::sync::Notify>,
    }

    impl GatedBackend {
        /// New gate over a fresh in-memory backend; pausing disabled.
        fn new(name: &'static str) -> Self {
            Self {
                inner: InMemoryBackend::new(name),
                pause: false,
                paused: Arc::new(tokio::sync::Notify::new()),
                resume: Arc::new(tokio::sync::Notify::new()),
                on_put: Arc::new(tokio::sync::Notify::new()),
            }
        }

        /// Enables pausing: subsequent `put_object` calls block until resumed.
        fn with_pause(mut self) -> Self {
            self.pause = true;
            self
        }
    }
617
    #[async_trait::async_trait]
    impl crate::backend::common::Backend for GatedBackend {
        fn name(&self) -> &'static str {
            self.inner.name()
        }

        // Optionally parks (pause/resume handshake) before delegating the
        // write, then signals `on_put` so tests can await completion.
        async fn put_object(
            &self,
            id: &ObjectId,
            metadata: &Metadata,
            stream: PayloadStream,
        ) -> Result<()> {
            if self.pause {
                // Tell the test we are parked, then wait to be released.
                self.paused.notify_one();
                self.resume.notified().await;
            }
            self.inner.put_object(id, metadata, stream).await?;
            self.on_put.notify_one();
            Ok(())
        }

        // Reads and deletes pass straight through to the in-memory store.
        async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
            self.inner.get_object(id).await
        }

        async fn delete_object(&self, id: &ObjectId) -> Result<()> {
            self.inner.delete_object(id).await
        }
    }
647
648 #[tokio::test]
649 async fn receiver_drop_does_not_prevent_completion() {
650 let hv = GatedBackend::new("gated-hv");
651 let lt = GatedBackend::new("gated-lt").with_pause();
652 let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()));
653
654 let payload = vec![0xABu8; 2 * 1024 * 1024]; let request = service.insert_object(
656 make_context(),
657 Some("completion-test".into()),
658 Metadata::default(),
659 make_stream(&payload),
660 );
661
662 let paused = Arc::clone(<.paused);
665 tokio::select! {
666 _ = request => panic!("insert should not complete while backend is paused"),
667 _ = paused.notified() => {}
668 }
669
670 lt.resume.notify_one();
674
675 let on_put = Arc::clone(&hv.on_put);
678 tokio::time::timeout(Duration::from_secs(5), on_put.notified())
679 .await
680 .expect("timed out waiting for tombstone write");
681
682 let id = ObjectId::new(make_context(), "completion-test".into());
684 assert!(lt.inner.contains(&id), "long-term object missing");
685 let (meta, _) = hv.inner.get_stored(&id).expect("tombstone missing");
686 assert!(meta.is_tombstone(), "expected redirect tombstone");
687 }
688
689 fn make_limited_service(limit: usize) -> (StorageService, GatedBackend, GatedBackend) {
692 let hv = GatedBackend::new("limited-hv").with_pause();
693 let lt = GatedBackend::new("limited-lt");
694 let service = StorageService::from_backends(Box::new(hv.clone()), Box::new(lt.clone()))
695 .with_concurrency_limit(limit);
696 (service, hv, lt)
697 }
698
    /// With a single permit, an operation started while another is blocked in
    /// the backend is rejected with `AtCapacity`; once the first completes,
    /// the permit is available again.
    #[tokio::test]
    async fn at_capacity_rejects() {
        let (service, hv, _lt) = make_limited_service(1);

        // Occupy the only permit: this insert parks inside the paused backend.
        let svc = service.clone();
        let first = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("first".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await
        });

        // Wait until the first insert is definitely holding its permit.
        hv.paused.notified().await;

        // Second operation finds no free permit and must be rejected.
        let result = service
            .insert_object(
                make_context(),
                Some("second".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await;

        assert!(
            matches!(result, Err(Error::AtCapacity)),
            "expected AtCapacity, got {result:?}"
        );

        // Release the first insert and confirm it completed successfully.
        hv.resume.notify_one();
        first.await.unwrap().unwrap();

        // The permit is free again, so a follow-up operation succeeds.
        service
            .get_metadata(ObjectId::new(make_context(), "first".into()))
            .await
            .unwrap();
    }
743
744 #[tokio::test]
745 async fn tasks_limit_returns_configured_limit() {
746 let hv = GatedBackend::new("cap-hv");
747 let lt = GatedBackend::new("cap-lt");
748 let service =
749 StorageService::from_backends(Box::new(hv), Box::new(lt)).with_concurrency_limit(7);
750 assert_eq!(service.tasks_limit(), 7);
751 }
752
    /// `tasks_running` counts permits held by in-flight operations.
    #[tokio::test]
    async fn tasks_running_tracks_in_flight() {
        let (service, hv, _lt) = make_limited_service(5);

        assert_eq!(service.tasks_running(), 0);

        // Start an insert that parks inside the paused backend, holding a permit.
        let svc = service.clone();
        let _blocked = tokio::spawn(async move {
            svc.insert_object(
                make_context(),
                Some("in-use-test".into()),
                Metadata::default(),
                make_stream(b"data"),
            )
            .await
        });

        // Once the backend reports it is paused, the permit is known to be held.
        hv.paused.notified().await;
        assert_eq!(service.tasks_running(), 1);

        // Release the blocked insert so the runtime can shut down cleanly.
        hv.resume.notify_one();
    }
776
777 #[tokio::test]
778 async fn permits_released_after_panic() {
779 let service =
780 StorageService::from_backends(Box::new(PanickingBackend), Box::new(PanickingBackend))
781 .with_concurrency_limit(1);
782
783 let id = ObjectId::new(make_context(), "panic-permit".into());
785 let result = service.get_object(id.clone()).await;
786 assert!(matches!(result, Err(Error::Panic(_))));
787
788 let result = service.get_object(id).await;
790 assert!(
791 !matches!(result, Err(Error::AtCapacity)),
792 "permit was not released after panic"
793 );
794 }
795}