objectstore_server/
state.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21pub type ServiceState = Arc<Services>;
23
24#[derive(Debug)]
32pub struct Services {
33 pub config: Config,
35 pub service: StorageService,
39 pub key_directory: PublicKeyDirectory,
44 pub rate_limiter: RateLimiter,
46 pub request_counter: RequestCounter,
51}
52
53impl Services {
54 pub async fn spawn(config: Config) -> Result<ServiceState> {
59 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61 let backend = backend::from_config(config.storage.clone()).await?;
62 let service =
63 StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
64 service.start();
65
66 let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
67 if config.auth.enforce && key_directory.keys.is_empty() {
68 anyhow::bail!(
69 "Auth enforcement is enabled but no keys are configured. Either disable auth enforcement (dev/test environments) or configure a public key."
70 );
71 }
72 let rate_limiter = RateLimiter::new(config.rate_limits.clone());
73 rate_limiter.start();
74
75 let request_counter = RequestCounter::new(config.http.max_requests);
76 tokio::spawn(request_counter.clone().run_emitter());
77
78 Ok(Arc::new(Self {
79 config,
80 service,
81 key_directory,
82 rate_limiter,
83 request_counter,
84 }))
85 }
86
87 pub(crate) fn meter_stream<S, E>(
93 &self,
94 stream: S,
95 context: &ObjectContext,
96 ) -> MeteredPayloadStream<S>
97 where
98 S: Stream<Item = Result<Bytes, E>> + Send + 'static,
99 {
100 MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
101 }
102
103 pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
107 self.rate_limiter.record_bandwidth(context, bytes);
108 }
109}
110
111async fn track_runtime_metrics(interval: Duration) {
113 let mut ticker = tokio::time::interval(interval);
114 let metrics = Handle::current().metrics();
115
116 loop {
117 ticker.tick().await;
118 objectstore_log::trace!("Capturing runtime metrics");
119
120 objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
121 objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
122 objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
123 objectstore_metrics::gauge!(
124 "runtime.num_blocking_threads" = metrics.num_blocking_threads()
125 );
126 objectstore_metrics::gauge!(
127 "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
128 );
129 objectstore_metrics::gauge!(
130 "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
131 );
132
133 let registered_fds = metrics.io_driver_fd_registered_count();
134 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
135 objectstore_metrics::gauge!(
136 "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
137 );
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-default configuration with only the auth enforcement flag set.
    ///
    /// NOTE(review): the tests below rely on the default auth configuration carrying no
    /// keys — confirm against `AuthZ::default()`.
    fn config_with_enforcement(enforce: bool) -> Config {
        let auth = crate::config::AuthZ {
            enforce,
            ..Default::default()
        };

        Config {
            auth,
            ..Default::default()
        }
    }

    /// Startup must be rejected when enforcement is on and there is no key to verify with.
    #[tokio::test]
    async fn enforce_without_keys_fails_startup() {
        let outcome = Services::spawn(config_with_enforcement(true)).await;
        let message = outcome.unwrap_err().to_string();

        assert!(message.contains("Auth enforcement is enabled but no keys are configured"));
    }

    /// Without enforcement, a missing key set must not prevent startup.
    #[tokio::test]
    async fn no_enforce_without_keys_starts_ok() {
        let outcome = Services::spawn(config_with_enforcement(false)).await;

        assert!(outcome.is_ok());
    }
}