objectstore_server/
state.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use objectstore_service::{StorageConfig, StorageService};
6use tokio::runtime::Handle;
7
8use crate::auth::PublicKeyDirectory;
9use crate::config::{Config, Storage};
10use crate::rate_limits::RateLimiter;
11
12/// Shared reference to the objectstore [`Services`].
13pub type ServiceState = Arc<Services>;
14
15/// Reference to the objectstore business logic.
16///
17/// This structure is created during server startup and shared with all HTTP request handlers. It
18/// can be used to access the configured storage backends and other shared resources.
19///
20/// In request handlers, use `axum::extract::State<ServiceState>` to retrieve a shared reference to
21/// this structure.
22#[derive(Debug)]
23pub struct Services {
24    /// The server configuration.
25    pub config: Config,
26    /// Raw handle to the underlying storage service that does not enforce authorization checks.
27    ///
28    /// Consider using [`crate::auth::AuthAwareService`].
29    pub service: StorageService,
30    /// Directory for EdDSA public keys.
31    ///
32    /// The `kid` header field from incoming authorization tokens should correspond to a public key
33    /// in this directory that can be used to verify the token.
34    pub key_directory: PublicKeyDirectory,
35    /// Stateful admission-based rate limiter for incoming requests.
36    pub rate_limiter: RateLimiter,
37}
38
39impl Services {
40    /// Spawns all services and background tasks for objectstore.
41    ///
42    /// This returns a [`ServiceState`], which is a shared reference to the services suitable for
43    /// use in the web server.
44    pub async fn spawn(config: Config) -> Result<ServiceState> {
45        tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
46
47        let high_volume = map_storage_config(&config.high_volume_storage);
48        let long_term = map_storage_config(&config.long_term_storage);
49        let service = StorageService::new(high_volume, long_term).await?;
50
51        let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
52        let rate_limiter = RateLimiter::new(config.rate_limits.clone());
53
54        Ok(Arc::new(Self {
55            config,
56            service,
57            key_directory,
58            rate_limiter,
59        }))
60    }
61}
62
63fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
64    match config {
65        Storage::FileSystem { path } => StorageConfig::FileSystem { path },
66        Storage::S3Compatible { endpoint, bucket } => {
67            StorageConfig::S3Compatible { endpoint, bucket }
68        }
69        Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs {
70            endpoint: endpoint.as_deref(),
71            bucket,
72        },
73        Storage::BigTable {
74            endpoint,
75            project_id,
76            instance_name,
77            table_name,
78            connections,
79        } => StorageConfig::BigTable {
80            endpoint: endpoint.as_deref(),
81            project_id,
82            instance_name,
83            table_name,
84            connections: *connections,
85        },
86    }
87}
88
89/// Periodically captures and reports internal Tokio runtime metrics.
90async fn track_runtime_metrics(interval: Duration) {
91    let mut ticker = tokio::time::interval(interval);
92    let metrics = Handle::current().metrics();
93
94    loop {
95        ticker.tick().await;
96        tracing::trace!("Capturing runtime metrics");
97
98        merni::gauge!("runtime.num_workers": metrics.num_workers());
99        merni::gauge!("runtime.num_alive_tasks": metrics.num_alive_tasks());
100        merni::gauge!("runtime.global_queue_depth": metrics.global_queue_depth());
101        merni::gauge!("runtime.num_blocking_threads": metrics.num_blocking_threads());
102        merni::gauge!("runtime.num_idle_blocking_threads": metrics.num_idle_blocking_threads());
103        merni::gauge!("runtime.blocking_queue_depth": metrics.blocking_queue_depth());
104
105        let registered_fds = metrics.io_driver_fd_registered_count();
106        let deregistered_fds = metrics.io_driver_fd_deregistered_count();
107        merni::gauge!("runtime.num_io_driver_fds": registered_fds - deregistered_fds);
108    }
109}