relay_server/services/stats.rs

use std::sync::Arc;

use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::{AsyncRedisClient, RedisClientStats, RedisClients};
use relay_statsd::metric;
use relay_system::{Addr, Handle, RuntimeMetrics, Service};
use relay_threading::AsyncPool;
use tokio::time::interval;

use crate::services::processor::EnvelopeProcessorServicePool;
#[cfg(feature = "processing")]
use crate::services::store::StoreServicePool;
use crate::services::upstream::{IsNetworkOutage, UpstreamRelay};
use crate::statsd::{RelayCounters, RelayGauges, RuntimeCounters, RuntimeGauges};

/// Relay Stats Service.
///
/// Service which collects stats periodically and emits them via statsd.
pub struct RelayStats {
    config: Arc<Config>,
    runtime: Handle,
    rt_metrics: RuntimeMetrics,
    upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    redis_clients: Option<RedisClients>,
    processor_pool: EnvelopeProcessorServicePool,
    #[cfg(feature = "processing")]
    store_pool: StoreServicePool,
}

impl RelayStats {
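    /// Creates a new [`RelayStats`] service from the given config and service handles.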
    pub fn new(
        config: Arc<Config>,
        runtime: Handle,
        upstream_relay: Addr<UpstreamRelay>,
        #[cfg(feature = "processing")] redis_clients: Option<RedisClients>,
        processor_pool: EnvelopeProcessorServicePool,
        #[cfg(feature = "processing")] store_pool: StoreServicePool,
    ) -> Self {
        Self {
            config,
            upstream_relay,
            rt_metrics: runtime.metrics(),
            runtime,
            #[cfg(feature = "processing")]
            redis_clients,
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        }
    }

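    /// Reports the utilization of each running service as a gauge, tagged with the service
    /// name and instance id.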
    async fn service_metrics(&self) {
        for (service, metrics) in self.runtime.current_services_metrics().iter() {
            metric!(
                gauge(RelayGauges::ServiceUtilization) = metrics.utilization as u64,
                service = service.name(),
                instance_id = &service.instance_id().to_string(),
            );
        }
    }

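    /// Reports Tokio runtime metrics, both globally and per worker thread.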
    async fn tokio_metrics(&self) {
        metric!(gauge(RuntimeGauges::NumIdleThreads) = self.rt_metrics.num_idle_threads() as u64);
        metric!(gauge(RuntimeGauges::NumAliveTasks) = self.rt_metrics.num_alive_tasks() as u64);
        metric!(
            gauge(RuntimeGauges::BlockingQueueDepth) =
                self.rt_metrics.blocking_queue_depth() as u64
        );
        metric!(
            gauge(RuntimeGauges::NumBlockingThreads) =
                self.rt_metrics.num_blocking_threads() as u64
        );
        metric!(
            gauge(RuntimeGauges::NumIdleBlockingThreads) =
                self.rt_metrics.num_idle_blocking_threads() as u64
        );

        metric!(
            counter(RuntimeCounters::BudgetForcedYieldCount) +=
                self.rt_metrics.budget_forced_yield_count()
        );

        metric!(gauge(RuntimeGauges::NumWorkers) = self.rt_metrics.num_workers() as u64);
        for worker in 0..self.rt_metrics.num_workers() {
            let worker_name = worker.to_string();

            metric!(
                gauge(RuntimeGauges::WorkerLocalQueueDepth) =
                    self.rt_metrics.worker_local_queue_depth(worker) as u64,
                worker = &worker_name,
            );
            metric!(
                gauge(RuntimeGauges::WorkerMeanPollTime) =
                    self.rt_metrics.worker_mean_poll_time(worker).as_secs_f64(),
                worker = &worker_name,
            );

            metric!(
                counter(RuntimeCounters::WorkerLocalScheduleCount) +=
                    self.rt_metrics.worker_local_schedule_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerNoopCount) +=
                    self.rt_metrics.worker_noop_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerOverflowCount) +=
                    self.rt_metrics.worker_overflow_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerParkCount) +=
                    self.rt_metrics.worker_park_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerPollCount) +=
                    self.rt_metrics.worker_poll_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerStealCount) +=
                    self.rt_metrics.worker_steal_count(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerStealOperations) +=
                    self.rt_metrics.worker_steal_operations(worker),
                worker = &worker_name,
            );
            metric!(
                counter(RuntimeCounters::WorkerTotalBusyDuration) +=
                    self.rt_metrics
                        .worker_total_busy_duration(worker)
                        .as_millis() as u64,
                worker = &worker_name,
            );
        }
    }

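    /// Reports whether the connection to the upstream is in a network outage, but only when
    /// Relay runs in managed mode.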
    async fn upstream_status(&self) {
        if self.config.relay_mode() == RelayMode::Managed {
            if let Ok(is_outage) = self.upstream_relay.send(IsNetworkOutage).await {
                metric!(gauge(RelayGauges::NetworkOutage) = u64::from(is_outage));
            }
        }
    }

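    /// Emits connection statistics for a single async Redis client under the given pool name.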
    #[cfg(feature = "processing")]
    fn async_redis_connection(client: &AsyncRedisClient, name: &str) {
        Self::stats_metrics(client.stats(), name);
    }

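    /// Reports the connection gauges contained in [`RedisClientStats`], tagged with the pool
    /// name.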
    #[cfg(feature = "processing")]
    fn stats_metrics(stats: RedisClientStats, name: &str) {
        metric!(
            gauge(RelayGauges::RedisPoolConnections) = u64::from(stats.connections),
            pool = name
        );
        metric!(
            gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(stats.idle_connections),
            pool = name
        );
        metric!(
            gauge(RelayGauges::RedisPoolMaxConnections) = u64::from(stats.max_connections),
            pool = name
        );
        metric!(
            gauge(RelayGauges::RedisPoolWaitingForConnection) =
                u64::from(stats.waiting_for_connection),
            pool = name
        );
    }

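    /// Reports statistics for all configured Redis clients; a no-op when the `processing`
    /// feature is disabled.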
    #[cfg(not(feature = "processing"))]
    async fn redis_clients(&self) {}

    #[cfg(feature = "processing")]
    async fn redis_clients(&self) {
        if let Some(RedisClients {
            project_configs,
            cardinality,
            quotas,
        }) = self.redis_clients.as_ref()
        {
            Self::async_redis_connection(project_configs, "project_configs");
            Self::async_redis_connection(cardinality, "cardinality");
            Self::async_redis_connection(quotas, "quotas");
        }
    }

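    /// Reports queue size, utilization, activity, and finished task counts for an async pool.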
    fn emit_async_pool_metrics<T>(async_pool: &AsyncPool<T>) {
        let metrics = async_pool.metrics();

        metric!(
            gauge(RelayGauges::AsyncPoolQueueSize) = metrics.queue_size(),
            pool = async_pool.name()
        );
        metric!(
            gauge(RelayGauges::AsyncPoolUtilization) = metrics.cpu_utilization() as u64,
            pool = async_pool.name()
        );
        metric!(
            gauge(RelayGauges::AsyncPoolActivity) = metrics.activity() as u64,
            pool = async_pool.name()
        );
        metric!(
            counter(RelayCounters::AsyncPoolFinishedTasks) += metrics.finished_tasks(),
            pool = async_pool.name()
        );
    }

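    /// Reports metrics for all instrumented async pools, including the store pool when the
    /// `processing` feature is enabled.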
    async fn async_pools_metrics(&self) {
        Self::emit_async_pool_metrics(&self.processor_pool);
        #[cfg(feature = "processing")]
        Self::emit_async_pool_metrics(&self.store_pool);
    }
}

impl Service for RelayStats {
    type Interface = ();

    async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
        let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else {
            return;
        };

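        // Collect all metric groups concurrently, then wait for the next tick of the
        // configured reporting interval.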
        loop {
            let _ = tokio::join!(
                self.upstream_status(),
                self.service_metrics(),
                self.tokio_metrics(),
                self.redis_clients(),
                self.async_pools_metrics()
            );
            ticker.tick().await;
        }
    }
}