objectstore_server/
state.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10use bytes::Bytes;
11use futures_util::Stream;
12use objectstore_service::id::ObjectContext;
13use objectstore_service::{StorageService, backend};
14use tokio::runtime::Handle;
15
16use crate::auth::PublicKeyDirectory;
17use crate::config::Config;
18use crate::rate_limits::{MeteredPayloadStream, RateLimiter};
19use crate::web::RequestCounter;
20
21pub type ServiceState = Arc<Services>;
23
24#[derive(Debug)]
32pub struct Services {
33 pub config: Config,
35 pub service: StorageService,
39 pub key_directory: PublicKeyDirectory,
44 pub rate_limiter: RateLimiter,
46 pub request_counter: RequestCounter,
51}
52
53impl Services {
54 pub async fn spawn(config: Config) -> Result<ServiceState> {
59 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
60
61 let backend = backend::from_config(config.storage.clone()).await?;
62 let service =
63 StorageService::new(backend).with_concurrency_limit(config.service.max_concurrency);
64 service.start();
65
66 let key_directory = PublicKeyDirectory::try_from(&config.auth)?;
67 let rate_limiter = RateLimiter::new(config.rate_limits.clone());
68 rate_limiter.start();
69
70 let request_counter = RequestCounter::new(config.http.max_requests);
71 tokio::spawn(request_counter.clone().run_emitter());
72
73 Ok(Arc::new(Self {
74 config,
75 service,
76 key_directory,
77 rate_limiter,
78 request_counter,
79 }))
80 }
81
82 pub(crate) fn meter_stream<S, E>(
88 &self,
89 stream: S,
90 context: &ObjectContext,
91 ) -> MeteredPayloadStream<S>
92 where
93 S: Stream<Item = Result<Bytes, E>> + Send + 'static,
94 {
95 MeteredPayloadStream::new(stream, self.rate_limiter.bytes_accumulators(context))
96 }
97
98 pub fn record_bandwidth(&self, context: &ObjectContext, bytes: u64) {
102 self.rate_limiter.record_bandwidth(context, bytes);
103 }
104}
105
106async fn track_runtime_metrics(interval: Duration) {
108 let mut ticker = tokio::time::interval(interval);
109 let metrics = Handle::current().metrics();
110
111 loop {
112 ticker.tick().await;
113 objectstore_log::trace!("Capturing runtime metrics");
114
115 objectstore_metrics::gauge!("runtime.num_workers" = metrics.num_workers());
116 objectstore_metrics::gauge!("runtime.num_alive_tasks" = metrics.num_alive_tasks());
117 objectstore_metrics::gauge!("runtime.global_queue_depth" = metrics.global_queue_depth());
118 objectstore_metrics::gauge!(
119 "runtime.num_blocking_threads" = metrics.num_blocking_threads()
120 );
121 objectstore_metrics::gauge!(
122 "runtime.num_idle_blocking_threads" = metrics.num_idle_blocking_threads()
123 );
124 objectstore_metrics::gauge!(
125 "runtime.blocking_queue_depth" = metrics.blocking_queue_depth()
126 );
127
128 let registered_fds = metrics.io_driver_fd_registered_count();
129 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
130 objectstore_metrics::gauge!(
131 "runtime.num_io_driver_fds" = registered_fds - deregistered_fds
132 );
133 }
134}