objectstore_server/
state.rs

1//! Shared server state passed to all HTTP request handlers.
2//!
3//! [`Services`] is constructed once during startup by [`Services::spawn`] and then shared
4//! across all request handlers as [`ServiceState`] (an `Arc<Services>`).
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use futures_util::StreamExt;
11use objectstore_service::{PayloadStream, StorageConfig, StorageService};
12use tokio::runtime::Handle;
13
14use objectstore_service::id::ObjectContext;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::{Config, Storage};
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21/// Shared reference to the objectstore [`Services`].
22pub type ServiceState = Arc<Services>;
23
24/// Reference to the objectstore business logic.
25///
26/// This structure is created during server startup and shared with all HTTP request handlers. It
27/// can be used to access the configured storage backends and other shared resources.
28///
29/// In request handlers, use `axum::extract::State<ServiceState>` to retrieve a shared reference to
30/// this structure.
31#[derive(Debug)]
32pub struct Services {
33    /// The server configuration.
34    pub config: Config,
35    /// Raw handle to the underlying storage service that does not enforce authorization checks.
36    ///
37    /// Consider using [`crate::auth::AuthAwareService`] for auth-checked access.
38    pub service: StorageService,
39    /// Directory for EdDSA public keys.
40    ///
41    /// The `kid` header field from incoming authorization tokens should correspond to a public key
42    /// in this directory that can be used to verify the token.
43    pub key_directory: PublicKeyDirectory,
44    /// Stateful admission-based rate limiter for incoming requests.
45    pub rate_limiter: RateLimiter,
46    /// In-flight HTTP request counter with the configured limit.
47    ///
48    /// Shared with the web layer so the concurrency-limit middleware, the tracking
49    /// layer, and any endpoint that reads the count all see the same atomic.
50    pub request_counter: RequestCounter,
51}
52
53impl Services {
54    /// Spawns all services and background tasks for objectstore.
55    ///
56    /// This returns a [`ServiceState`], which is a shared reference to the services suitable for
57    /// use in the web server.
58    pub async fn spawn(config: Config) -> Result<ServiceState> {
59        tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61        let high_volume = map_storage_config(&config.high_volume_storage);
62        let long_term = map_storage_config(&config.long_term_storage);
63        let service = StorageService::new(high_volume, long_term)
64            .await?
65            .with_concurrency_limit(config.service.max_concurrency);
66        service.start();
67
68        let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
69        let rate_limiter = RateLimiter::new(config.rate_limits.clone());
70        rate_limiter.start();
71
72        let request_counter = RequestCounter::new(config.http.max_requests);
73        tokio::spawn(request_counter.clone().run_emitter());
74
75        Ok(Arc::new(Self {
76            config,
77            service,
78            key_directory,
79            rate_limiter,
80            request_counter,
81        }))
82    }
83
84    /// Wraps a [`PayloadStream`] with bandwidth metering for rate limiting.
85    pub fn meter_stream(&self, stream: PayloadStream, context: &ObjectContext) -> PayloadStream {
86        MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context)).boxed()
87    }
88
89    /// Records bandwidth usage for the given context without wrapping a stream.
90    ///
91    /// Used for cases where the payload size is known upfront (e.g. batch INSERT).
92    pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
93        self.rate_limiter.record_bandwidth(context, bytes);
94    }
95}
96
97fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
98    match config {
99        Storage::FileSystem { path } => StorageConfig::FileSystem { path },
100        Storage::S3Compatible { endpoint, bucket } => {
101            StorageConfig::S3Compatible { endpoint, bucket }
102        }
103        Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs {
104            endpoint: endpoint.as_deref(),
105            bucket,
106        },
107        Storage::BigTable {
108            endpoint,
109            project_id,
110            instance_name,
111            table_name,
112            connections,
113        } => StorageConfig::BigTable {
114            endpoint: endpoint.as_deref(),
115            project_id,
116            instance_name,
117            table_name,
118            connections: *connections,
119        },
120    }
121}
122
123/// Periodically captures and reports internal Tokio runtime metrics.
124async fn track_runtime_metrics(interval: Duration) {
125    let mut ticker = tokio::time::interval(interval);
126    let metrics = Handle::current().metrics();
127
128    loop {
129        ticker.tick().await;
130        tracing::trace!("Capturing runtime metrics");
131
132        objectstore_metrics::gauge!("runtime.num_workers": metrics.num_workers());
133        objectstore_metrics::gauge!("runtime.num_alive_tasks": metrics.num_alive_tasks());
134        objectstore_metrics::gauge!("runtime.global_queue_depth": metrics.global_queue_depth());
135        objectstore_metrics::gauge!("runtime.num_blocking_threads": metrics.num_blocking_threads());
136        objectstore_metrics::gauge!("runtime.num_idle_blocking_threads": metrics.num_idle_blocking_threads());
137        objectstore_metrics::gauge!("runtime.blocking_queue_depth": metrics.blocking_queue_depth());
138
139        let registered_fds = metrics.io_driver_fd_registered_count();
140        let deregistered_fds = metrics.io_driver_fd_deregistered_count();
141        objectstore_metrics::gauge!("runtime.num_io_driver_fds": registered_fds - deregistered_fds);
142    }
143}