Skip to main content

objectstore_server/
state.rs

//! Shared server state passed to all HTTP request handlers.
//!
//! [`Services`] is constructed once during startup by [`Services::spawn`] and then shared
//! across all request handlers as [`ServiceState`] (an `Arc<Services>`).
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21/// Shared reference to the objectstore [`Services`].
22pub type ServiceState = Arc<Services>;
23
24/// Reference to the objectstore business logic.
25///
26/// This structure is created during server startup and shared with all HTTP request handlers. It
27/// can be used to access the configured storage backends and other shared resources.
28///
29/// In request handlers, use `axum::extract::State<ServiceState>` to retrieve a shared reference to
30/// this structure.
31#[derive(Debug)]
32pub struct Services {
33    /// The server configuration.
34    pub config: Config,
35    /// Raw handle to the underlying storage service that does not enforce authorization checks.
36    ///
37    /// Consider using [`crate::auth::AuthAwareService`] for auth-checked access.
38    pub service: StorageService,
39    /// Directory for EdDSA public keys.
40    ///
41    /// The `kid` header field from incoming authorization tokens should correspond to a public key
42    /// in this directory that can be used to verify the token.
43    pub key_directory: PublicKeyDirectory,
44    /// Stateful admission-based rate limiter for incoming requests.
45    pub rate_limiter: RateLimiter,
46    /// In-flight HTTP request counter with the configured limit.
47    ///
48    /// Shared with the web layer so the concurrency-limit middleware, the tracking
49    /// layer, and any endpoint that reads the count all see the same atomic.
50    pub request_counter: RequestCounter,
51}
52
53impl Services {
54    /// Spawns all services and background tasks for objectstore.
55    ///
56    /// This returns a [`ServiceState`], which is a shared reference to the services suitable for
57    /// use in the web server.
58    pub async fn spawn(config: Config) -> Result<ServiceState> {
59        tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60        #[cfg(target_os = "linux")]
61        tokio::spawn(track_allocator_metrics(config.runtime.metrics_interval));
62
63        let backend = backend::from_config(config.storage.clone()).await?;
64        let service =
65            StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
66        service.start();
67
68        let key_directory = PublicKeyDirectory::from_config(&config.auth).await?;
69        if config.auth.enforce && key_directory.keys.is_empty() {
70            anyhow::bail!(
71                "Auth enforcement is enabled but no keys are configured. Either disable auth enforcement (dev/test environments) or configure a public key."
72            );
73        }
74        let rate_limiter = RateLimiter::new(config.rate_limits.clone());
75        rate_limiter.start();
76
77        let request_counter = RequestCounter::new(config.http.max_requests);
78        tokio::spawn(request_counter.clone().run_emitter());
79
80        Ok(Arc::new(Self {
81            config,
82            service,
83            key_directory,
84            rate_limiter,
85            request_counter,
86        }))
87    }
88
89    /// Wraps a byte stream with bandwidth metering for rate limiting.
90    ///
91    /// Works with any stream type — use for both [`objectstore_service::PayloadStream`]
92    /// (outgoing, `E = io::Error`) and [`objectstore_service::ClientStream`]
93    /// (incoming, `E = ClientError`).
94    pub(crate) fn meter_stream<S, E>(
95        &self,
96        stream: S,
97        context: &ObjectContext,
98    ) -> MeteredPayloadStream<S>
99    where
100        S: Stream<Item = Result<Bytes, E>> + Send + 'static,
101    {
102        MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
103    }
104
105    /// Records bandwidth usage for the given context without wrapping a stream.
106    ///
107    /// Used for cases where the payload size is known upfront (e.g. batch INSERT).
108    pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
109        self.rate_limiter.record_bandwidth(context, bytes);
110    }
111}
112
113/// Periodically captures and reports jemalloc stats.
114#[cfg(target_os = "linux")]
115async fn track_allocator_metrics(interval: Duration) {
116    // INVARIANT: MIB resolution only fails if jemalloc is not the active allocator,
117    // which would be a misconfigured build. Panic early to surface the problem.
118    let epoch = tikv_jemalloc_ctl::epoch::mib().expect("jemalloc epoch MIB");
119    let allocated = tikv_jemalloc_ctl::stats::allocated::mib().expect("jemalloc allocated MIB");
120    let active = tikv_jemalloc_ctl::stats::active::mib().expect("jemalloc active MIB");
121    let resident = tikv_jemalloc_ctl::stats::resident::mib().expect("jemalloc resident MIB");
122    let mapped = tikv_jemalloc_ctl::stats::mapped::mib().expect("jemalloc mapped MIB");
123
124    let mut ticker = tokio::time::interval(interval);
125    loop {
126        ticker.tick().await;
127
128        let Ok(_) = epoch.advance() else {
129            continue;
130        };
131
132        if let Ok(allocated_bytes) = allocated.read() {
133            // Bytes currently allocated by the application.
134            objectstore_metrics::gauge!("jemalloc.allocated" = allocated_bytes);
135        }
136        if let Ok(active_bytes) = active.read() {
137            // Bytes in active jemalloc pages (≥ allocated).
138            objectstore_metrics::gauge!("jemalloc.active" = active_bytes);
139        }
140        if let Ok(resident_bytes) = resident.read() {
141            // Bytes in resident pages mapped from the OS (≥ active).
142            objectstore_metrics::gauge!("jemalloc.resident" = resident_bytes);
143        }
144        if let Ok(mapped_bytes) = mapped.read() {
145            // Bytes in chunks mapped from the OS (≥ resident).
146            objectstore_metrics::gauge!("jemalloc.mapped" = mapped_bytes);
147        }
148    }
149}
150
151/// Periodically captures and reports internal Tokio runtime metrics.
152async fn track_runtime_metrics(interval: Duration) {
153    let mut ticker = tokio::time::interval(interval);
154    let metrics = Handle::current().metrics();
155
156    loop {
157        ticker.tick().await;
158        objectstore_log::trace!("Capturing runtime metrics");
159
160        objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
161        objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
162        objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
163        objectstore_metrics::gauge!(
164            "runtime.num_blocking_threads" = metrics.num_blocking_threads()
165        );
166        objectstore_metrics::gauge!(
167            "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
168        );
169        objectstore_metrics::gauge!(
170            "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
171        );
172
173        let registered_fds = metrics.io_driver_fd_registered_count();
174        let deregistered_fds = metrics.io_driver_fd_deregistered_count();
175        objectstore_metrics::gauge!(
176            "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
177        );
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-default [`Config`] with auth enforcement set to `enforce`.
    fn config_with_enforcement(enforce: bool) -> Config {
        Config {
            auth: crate::config::AuthZ {
                enforce,
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Startup is rejected when auth is enforced on a default (presumably key-less) config.
    #[tokio::test]
    async fn enforce_without_keys_fails_startup() {
        let message = Services::spawn(config_with_enforcement(true))
            .await
            .unwrap_err()
            .to_string();
        assert!(message.contains("Auth enforcement is enabled but no keys are configured"));
    }

    /// Startup succeeds on the same default config when auth enforcement is off.
    #[tokio::test]
    async fn no_enforce_without_keys_starts_ok() {
        let outcome = Services::spawn(config_with_enforcement(false)).await;
        assert!(outcome.is_ok());
    }
}