objectstore_server/
state.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use objectstore_service::{StorageConfig, StorageService};
6use tokio::runtime::Handle;
7
8use crate::auth::PublicKeyDirectory;
9use crate::config::{Config, Storage};
10
11pub type ServiceState = Arc<Services>;
13
14#[derive(Debug)]
22pub struct Services {
23 pub config: Config,
25 pub service: StorageService,
29 pub key_directory: PublicKeyDirectory,
34}
35
36impl Services {
37 pub async fn spawn(config: Config) -> Result<ServiceState> {
42 tokio::spawn(track_runtime_metrics(config.runtime.metrics_interval));
43
44 let high_volume = map_storage_config(&config.high_volume_storage);
45 let long_term = map_storage_config(&config.long_term_storage);
46 let service = StorageService::new(high_volume, long_term).await?;
47
48 let key_directory: PublicKeyDirectory = (&config.auth).try_into()?;
49
50 Ok(Arc::new(Self {
51 config,
52 service,
53 key_directory,
54 }))
55 }
56}
57
58fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
59 match config {
60 Storage::FileSystem { path } => StorageConfig::FileSystem { path },
61 Storage::S3Compatible { endpoint, bucket } => {
62 StorageConfig::S3Compatible { endpoint, bucket }
63 }
64 Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs {
65 endpoint: endpoint.as_deref(),
66 bucket,
67 },
68 Storage::BigTable {
69 endpoint,
70 project_id,
71 instance_name,
72 table_name,
73 connections,
74 } => StorageConfig::BigTable {
75 endpoint: endpoint.as_deref(),
76 project_id,
77 instance_name,
78 table_name,
79 connections: *connections,
80 },
81 }
82}
83
84async fn track_runtime_metrics(interval: Duration) {
86 let mut ticker = tokio::time::interval(interval);
87 let metrics = Handle::current().metrics();
88
89 loop {
90 ticker.tick().await;
91 tracing::trace!("Capturing runtime metrics");
92
93 merni::gauge!("runtime.num_workers": metrics.num_workers());
94 merni::gauge!("runtime.num_alive_tasks": metrics.num_alive_tasks());
95 merni::gauge!("runtime.global_queue_depth": metrics.global_queue_depth());
96 merni::gauge!("runtime.num_blocking_threads": metrics.num_blocking_threads());
97 merni::gauge!("runtime.num_idle_blocking_threads": metrics.num_idle_blocking_threads());
98 merni::gauge!("runtime.blocking_queue_depth": metrics.blocking_queue_depth());
99
100 let registered_fds = metrics.io_driver_fd_registered_count();
101 let deregistered_fds = metrics.io_driver_fd_deregistered_count();
102 merni::gauge!("runtime.num_io_driver_fds": registered_fds - deregistered_fds);
103 }
104}