objectstore_server/
state.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
/// Shared, reference-counted handle to the server's [`Services`].
///
/// This is the type returned by [`Services::spawn`].
21pub type ServiceState = Arc<Services>;
23
/// Bundle of the long-lived components assembled by [`Services::spawn`].
///
/// Instances are handed out wrapped in an `Arc` (see [`ServiceState`]).
24#[derive(Debug)]
32pub struct Services {
    /// The configuration that was passed to [`Services::spawn`].
33 pub config: Config,
    /// Storage service wrapping the backend built from `config.storage`.
35 pub service: StorageService,
    /// Public keys loaded from `config.auth`.
39 pub key_directory: PublicKeyDirectory,
    /// Rate limiter built from `config.rate_limits`; also used for payload and bandwidth metering.
44 pub rate_limiter: RateLimiter,
    /// Request counter created with `config.http.max_requests`.
    // NOTE(review): presumably `max_requests` caps in-flight requests; confirm against `RequestCounter`.
46 pub request_counter: RequestCounter,
51}
52
53impl Services {
54 pub async fn spawn(config: Config) -> Result<ServiceState> {
59 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60 #[cfg(target_os = "linux")]
61 tokio::spawn(track_allocator_metrics(config.runtime.metrics_interval));
62
63 let backend = backend::from_config(config.storage.clone()).await?;
64 let service =
65 StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
66 service.start();
67
68 let key_directory = PublicKeyDirectory::from_config(&config.auth).await?;
69 if config.auth.enforce && key_directory.keys.is_empty() {
70 anyhow::bail!(
71 "Auth enforcement is enabled but no keys are configured. Either disable auth enforcement (dev/test environments) or configure a public key."
72 );
73 }
74 let rate_limiter = RateLimiter::new(config.rate_limits.clone());
75 rate_limiter.start();
76
77 let request_counter = RequestCounter::new(config.http.max_requests);
78 tokio::spawn(request_counter.clone().run_emitter());
79
80 Ok(Arc::new(Self {
81 config,
82 service,
83 key_directory,
84 rate_limiter,
85 request_counter,
86 }))
87 }
88
89 pub(crate) fn meter_stream<S, E>(
95 &self,
96 stream: S,
97 context: &ObjectContext,
98 ) -> MeteredPayloadStream<S>
99 where
100 S: Stream<Item = Result<Bytes, E>> + Send + 'static,
101 {
102 MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
103 }
104
105 pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
109 self.rate_limiter.record_bandwidth(context, bytes);
110 }
111}
112
113#[cfg(target_os = "linux")]
115async fn track_allocator_metrics(interval: Duration) {
116 let epoch = tikv_jemalloc_ctl::epoch::mib().expect("jemalloc epoch MIB");
119 let allocated = tikv_jemalloc_ctl::stats::allocated::mib().expect("jemalloc allocated MIB");
120 let active = tikv_jemalloc_ctl::stats::active::mib().expect("jemalloc active MIB");
121 let resident = tikv_jemalloc_ctl::stats::resident::mib().expect("jemalloc resident MIB");
122 let mapped = tikv_jemalloc_ctl::stats::mapped::mib().expect("jemalloc mapped MIB");
123
124 let mut ticker = tokio::time::interval(interval);
125 loop {
126 ticker.tick().await;
127
128 let Ok(_) = epoch.advance() else {
129 continue;
130 };
131
132 if let Ok(allocated_bytes) = allocated.read() {
133 objectstore_metrics::gauge!("jemalloc.allocated" = allocated_bytes);
135 }
136 if let Ok(active_bytes) = active.read() {
137 objectstore_metrics::gauge!("jemalloc.active" = active_bytes);
139 }
140 if let Ok(resident_bytes) = resident.read() {
141 objectstore_metrics::gauge!("jemalloc.resident" = resident_bytes);
143 }
144 if let Ok(mapped_bytes) = mapped.read() {
145 objectstore_metrics::gauge!("jemalloc.mapped" = mapped_bytes);
147 }
148 }
149}
150
151async fn track_runtime_metrics(interval: Duration) {
153 let mut ticker = tokio::time::interval(interval);
154 let metrics = Handle::current().metrics();
155
156 loop {
157 ticker.tick().await;
158 objectstore_log::trace!("Capturing runtime metrics");
159
160 objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
161 objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
162 objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
163 objectstore_metrics::gauge!(
164 "runtime.num_blocking_threads" = metrics.num_blocking_threads()
165 );
166 objectstore_metrics::gauge!(
167 "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
168 );
169 objectstore_metrics::gauge!(
170 "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
171 );
172
173 let registered_fds = metrics.io_driver_fd_registered_count();
174 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
175 objectstore_metrics::gauge!(
176 "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
177 );
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the default configuration with only `auth.enforce` overridden.
    fn config_with_enforce(enforce: bool) -> Config {
        let auth = crate::config::AuthZ {
            enforce,
            ..Default::default()
        };
        Config {
            auth,
            ..Default::default()
        }
    }

    /// Startup must fail when auth is enforced but no public key is configured.
    #[tokio::test]
    async fn enforce_without_keys_fails_startup() {
        let outcome = Services::spawn(config_with_enforce(true)).await;
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Auth enforcement is enabled but no keys are configured"));
    }

    /// Without enforcement, an empty key directory does not block startup.
    #[tokio::test]
    async fn no_enforce_without_keys_starts_ok() {
        let outcome = Services::spawn(config_with_enforce(false)).await;
        assert!(outcome.is_ok());
    }
}