Skip to main content

objectstore_server/
state.rs

1//! Shared server state passed to all HTTP request handlers.
2//!
3//! [`Services`] is constructed once during startup by [`Services::spawn`] and then shared
4//! across all request handlers as [`ServiceState`] (an `Arc<Services>`).
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21/// Shared reference to the objectstore [`Services`].
22pub type ServiceState = Arc<Services>;
23
24/// Reference to the objectstore business logic.
25///
26/// This structure is created during server startup and shared with all HTTP request handlers. It
27/// can be used to access the configured storage backends and other shared resources.
28///
29/// In request handlers, use `axum::extract::State<ServiceState>` to retrieve a shared reference to
30/// this structure.
31#[derive(Debug)]
32pub struct Services {
33    /// The server configuration.
34    pub config: Config,
35    /// Raw handle to the underlying storage service that does not enforce authorization checks.
36    ///
37    /// Consider using [`crate::auth::AuthAwareService`] for auth-checked access.
38    pub service: StorageService,
39    /// Directory for EdDSA public keys.
40    ///
41    /// The `kid` header field from incoming authorization tokens should correspond to a public key
42    /// in this directory that can be used to verify the token.
43    pub key_directory: PublicKeyDirectory,
44    /// Stateful admission-based rate limiter for incoming requests.
45    pub rate_limiter: RateLimiter,
46    /// In-flight HTTP request counter with the configured limit.
47    ///
48    /// Shared with the web layer so the concurrency-limit middleware, the tracking
49    /// layer, and any endpoint that reads the count all see the same atomic.
50    pub request_counter: RequestCounter,
51}
52
53impl Services {
54    /// Spawns all services and background tasks for objectstore.
55    ///
56    /// This returns a [`ServiceState`], which is a shared reference to the services suitable for
57    /// use in the web server.
58    pub async fn spawn(config: Config) -> Result<ServiceState> {
59        tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61        let backend = backend::from_config(config.storage.clone()).await?;
62        let service =
63            StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
64        service.start();
65
66        let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
67        if config.auth.enforce && key_directory.keys.is_empty() {
68            anyhow::bail!(
69                "Auth enforcement is enabled but no keys are configured. Either disable auth enforcement (dev/test environments) or configure a public key."
70            );
71        }
72        let rate_limiter = RateLimiter::new(config.rate_limits.clone());
73        rate_limiter.start();
74
75        let request_counter = RequestCounter::new(config.http.max_requests);
76        tokio::spawn(request_counter.clone().run_emitter());
77
78        Ok(Arc::new(Self {
79            config,
80            service,
81            key_directory,
82            rate_limiter,
83            request_counter,
84        }))
85    }
86
87    /// Wraps a byte stream with bandwidth metering for rate limiting.
88    ///
89    /// Works with any stream type — use for both [`objectstore_service::PayloadStream`]
90    /// (outgoing, `E = io::Error`) and [`objectstore_service::ClientStream`]
91    /// (incoming, `E = ClientError`).
92    pub(crate) fn meter_stream<S, E>(
93        &self,
94        stream: S,
95        context: &ObjectContext,
96    ) -> MeteredPayloadStream<S>
97    where
98        S: Stream<Item = Result<Bytes, E>> + Send + 'static,
99    {
100        MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
101    }
102
103    /// Records bandwidth usage for the given context without wrapping a stream.
104    ///
105    /// Used for cases where the payload size is known upfront (e.g. batch INSERT).
106    pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
107        self.rate_limiter.record_bandwidth(context, bytes);
108    }
109}
110
111/// Periodically captures and reports internal Tokio runtime metrics.
112async fn track_runtime_metrics(interval: Duration) {
113    let mut ticker = tokio::time::interval(interval);
114    let metrics = Handle::current().metrics();
115
116    loop {
117        ticker.tick().await;
118        objectstore_log::trace!("Capturing runtime metrics");
119
120        objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
121        objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
122        objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
123        objectstore_metrics::gauge!(
124            "runtime.num_blocking_threads" = metrics.num_blocking_threads()
125        );
126        objectstore_metrics::gauge!(
127            "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
128        );
129        objectstore_metrics::gauge!(
130            "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
131        );
132
133        let registered_fds = metrics.io_driver_fd_registered_count();
134        let deregistered_fds = metrics.io_driver_fd_deregistered_count();
135        objectstore_metrics::gauge!(
136            "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
137        );
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Config`] that is entirely default except for the auth `enforce` flag.
    fn config_with_enforcement(enforce: bool) -> Config {
        let auth = crate::config::AuthZ {
            enforce,
            ..Default::default()
        };

        Config {
            auth,
            ..Default::default()
        }
    }

    /// Startup must be rejected when enforcement is requested on an otherwise-default config.
    #[tokio::test]
    async fn enforce_without_keys_fails_startup() {
        let outcome = Services::spawn(config_with_enforcement(true)).await;

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Auth enforcement is enabled but no keys are configured"));
    }

    /// Startup must succeed on an otherwise-default config when enforcement is disabled.
    #[tokio::test]
    async fn no_enforce_without_keys_starts_ok() {
        let outcome = Services::spawn(config_with_enforcement(false)).await;

        assert!(outcome.is_ok());
    }
}