use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::MetricOutcomes;
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
#[cfg(feature = "processing")]
use crate::services::global_rate_limits::GlobalRateLimitsService;
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
#[cfg(feature = "processing")]
use crate::services::upload::UploadService;
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

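/// Errors that can occur while starting Relay's services.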
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("could not load the Geoip Db")]
    GeoIp,

    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

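/// Addresses and handles of the long-running services spawned during startup.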
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
}

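/// Creates Relay's async runtime with the given name and number of worker threads.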
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        .max_blocking_threads(150)
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}

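/// Creates the worker pool for the [`EnvelopeProcessorService`].
///
/// At least one worker is always started; as CPU concurrency grows, one to two cores are
/// left for the rest of the system.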
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

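/// Creates the worker pool for the [`StoreService`].
///
/// Roughly one store worker is started per twelve available CPU cores, rounded up.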
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

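/// State shared behind a [`ServiceState`] handle.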
#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

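/// Cloneable handle to Relay's shared service state.
///
/// The inner state is reference-counted, so clones are cheap and all refer to the same
/// set of services.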
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
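    /// Starts all Relay services and returns a handle to the shared state.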
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

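        // Preload Relay's Lua scripts into the Redis instances used for cardinality
        // limiting and quotas, so they are available once processing starts.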
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

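        // The processor channel is created up front so its address can be handed to
        // services started earlier than the processor itself. Depending on the relay
        // mode, it is later served by either the proxy processor or the full envelope
        // processor.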
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                let upload = services.start(UploadService::new(config.upload()));
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                    upload,
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

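        // The envelope buffer is partitioned: each project key pair maps to one of the
        // configured spool partitions.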
        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

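        // Proxy mode only runs the lightweight proxy processor. Managed mode starts the
        // full pipeline: metrics aggregation, COGS recording, the envelope processor,
        // and autoscaling metrics.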
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store.clone(),
                            aggregator: aggregator.clone(),
                            #[cfg(feature = "processing")]
                            global_rate_limits,
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

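    /// Returns the Relay configuration.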
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

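    /// Returns the [`MemoryChecker`] used to check memory usage against configured limits.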
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

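    /// Returns the address of the autoscaling metric service, if it is running.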
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

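    /// Returns the envelope buffer partition responsible for the given project key pair.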
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

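    /// Returns a handle to the project cache.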
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

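    /// Returns the address of the relay cache service.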
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

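    /// Returns the address of the health check service.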
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

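    /// Returns the address of the outcome producer service.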
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

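    /// Returns the address of the upstream relay service.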
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

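    /// Returns the address of the envelope processor.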
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

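    /// Returns the address of the global config service.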
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

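    /// Returns the address of the outcome aggregator service.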
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

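/// Creates Redis clients from the given configuration.
///
/// With a unified configuration a single client is shared for project configs, cardinality,
/// and quotas; with individual configurations a dedicated client is created for each.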
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const CARDINALITY_REDIS_CLIENT: &str = "cardinality";
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let cardinality = create_async_redis_client(CARDINALITY_REDIS_CLIENT, &cardinality)?;
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}

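/// Creates an asynchronous Redis client for either a cluster or a single-node configuration.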
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(name, cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
    }
}

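/// Loads Relay's Lua scripts into the Redis instances used for cardinality limiting and quotas.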
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

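/// Loads the given scripts onto the Redis server behind `client`.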
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

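// Allows `ServiceState` to be extracted directly in axum request handlers.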
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}