1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8 ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{
12 GlobalConfigHandle, GlobalConfigManager, GlobalConfigService,
13};
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::Objectstore;
18#[cfg(feature = "processing")]
19use crate::services::objectstore::ObjectstoreService;
20use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
21use crate::services::outcome_aggregator::OutcomeAggregator;
22use crate::services::processor::{
23 self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
24};
25use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
26use crate::services::projects::source::ProjectSource;
27use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
28use crate::services::relays::{RelayCache, RelayCacheService};
29use crate::services::stats::RelayStats;
30#[cfg(feature = "processing")]
31use crate::services::store::{StoreService, StoreServicePool};
32use crate::services::upload::{self, Upload};
33use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
34use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
35#[cfg(feature = "processing")]
36use anyhow::Context;
37use anyhow::Result;
38use axum::extract::FromRequestParts;
39use axum::http::request::Parts;
40use relay_cogs::Cogs;
41use relay_config::Config;
42#[cfg(feature = "processing")]
43use relay_config::{RedisConfigRef, RedisConfigsRef};
44#[cfg(feature = "processing")]
45use relay_redis::AsyncRedisClient;
46#[cfg(feature = "processing")]
47use relay_redis::redis::Script;
48#[cfg(feature = "processing")]
49use relay_redis::{RedisClients, RedisError, RedisScripts};
50#[cfg(feature = "processing")]
51use relay_system::ConcurrentService;
52use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
53
/// Errors raised while starting Relay's services.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// The GeoIP database could not be loaded during startup.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// A Kafka producer could not be initialized (processing builds only).
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// A Redis client could not be initialized (processing builds only).
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
71
/// Addresses and handles of all long-running services started by
/// [`ServiceState::start`], bundled for shared access.
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the health check service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the outcome producer service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the outcome aggregator service.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the envelope processor (proxy or managed, depending on mode).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the relay cache service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the global config manager service.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Partitioned spool buffer for incoming envelopes.
    pub envelope_buffer: Arc<PartitionedEnvelopeBuffer>,
    /// Handle to the project cache.
    pub project_cache_handle: ProjectCacheHandle,
    /// Autoscaling metrics service; only present in managed mode.
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
    /// Object store service; only present when configured (processing builds).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the upload service.
    pub upload: Addr<Upload>,
    /// Handle to the current global configuration.
    pub global_config_handle: GlobalConfigHandle,
}
89
90pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
92 relay_system::Runtime::builder(name)
93 .worker_threads(threads)
94 .max_blocking_threads(150)
105 .thread_keep_alive(Duration::from_secs(1))
109 .build()
110}
111
112fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
113 let thread_count = match config.cpu_concurrency() {
116 conc @ 0..=2 => conc.max(1),
117 conc @ 3..=4 => conc - 1,
118 conc => conc - 2,
119 };
120 relay_log::info!("starting {thread_count} envelope processing workers");
121
122 let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
123 .num_threads(thread_count)
124 .max_concurrency(config.pool_concurrency())
125 .thread_kind(ThreadKind::Worker)
126 .build()?;
127
128 Ok(pool)
129}
130
/// Creates the worker thread pool for the store service (processing builds).
///
/// Sizes the pool at roughly one worker per twelve CPU cores, rounded up.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let runtime = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", runtime)
        .max_concurrency(config.pool_concurrency())
        .num_threads(thread_count)
        .build()?;

    Ok(pool)
}
148
/// Internal, reference-counted payload of [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    // The static Relay configuration.
    config: Arc<Config>,
    // Memory checker built from the shared memory statistics.
    memory_checker: MemoryChecker,
    // Addresses/handles of all started services.
    registry: Registry,
}
155
/// Cheaply cloneable handle to Relay's shared service state.
///
/// Cloning only bumps the inner [`Arc`]'s reference count.
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}
161
impl ServiceState {
    /// Starts all Relay services and returns a handle to the shared state.
    ///
    /// Services are spawned on `services` in dependency order: addresses of
    /// earlier services are threaded into later ones, and the processor's
    /// channel is created up front so its address can be handed out before
    /// the processor itself is started.
    ///
    /// Returns an error if any service fails to initialize (for example the
    /// Redis clients or the processor pool).
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when a Redis config is present AND
        // processing is enabled; otherwise this stays `None`.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Pre-load the Lua scripts on every Redis client at startup so a
        // script failure surfaces immediately rather than at first use.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create the processor channel before the processor service: several
        // services started below need the processor's address.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        // Grab the handle before `global_config` is moved into the spawn.
        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // Store service and its pool only exist when processing is enabled.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // Object store is wrapped in a `ConcurrentService` to bound both its
        // backlog and the number of in-flight requests.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // Mode-dependent wiring: proxy mode runs a lightweight proxy
        // processor; managed mode runs the full envelope processor plus
        // aggregator, COGS, and autoscaling services.
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                // Proxy mode has no processor pool, aggregator, or autoscaling.
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle.clone(),
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
            global_config_handle,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker built from the shared memory statistics.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the autoscaling service address, if running (managed mode only).
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer partition for the given project key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global config manager service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    /// Returns the object store service address, if configured.
    #[cfg(feature = "processing")]
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the upload service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }

    /// Returns a handle to the current global configuration.
    pub fn global_config_handle(&self) -> &GlobalConfigHandle {
        &self.inner.registry.global_config_handle
    }
}
470
/// Creates the Redis clients used by Relay (processing builds only).
///
/// With a unified configuration a single client is created and shared between
/// project configs and quotas; with individual configurations a dedicated
/// client is created for each.
///
/// # Errors
///
/// Returns a [`RedisError`] if any underlying client fails to initialize.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            // Fix: the quotas client must be built from the `quotas` config;
            // the previous `"as` was a corrupted `&quotas` and did not compile.
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                quotas,
            })
        }
    }
}
508
/// Builds a single named [`AsyncRedisClient`] from a configuration reference.
///
/// Dispatches on the configuration shape — one server or a cluster of nodes —
/// forwarding the connection options either way.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes: nodes,
            options,
        } => {
            let node_addrs = nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, node_addrs, options)
        }
    }
}
524
/// Loads all Redis scripts on every configured client.
///
/// Both the project-config and the quotas client are initialized; the
/// exhaustive destructuring guarantees no client is forgotten if
/// `RedisClients` grows a field.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let RedisClients {
        project_configs,
        quotas,
    } = redis_clients;

    for client in [project_configs, quotas] {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}
541
/// Loads the given scripts onto a single Redis client.
///
/// Obtains one connection and uploads each script over it; the first failing
/// upload aborts with that error.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        let invocation = script.prepare_invoke();
        invocation
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
561
/// Lets [`ServiceState`] be used as an axum extractor in request handlers.
impl FromRequestParts<Self> for ServiceState {
    // Extraction can never fail.
    type Rejection = Infallible;

    /// Hands out a clone of the shared state (cheap: only an `Arc` bump).
    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}