objectstore_server/
state.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use futures_util::StreamExt;
11use objectstore_service::{PayloadStream, StorageConfig, StorageService};
12use tokio::runtime::Handle;
13
14use objectstore_service::id::ObjectContext;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::{Config, Storage};
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21pub type ServiceState = Arc<Services>;
23
24#[derive(Debug)]
32pub struct Services {
33 pub config: Config,
35 pub service: StorageService,
39 pub key_directory: PublicKeyDirectory,
44 pub rate_limiter: RateLimiter,
46 pub request_counter: RequestCounter,
51}
52
53impl Services {
54 pub async fn spawn(config: Config) -> Result<ServiceState> {
59 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61 let high_volume = map_storage_config(&config.high_volume_storage);
62 let long_term = map_storage_config(&config.long_term_storage);
63 let service = StorageService::new(high_volume, long_term)
64 .await?
65 .with_concurrency_limit(config.service.max_concurrency);
66 service.start();
67
68 let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
69 let rate_limiter = RateLimiter::new(config.rate_limits.clone());
70 rate_limiter.start();
71
72 let request_counter = RequestCounter::new(config.http.max_requests);
73 tokio::spawn(request_counter.clone().run_emitter());
74
75 Ok(Arc::new(Self {
76 config,
77 service,
78 key_directory,
79 rate_limiter,
80 request_counter,
81 }))
82 }
83
84 pub fn meter_stream(&self, stream: PayloadStream, context: &ObjectContext) -> PayloadStream {
86 MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context)).boxed()
87 }
88
89 pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
93 self.rate_limiter.record_bandwidth(context, bytes);
94 }
95}
96
97fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
98 match config {
99 Storage::FileSystem { path } => StorageConfig::FileSystem { path },
100 Storage::S3Compatible { endpoint, bucket } => {
101 StorageConfig::S3Compatible { endpoint, bucket }
102 }
103 Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs {
104 endpoint: endpoint.as_deref(),
105 bucket,
106 },
107 Storage::BigTable {
108 endpoint,
109 project_id,
110 instance_name,
111 table_name,
112 connections,
113 } => StorageConfig::BigTable {
114 endpoint: endpoint.as_deref(),
115 project_id,
116 instance_name,
117 table_name,
118 connections: *connections,
119 },
120 }
121}
122
123async fn track_runtime_metrics(interval: Duration) {
125 let mut ticker = tokio::time::interval(interval);
126 let metrics = Handle::current().metrics();
127
128 loop {
129 ticker.tick().await;
130 tracing::trace!("Capturing runtime metrics");
131
132 objectstore_metrics::gauge!("runtime.num_workers": metrics.num_workers());
133 objectstore_metrics::gauge!("runtime.num_alive_tasks": metrics.num_alive_tasks());
134 objectstore_metrics::gauge!("runtime.global_queue_depth": metrics.global_queue_depth());
135 objectstore_metrics::gauge!("runtime.num_blocking_threads": metrics.num_blocking_threads());
136 objectstore_metrics::gauge!("runtime.num_idle_blocking_threads": metrics.num_idle_blocking_threads());
137 objectstore_metrics::gauge!("runtime.blocking_queue_depth": metrics.blocking_queue_depth());
138
139 let registered_fds = metrics.io_driver_fd_registered_count();
140 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
141 objectstore_metrics::gauge!("runtime.num_io_driver_fds": registered_fds - deregistered_fds);
142 }
143}