use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
#[cfg(feature = "processing")]
use crate::services::global_rate_limits::GlobalRateLimitsService;
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

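/// Errors that can occur while starting up the Relay services.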
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("could not load the Geoip Db")]
    GeoIp,

    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

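/// Addresses and handles of the services started in [`ServiceState::start`].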
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub test_store: Addr<TestStore>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Addr<AutoscalingMetrics>,
}

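/// Constructs a named Relay runtime with the given number of worker threads.
///
/// Blocking threads are capped at 150 and idle threads are kept alive for one second.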
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        .max_blocking_threads(150)
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}

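/// Creates the thread pool used by the [`EnvelopeProcessorService`].
///
/// Up to two CPU cores are kept free for the rest of Relay: with one or two available
/// cores all of them are used, with three or four cores one is set aside, and with more
/// cores two are set aside.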
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

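/// Creates the thread pool used by the [`StoreService`].
///
/// One store worker is started for every twelve available CPU cores, rounded up.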
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

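/// Shared, reference-counted state behind [`ServiceState`].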
#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

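/// Handle to the shared Relay configuration and service registry.
///
/// Cloning is cheap: all clones point to the same [`StateInner`] through an [`Arc`].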
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
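    /// Starts all Relay services and returns a handle to the shared state.
    ///
    /// A minimal usage sketch, assuming a runtime handle, a service spawner, and a parsed
    /// configuration are already in scope (the variable names below are illustrative):
    ///
    /// ```ignore
    /// let state = ServiceState::start(&handle, &services, config).await?;
    /// let _config: &Config = state.config();
    /// ```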
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
        let test_store = services.start(TestStoreService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let aggregator = RouterService::new(
            handle.clone(),
            config.default_aggregator_config().clone(),
            config.secondary_aggregator_configs().clone(),
            Some(processor.clone().recipient()),
            project_cache_handle.clone(),
        );
        let aggregator_handle = aggregator.handle();
        let aggregator = services.start(aggregator);

        let metric_stats = MetricStats::new(
            config.clone(),
            global_config_handle.clone(),
            aggregator.clone(),
        );

        let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        let cogs = CogsService::new(&config);
        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

        let processor_pool = create_processor_pool(&config)?;
        services.start_with(
            EnvelopeProcessorService::new(
                processor_pool.clone(),
                config.clone(),
                global_config_handle,
                project_cache_handle.clone(),
                cogs,
                #[cfg(feature = "processing")]
                redis_clients.clone(),
                processor::Addrs {
                    outcome_aggregator: outcome_aggregator.clone(),
                    upstream_relay: upstream_relay.clone(),
                    test_store: test_store.clone(),
                    #[cfg(feature = "processing")]
                    store_forwarder: store.clone(),
                    aggregator: aggregator.clone(),
                    #[cfg(feature = "processing")]
                    global_rate_limits,
                },
                metric_outcomes.clone(),
            ),
            processor_rx,
        );

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            test_store.clone(),
            services,
        );

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        let autoscaling = services.start(AutoscalingMetricService::new(
            memory_stat.clone(),
            envelope_buffer.clone(),
            handle.clone(),
            processor_pool.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            test_store,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

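    /// Returns the Relay configuration this instance was started with.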
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker that evaluates Relay's current memory usage.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metric service.
    pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
        &self.inner.registry.autoscaling
    }

    /// Returns the envelope buffer partition responsible for the given project key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the test store service.
    pub fn test_store(&self) -> &Addr<TestStore> {
        &self.inner.registry.test_store
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global configuration service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

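/// Creates Redis clients from the given configuration.
///
/// With a unified configuration, one client is shared for project configs, cardinality
/// limiting, and quotas; with individual configurations, a dedicated client is created
/// for each use case.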
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(&unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs = create_async_redis_client(&project_configs)?;
            let cardinality = create_async_redis_client(&cardinality)?;
            let quotas = create_async_redis_client(&quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}

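/// Creates an asynchronous Redis client for either a cluster or a single-node configuration.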
#[cfg(feature = "processing")]
fn create_async_redis_client(config: &RedisConfigRef<'_>) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => AsyncRedisClient::single(server, options),
    }
}

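/// Pre-loads all Relay Lua scripts on the Redis clients that execute them
/// (the cardinality and quotas clients).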
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

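/// Loads the given scripts into the script cache of a single Redis client.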
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

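/// Axum extractor integration: handlers of a router whose state is [`ServiceState`] can
/// take it directly as an argument, since cloning it only bumps an [`Arc`] reference count.
///
/// A hypothetical handler signature (illustrative only, not part of this module):
///
/// ```ignore
/// async fn health(state: ServiceState) -> &'static str { "ok" }
/// ```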
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}