objectstore_server/
state.rs

1//! Shared server state passed to all HTTP request handlers.
2//!
3//! [`Services`] is constructed once during startup by [`Services::spawn`] and then shared
4//! across all request handlers as [`ServiceState`] (an `Arc<Services>`).
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
/// Shared reference to the objectstore [`Services`].
///
/// This is an [`Arc`] around [`Services`], so cloning it only creates another handle to the same
/// instance. [`Services::spawn`] returns a value of this type.
pub type ServiceState = Arc<Services>;
23
/// Reference to the objectstore business logic.
///
/// This structure is created during server startup by [`Services::spawn`] and shared with all
/// HTTP request handlers. It can be used to access the configured storage backends and other
/// shared resources.
///
/// In request handlers, use `axum::extract::State<ServiceState>` to retrieve a shared reference to
/// this structure.
#[derive(Debug)]
pub struct Services {
    /// The server configuration that [`Services::spawn`] was called with.
    pub config: Config,
    /// Raw handle to the underlying storage service that does not enforce authorization checks.
    ///
    /// Built from the backend described by `config.storage` and limited to
    /// `config.service.max_concurrency`.
    ///
    /// Consider using [`crate::auth::AuthAwareService`] for auth-checked access.
    pub service: StorageService,
    /// Directory for EdDSA public keys, built from `config.auth`.
    ///
    /// The `kid` header field from incoming authorization tokens should correspond to a public key
    /// in this directory that can be used to verify the token.
    pub key_directory: PublicKeyDirectory,
    /// Stateful admission-based rate limiter for incoming requests.
    ///
    /// Configured from `config.rate_limits` and already started when [`Services::spawn`] returns.
    pub rate_limiter: RateLimiter,
    /// In-flight HTTP request counter with the configured limit (`config.http.max_requests`).
    ///
    /// Shared with the web layer so the concurrency-limit middleware, the tracking
    /// layer, and any endpoint that reads the count all see the same atomic.
    pub request_counter: RequestCounter,
}
52
53impl Services {
54    /// Spawns all services and background tasks for objectstore.
55    ///
56    /// This returns a [`ServiceState`], which is a shared reference to the services suitable for
57    /// use in the web server.
58    pub async fn spawn(config: Config) -> Result<ServiceState> {
59        tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61        let backend = backend::from_config(config.storage.clone()).await?;
62        let service =
63            StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
64        service.start();
65
66        let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
67        let rate_limiter = RateLimiter::new(config.rate_limits.clone());
68        rate_limiter.start();
69
70        let request_counter = RequestCounter::new(config.http.max_requests);
71        tokio::spawn(request_counter.clone().run_emitter());
72
73        Ok(Arc::new(Self {
74            config,
75            service,
76            key_directory,
77            rate_limiter,
78            request_counter,
79        }))
80    }
81
82    /// Wraps a byte stream with bandwidth metering for rate limiting.
83    ///
84    /// Works with any stream type — use for both [`objectstore_service::PayloadStream`]
85    /// (outgoing, `E = io::Error`) and [`objectstore_service::ClientStream`]
86    /// (incoming, `E = ClientError`).
87    pub(crate) fn meter_stream<S, E>(
88        &self,
89        stream: S,
90        context: &ObjectContext,
91    ) -> MeteredPayloadStream<S>
92    where
93        S: Stream<Item = Result<Bytes, E>> + Send + 'static,
94    {
95        MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
96    }
97
98    /// Records bandwidth usage for the given context without wrapping a stream.
99    ///
100    /// Used for cases where the payload size is known upfront (e.g. batch INSERT).
101    pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
102        self.rate_limiter.record_bandwidth(context, bytes);
103    }
104}
105
106/// Periodically captures and reports internal Tokio runtime metrics.
107async fn track_runtime_metrics(interval: Duration) {
108    let mut ticker = tokio::time::interval(interval);
109    let metrics = Handle::current().metrics();
110
111    loop {
112        ticker.tick().await;
113        objectstore_log::trace!("Capturing runtime metrics");
114
115        objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
116        objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
117        objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
118        objectstore_metrics::gauge!(
119            "runtime.num_blocking_threads" = metrics.num_blocking_threads()
120        );
121        objectstore_metrics::gauge!(
122            "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
123        );
124        objectstore_metrics::gauge!(
125            "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
126        );
127
128        let registered_fds = metrics.io_driver_fd_registered_count();
129        let deregistered_fds = metrics.io_driver_fd_deregistered_count();
130        objectstore_metrics::gauge!(
131            "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
132        );
133    }
134}