use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::MetricOutcomes;
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
#[cfg(feature = "processing")]
use crate::services::upload::UploadService;
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

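/// Errors that can occur while starting Relay's services.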
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("could not load the Geoip Db")]
    GeoIp,

    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

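/// Addresses and handles of the long-running services, created by [`ServiceState::start`].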
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
}

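/// Creates a named runtime with the given number of worker threads.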
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        // Cap the number of threads spawned for blocking work.
        .max_blocking_threads(150)
        // Tear down idle blocking threads quickly to keep the thread count low.
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}

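/// Creates the thread pool used by the [`EnvelopeProcessorService`].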
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    // Use most of the available cores for envelope processing, but leave one or two
    // free on larger machines and always spawn at least one worker.
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

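/// Creates the thread pool used by the [`StoreService`].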
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // Spawn one store worker for every 12 cores, rounded up.
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

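/// State held behind the [`ServiceState`] handle.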
#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

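/// A cheaply cloneable handle to the server's configuration and running services.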
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
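    /// Starts all services and returns a handle to the shared server state.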
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Pre-load the Redis scripts before any service starts using the clients.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create the processor channel up front, so its address can be handed to services
        // that are started before the processor itself.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        #[cfg(feature = "processing")]
        let upload = UploadService::new(config.upload(), store.clone())?.map(|s| services.start(s));

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // Start the processing pipeline according to the Relay mode: a lightweight proxy
        // processor in proxy mode, or the full envelope processor with metrics aggregation,
        // COGS tracking, and autoscaling metrics in managed mode.
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            upload,
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker used to inspect Relay's current memory state.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metric service, if it is running.
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer partition responsible for the given project key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global configuration service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

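/// Creates the Redis clients for the configured Redis setup.
///
/// With a unified configuration a single client is created and shared for project configs,
/// cardinality, and quotas; with individual configurations a dedicated client is created for
/// each of them.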
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const CARDINALITY_REDIS_CLIENT: &str = "cardinality";
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let cardinality = create_async_redis_client(CARDINALITY_REDIS_CLIENT, &cardinality)?;
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}

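/// Creates an asynchronous Redis client for a single-node or cluster configuration.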
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(name, cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
    }
}

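/// Loads the Redis scripts into the clients that execute them (cardinality and quotas).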
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

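/// Loads all given scripts into the script cache of the Redis instance behind `client`.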
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        // Load the script into Redis' script cache so that later invocations can run it
        // by hash without re-sending the source.
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

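/// Allows axum handlers to extract a [`ServiceState`] by cloning it from the router state.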
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}