use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::MetricOutcomes;
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
#[cfg(feature = "processing")]
use crate::services::global_rate_limits::GlobalRateLimitsService;
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
#[cfg(feature = "processing")]
use crate::services::upload::UploadService;
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

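/// Errors raised when starting Relay's services fails.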
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("could not load the Geoip Db")]
    GeoIp,

    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

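/// Addresses and handles of the long-running services started by [`ServiceState::start`].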
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
}

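/// Creates a multi-threaded runtime with the given name and number of worker threads.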
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        .max_blocking_threads(150)
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}

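/// Creates the thread pool backing the [`EnvelopeProcessorService`].
///
/// On machines with more than two cores, one or two cores are left free for other services.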
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

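/// Creates the thread pool backing the store service, sized to roughly one worker per twelve
/// available CPU cores.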
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

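/// Shared state behind the cloneable [`ServiceState`] handle.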
#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

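/// Handle to the state and services of a running Relay.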
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
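    /// Starts all Relay services and returns addresses to them bundled in a [`ServiceState`].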
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

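        // Pre-load the Lua scripts used for quota and cardinality enforcement, so that later
        // invocations find them already cached in Redis.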
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

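        // Create the processor channel up front, so that the services started below can hold
        // an address to the processor before it is actually spawned.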
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

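        // The project source fetches project configs from the upstream (and from Redis when
        // processing is enabled); the project cache service keeps them available in memory.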
        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        #[cfg(feature = "processing")]
        let upload = UploadService::new(config.upload(), store.clone())?.map(|s| services.start(s));

        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

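        // Buffer for incoming envelopes, split across the configured number of spool
        // partitions; envelopes wait here until they are ready to be processed.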
        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

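        // In proxy mode only a lightweight proxy processor is spawned; in managed mode the full
        // envelope processor, metrics aggregator, and autoscaling services are started.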
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            upload,
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                            #[cfg(feature = "processing")]
                            global_rate_limits,
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

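    /// Returns the [`ObservableEnvelopeBuffer`] partition responsible for the given project key pair.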
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

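/// Creates Redis clients from the given configuration.
///
/// With a unified configuration, a single client is shared for project configs, cardinality,
/// and quotas. With individual configurations, a dedicated client is created per use case.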
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const CARDINALITY_REDIS_CLIENT: &str = "cardinality";
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let cardinality = create_async_redis_client(CARDINALITY_REDIS_CLIENT, &cardinality)?;
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}

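/// Creates a single asynchronous Redis client for either a cluster or a single-node
/// configuration.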
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(name, cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
    }
}

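/// Loads Relay's Lua scripts into the script cache of the Redis clients that execute them
/// (cardinality and quotas).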
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

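/// Axum extractor so that handlers can take [`ServiceState`] directly; it is cloned from the
/// router state.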
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}