1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8 ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
12#[cfg(feature = "processing")]
13use crate::services::global_rate_limits::GlobalRateLimitsService;
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
17use crate::services::outcome_aggregator::OutcomeAggregator;
18use crate::services::processor::{
19 self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
20};
21use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
22use crate::services::projects::source::ProjectSource;
23use crate::services::relays::{RelayCache, RelayCacheService};
24use crate::services::stats::RelayStats;
25#[cfg(feature = "processing")]
26use crate::services::store::{StoreService, StoreServicePool};
27use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
28use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
29#[cfg(feature = "processing")]
30use anyhow::Context;
31use anyhow::Result;
32use axum::extract::FromRequestParts;
33use axum::http::request::Parts;
34use relay_cogs::Cogs;
35use relay_config::Config;
36#[cfg(feature = "processing")]
37use relay_config::{RedisConfigRef, RedisConfigsRef};
38#[cfg(feature = "processing")]
39use relay_redis::AsyncRedisClient;
40#[cfg(feature = "processing")]
41use relay_redis::redis::Script;
42#[cfg(feature = "processing")]
43use relay_redis::{RedisClients, RedisError, RedisScripts};
44use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
45
46#[derive(Debug, thiserror::Error)]
48pub enum ServiceError {
49 #[error("could not load the Geoip Db")]
51 GeoIp,
52
53 #[cfg(feature = "processing")]
55 #[error("could not initialize kafka producer: {0}")]
56 Kafka(String),
57
58 #[cfg(feature = "processing")]
60 #[error("could not initialize redis client during startup")]
61 Redis,
62}
63
/// Addresses and handles of the services spawned in [`ServiceState::start`].
///
/// Cloning copies the contained addresses and handles.
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the health check service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the outcome producer service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the outcome aggregator, which accepts [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the envelope processor (the sending side of its channel).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the relay cache service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the global config service.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// The partitioned envelope buffer.
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    /// Handle to the project cache service.
    pub project_cache_handle: ProjectCacheHandle,
    /// Address of the autoscaling metrics service.
    pub autoscaling: Addr<AutoscalingMetrics>,
}
77
78pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
80 relay_system::Runtime::builder(name)
81 .worker_threads(threads)
82 .max_blocking_threads(150)
93 .thread_keep_alive(Duration::from_secs(1))
97 .build()
98}
99
100fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
101 let thread_count = match config.cpu_concurrency() {
104 conc @ 0..=2 => conc.max(1),
105 conc @ 3..=4 => conc - 1,
106 conc => conc - 2,
107 };
108 relay_log::info!("starting {thread_count} envelope processing workers");
109
110 let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
111 .num_threads(thread_count)
112 .max_concurrency(config.pool_concurrency())
113 .thread_kind(ThreadKind::Worker)
114 .build()?;
115
116 Ok(pool)
117}
118
/// Builds the worker thread pool used by the store service.
///
/// One worker is started per 12 units of [`Config::cpu_concurrency`], rounded up, with a floor
/// of one worker.
///
/// # Errors
///
/// Returns an error if the thread pool fails to build.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // `div_ceil(12)` is 0 when the configured concurrency is 0. The envelope processor pool
    // explicitly floors that case to one worker, so do the same here rather than building a
    // store pool without any threads.
    let thread_count = config.cpu_concurrency().div_ceil(12).max(1);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
136
/// Data shared by all clones of [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    /// Configuration the services were started with.
    config: Arc<Config>,
    /// Memory checker built from the [`MemoryStat`] created during startup.
    memory_checker: MemoryChecker,
    /// Addresses and handles of the started services.
    registry: Registry,
}
143
/// Shared server state: configuration, memory checker and the registry of running services.
///
/// Clones share a single reference-counted [`StateInner`], so cloning only bumps a refcount.
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}
149
impl ServiceState {
    /// Spawns all Relay services and returns a [`ServiceState`] holding their addresses.
    ///
    /// Services are spawned through `services`. `handle` is passed to the metrics router, the
    /// autoscaling metrics service and [`RelayStats`]. The envelope processor's address is
    /// created up front with [`channel`], but its service is only started later. This lets
    /// services constructed earlier, such as the outcome producer and the metrics router,
    /// already hold that address.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// - With the `processing` feature, the Redis clients cannot be created or their scripts
    ///   cannot be loaded.
    /// - The outcome producer cannot be created.
    /// - The store pool, the store service, or the envelope processor pool fails to build.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when Redis is configured AND processing is enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Load the Redis scripts before any service below receives the clients.
        // NOTE(review): presumably so later script invocations find them already cached — confirm.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // Shared memory statistics. They are handed to the envelope buffer, the health check,
        // the autoscaling service and the state's own memory checker.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // The processor channel is created now, but the processor service is only started
        // further below with `processor_rx`. Its address is needed earlier than that.
        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        // `global_config_handle` goes to the store and the processor. `global_config_rx` goes
        // to the envelope buffer.
        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        // The metrics router forwards to the (not yet started) envelope processor.
        let aggregator = RouterService::new(
            handle.clone(),
            config.default_aggregator_config().clone(),
            config.secondary_aggregator_configs().clone(),
            Some(processor.clone().recipient()),
            project_cache_handle.clone(),
        );
        let aggregator_handle = aggregator.handle();
        let aggregator = services.start(aggregator);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // The store pool is built whenever the `processing` feature is compiled in. The store
        // service itself is only spawned if processing is enabled in the config.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        let cogs = CogsService::new(&config);
        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

        // Global rate limits only run when Redis clients exist, and they use the quotas client.
        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

        // Start the envelope processor on the receiving end of the channel created above.
        let processor_pool = create_processor_pool(&config)?;
        services.start_with(
            EnvelopeProcessorService::new(
                processor_pool.clone(),
                config.clone(),
                global_config_handle,
                project_cache_handle.clone(),
                cogs,
                #[cfg(feature = "processing")]
                redis_clients.clone(),
                processor::Addrs {
                    outcome_aggregator: outcome_aggregator.clone(),
                    upstream_relay: upstream_relay.clone(),
                    #[cfg(feature = "processing")]
                    store_forwarder: store.clone(),
                    aggregator: aggregator.clone(),
                    #[cfg(feature = "processing")]
                    global_rate_limits,
                },
                metric_outcomes.clone(),
            ),
            processor_rx,
        );

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // The health check gets its own `MemoryChecker` over the shared `memory_stat`.
        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        let autoscaling = services.start(AutoscalingMetricService::new(
            memory_stat.clone(),
            envelope_buffer.clone(),
            handle.clone(),
            processor_pool.clone(),
        ));

        // `RelayStats` is spawned, but its address is not kept in the registry.
        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the configuration the services were started with.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker built during [`ServiceState::start`] from the shared memory
    /// statistics.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metrics service.
    pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
        &self.inner.registry.autoscaling
    }

    /// Returns the [`ObservableEnvelopeBuffer`] that `PartitionedEnvelopeBuffer::buffer`
    /// selects for `project_key_pair`.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns the handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global config service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}
400
/// Creates the Redis clients for project configs, cardinality limiting and quotas.
///
/// A unified configuration creates a single client that is shared by all three roles.
/// Otherwise one client is created per role, in the order project configs, cardinality,
/// quotas.
///
/// # Errors
///
/// Returns the first [`RedisError`] raised while creating a client.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(&unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs = create_async_redis_client(&project_configs)?;
            let cardinality = create_async_redis_client(&cardinality)?;
            let quotas = create_async_redis_client(&quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}
437
/// Creates one async Redis client from a single Redis configuration.
///
/// A single-server config produces a single-server client. A cluster config produces a
/// cluster client over all configured nodes.
#[cfg(feature = "processing")]
fn create_async_redis_client(config: &RedisConfigRef<'_>) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => AsyncRedisClient::single(server, options),
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(nodes, options)
        }
    }
}
448
/// Loads every script from [`RedisScripts::all`] on the cardinality client, then on the quotas
/// client.
///
/// The project-configs client is not touched here. Stops at the first error.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let all_scripts = RedisScripts::all();

    initialize_redis_scripts(&redis_clients.cardinality, &all_scripts).await?;
    initialize_redis_scripts(&redis_clients.quotas, &all_scripts).await?;

    Ok(())
}
462
/// Loads each script in `scripts` into Redis over one connection obtained from `client`.
///
/// Accepts any number of scripts. A fixed-size array such as `&[&Script; 3]` coerces to the
/// slice parameter, so adding scripts elsewhere does not require changing this signature.
///
/// # Errors
///
/// Returns an error if no connection can be obtained or if loading any script fails. Scripts
/// after the failing one are not loaded.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
482
483impl FromRequestParts<Self> for ServiceState {
484 type Rejection = Infallible;
485
486 async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
487 Ok(state.clone())
488 }
489}