// relay_server/services/stats.rs
use std::sync::Arc;

use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::{AsyncRedisClient, RedisClientStats, RedisClients};
use relay_statsd::metric;
use relay_system::{Addr, Handle, RuntimeMetrics, Service};
use relay_threading::AsyncPool;
use tokio::time::interval;

use crate::services::processor::EnvelopeProcessorServicePool;
#[cfg(feature = "processing")]
use crate::services::store::StoreServicePool;
use crate::services::upstream::{IsNetworkOutage, UpstreamRelay};
use crate::statsd::{RelayCounters, RelayGauges, RuntimeCounters, RuntimeGauges};
16
17pub struct RelayStats {
21 config: Arc<Config>,
22 runtime: Handle,
23 rt_metrics: RuntimeMetrics,
24 upstream_relay: Addr<UpstreamRelay>,
25 #[cfg(feature = "processing")]
26 redis_clients: Option<RedisClients>,
27 processor_pool: EnvelopeProcessorServicePool,
28 #[cfg(feature = "processing")]
29 store_pool: StoreServicePool,
30}
31
32impl RelayStats {
33 pub fn new(
34 config: Arc<Config>,
35 runtime: Handle,
36 upstream_relay: Addr<UpstreamRelay>,
37 #[cfg(feature = "processing")] redis_clients: Option<RedisClients>,
38 processor_pool: EnvelopeProcessorServicePool,
39 #[cfg(feature = "processing")] store_pool: StoreServicePool,
40 ) -> Self {
41 Self {
42 config,
43 upstream_relay,
44 rt_metrics: runtime.metrics(),
45 runtime,
46 #[cfg(feature = "processing")]
47 redis_clients,
48 processor_pool,
49 #[cfg(feature = "processing")]
50 store_pool,
51 }
52 }
53
54 async fn service_metrics(&self) {
55 for (service, metrics) in self.runtime.current_services_metrics().iter() {
56 metric!(
57 gauge(RelayGauges::ServiceUtilization) = metrics.utilization as u64,
58 service = service.name(),
59 instance_id = &service.instance_id().to_string(),
60 );
61 }
62 }
63
64 async fn tokio_metrics(&self) {
65 metric!(gauge(RuntimeGauges::NumIdleThreads) = self.rt_metrics.num_idle_threads() as u64);
66 metric!(gauge(RuntimeGauges::NumAliveTasks) = self.rt_metrics.num_alive_tasks() as u64);
67 metric!(
68 gauge(RuntimeGauges::BlockingQueueDepth) =
69 self.rt_metrics.blocking_queue_depth() as u64
70 );
71 metric!(
72 gauge(RuntimeGauges::NumBlockingThreads) =
73 self.rt_metrics.num_blocking_threads() as u64
74 );
75 metric!(
76 gauge(RuntimeGauges::NumIdleBlockingThreads) =
77 self.rt_metrics.num_idle_blocking_threads() as u64
78 );
79
80 metric!(
81 counter(RuntimeCounters::BudgetForcedYieldCount) +=
82 self.rt_metrics.budget_forced_yield_count()
83 );
84
85 metric!(gauge(RuntimeGauges::NumWorkers) = self.rt_metrics.num_workers() as u64);
86 for worker in 0..self.rt_metrics.num_workers() {
87 let worker_name = worker.to_string();
88
89 metric!(
90 gauge(RuntimeGauges::WorkerLocalQueueDepth) =
91 self.rt_metrics.worker_local_queue_depth(worker) as u64,
92 worker = &worker_name,
93 );
94 metric!(
95 gauge(RuntimeGauges::WorkerMeanPollTime) =
96 self.rt_metrics.worker_mean_poll_time(worker).as_secs_f64(),
97 worker = &worker_name,
98 );
99
100 metric!(
101 counter(RuntimeCounters::WorkerLocalScheduleCount) +=
102 self.rt_metrics.worker_local_schedule_count(worker),
103 worker = &worker_name,
104 );
105 metric!(
106 counter(RuntimeCounters::WorkerNoopCount) +=
107 self.rt_metrics.worker_noop_count(worker),
108 worker = &worker_name,
109 );
110 metric!(
111 counter(RuntimeCounters::WorkerOverflowCount) +=
112 self.rt_metrics.worker_overflow_count(worker),
113 worker = &worker_name,
114 );
115 metric!(
116 counter(RuntimeCounters::WorkerParkCount) +=
117 self.rt_metrics.worker_park_count(worker),
118 worker = &worker_name,
119 );
120 metric!(
121 counter(RuntimeCounters::WorkerPollCount) +=
122 self.rt_metrics.worker_poll_count(worker),
123 worker = &worker_name,
124 );
125 metric!(
126 counter(RuntimeCounters::WorkerStealCount) +=
127 self.rt_metrics.worker_steal_count(worker),
128 worker = &worker_name,
129 );
130 metric!(
131 counter(RuntimeCounters::WorkerStealOperations) +=
132 self.rt_metrics.worker_steal_operations(worker),
133 worker = &worker_name,
134 );
135 metric!(
136 counter(RuntimeCounters::WorkerTotalBusyDuration) +=
137 self.rt_metrics
138 .worker_total_busy_duration(worker)
139 .as_millis() as u64,
140 worker = &worker_name,
141 );
142 }
143 }
144
145 async fn upstream_status(&self) {
146 if self.config.relay_mode() == RelayMode::Managed {
147 if let Ok(is_outage) = self.upstream_relay.send(IsNetworkOutage).await {
148 metric!(gauge(RelayGauges::NetworkOutage) = u64::from(is_outage));
149 }
150 }
151 }
152
153 #[cfg(feature = "processing")]
154 fn async_redis_connection(client: &AsyncRedisClient, name: &str) {
155 Self::stats_metrics(client.stats(), name);
156 }
157
158 #[cfg(feature = "processing")]
159 fn stats_metrics(stats: RedisClientStats, name: &str) {
160 metric!(
161 gauge(RelayGauges::RedisPoolConnections) = u64::from(stats.connections),
162 pool = name
163 );
164 metric!(
165 gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(stats.idle_connections),
166 pool = name
167 );
168 metric!(
169 gauge(RelayGauges::RedisPoolMaxConnections) = u64::from(stats.max_connections),
170 pool = name
171 );
172 metric!(
173 gauge(RelayGauges::RedisPoolWaitingForConnection) =
174 u64::from(stats.waiting_for_connection),
175 pool = name
176 );
177 }
178
179 #[cfg(not(feature = "processing"))]
180 async fn redis_clients(&self) {}
181
182 #[cfg(feature = "processing")]
183 async fn redis_clients(&self) {
184 if let Some(RedisClients {
185 project_configs,
186 cardinality,
187 quotas,
188 }) = self.redis_clients.as_ref()
189 {
190 Self::async_redis_connection(project_configs, "project_configs");
191 Self::async_redis_connection(cardinality, "cardinality");
192 Self::async_redis_connection(quotas, "quotas");
193 }
194 }
195
196 fn emit_async_pool_metrics<T>(async_pool: &AsyncPool<T>) {
197 let metrics = async_pool.metrics();
198
199 metric!(
200 gauge(RelayGauges::AsyncPoolQueueSize) = metrics.queue_size(),
201 pool = async_pool.name()
202 );
203 metric!(
204 gauge(RelayGauges::AsyncPoolUtilization) = metrics.cpu_utilization() as u64,
205 pool = async_pool.name()
206 );
207 metric!(
208 gauge(RelayGauges::AsyncPoolActivity) = metrics.activity() as u64,
209 pool = async_pool.name()
210 );
211 metric!(
212 counter(RelayCounters::AsyncPoolFinishedTasks) += metrics.finished_tasks(),
213 pool = async_pool.name()
214 );
215 }
216
217 async fn async_pools_metrics(&self) {
218 Self::emit_async_pool_metrics(&self.processor_pool);
219 #[cfg(feature = "processing")]
220 Self::emit_async_pool_metrics(&self.store_pool);
221 }
222}
223
224impl Service for RelayStats {
225 type Interface = ();
226
227 async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
228 let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else {
229 return;
230 };
231
232 loop {
233 let _ = tokio::join!(
234 self.upstream_status(),
235 self.service_metrics(),
236 self.tokio_metrics(),
237 self.redis_clients(),
238 self.async_pools_metrics()
239 );
240 ticker.tick().await;
241 }
242 }
243}