objectstore_server/
state.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use objectstore_service::{StorageConfig, StorageService};
6use tokio::runtime::Handle;
7
8use crate::auth::PublicKeyDirectory;
9use crate::config::{Config, Storage};
10use crate::rate_limits::RateLimiter;
11
12pub type ServiceState = Arc<Services>;
14
15#[derive(Debug)]
23pub struct Services {
24 pub config: Config,
26 pub service: StorageService,
30 pub key_directory: PublicKeyDirectory,
35 pub rate_limiter: RateLimiter,
37}
38
39impl Services {
40 pub async fn spawn(config: Config) -> Result<ServiceState> {
45 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
46
47 let high_volume = map_storage_config(&config.high_volume_storage);
48 let long_term = map_storage_config(&config.long_term_storage);
49 let service = StorageService::new(high_volume, long_term).await?;
50
51 let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
52 let rate_limiter = RateLimiter::new(config.rate_limits.clone());
53
54 Ok(Arc::new(Self {
55 config,
56 service,
57 key_directory,
58 rate_limiter,
59 }))
60 }
61}
62
63fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
64 match config {
65 Storage::FileSystem { path } => StorageConfig::FileSystem { path },
66 Storage::S3Compatible { endpoint, bucket } => {
67 StorageConfig::S3Compatible { endpoint, bucket }
68 }
69 Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs {
70 endpoint: endpoint.as_deref(),
71 bucket,
72 },
73 Storage::BigTable {
74 endpoint,
75 project_id,
76 instance_name,
77 table_name,
78 connections,
79 } => StorageConfig::BigTable {
80 endpoint: endpoint.as_deref(),
81 project_id,
82 instance_name,
83 table_name,
84 connections: *connections,
85 },
86 }
87}
88
89async fn track_runtime_metrics(interval: Duration) {
91 let mut ticker = tokio::time::interval(interval);
92 let metrics = Handle::current().metrics();
93
94 loop {
95 ticker.tick().await;
96 tracing::trace!("Capturing runtime metrics");
97
98 merni::gauge!("runtime.num_workers": metrics.num_workers());
99 merni::gauge!("runtime.num_alive_tasks": metrics.num_alive_tasks());
100 merni::gauge!("runtime.global_queue_depth": metrics.global_queue_depth());
101 merni::gauge!("runtime.num_blocking_threads": metrics.num_blocking_threads());
102 merni::gauge!("runtime.num_idle_blocking_threads": metrics.num_idle_blocking_threads());
103 merni::gauge!("runtime.blocking_queue_depth": metrics.blocking_queue_depth());
104
105 let registered_fds = metrics.io_driver_fd_registered_count();
106 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
107 merni::gauge!("runtime.num_io_driver_fds": registered_fds - deregistered_fds);
108 }
109}