1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8 ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{
12 GlobalConfigHandle, GlobalConfigManager, GlobalConfigService,
13};
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::Objectstore;
18#[cfg(feature = "processing")]
19use crate::services::objectstore::ObjectstoreService;
20use crate::services::outcome::{
21 ClientReportOutcomeProducerService, NullOutcomeProducerService, OutcomeProducerService,
22 TrackOutcome,
23};
24use crate::services::processor::{
25 self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
26};
27use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
28use crate::services::projects::source::ProjectSource;
29use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
30use crate::services::relays::{RelayCache, RelayCacheService};
31use crate::services::stats::RelayStats;
32#[cfg(feature = "processing")]
33use crate::services::store::{StoreService, StoreServicePool};
34use crate::services::upload::{self, Upload};
35use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
36use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
37#[cfg(feature = "processing")]
38use anyhow::Context;
39use anyhow::Result;
40use axum::extract::FromRequestParts;
41use axum::http::request::Parts;
42use relay_cogs::Cogs;
43use relay_config::{Config, EmitOutcomes, RelayMode};
44#[cfg(feature = "processing")]
45use relay_config::{RedisConfigRef, RedisConfigsRef};
46#[cfg(feature = "processing")]
47use relay_redis::AsyncRedisClient;
48#[cfg(feature = "processing")]
49use relay_redis::redis::Script;
50#[cfg(feature = "processing")]
51use relay_redis::{RedisClients, RedisError, RedisScripts};
52#[cfg(feature = "processing")]
53use relay_system::ConcurrentService;
54use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
55
/// Errors reported while initializing services.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// The GeoIP database could not be loaded.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// The Kafka producer could not be initialized.
    ///
    /// The contained string is interpolated into the error message.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// A Redis client could not be initialized during startup.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
73
/// Handles to the services created by [`ServiceState::start`].
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the health check service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the outcome producer that accepts [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the envelope processor (proxy or managed, depending on the relay mode).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the relay cache service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the global config service.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Shared partitioned envelope buffer.
    pub envelope_buffer: Arc<PartitionedEnvelopeBuffer>,
    /// Handle returned when the project cache service was started.
    pub project_cache_handle: ProjectCacheHandle,
    /// Address of the autoscaling metrics service; `None` when running in proxy mode.
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
    /// Address of the objectstore service, if one was created.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the upload service.
    pub upload: Addr<Upload>,
    /// Handle obtained from the global config service before it was started.
    pub global_config_handle: GlobalConfigHandle,
}
90
91pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
93 relay_system::Runtime::builder(name)
94 .worker_threads(threads)
95 .max_blocking_threads(150)
106 .thread_keep_alive(Duration::from_secs(1))
110 .build()
111}
112
113fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
114 let thread_count = match config.cpu_concurrency() {
117 conc @ 0..=2 => conc.max(1),
118 conc @ 3..=4 => conc - 1,
119 conc => conc - 2,
120 };
121 relay_log::info!("starting {thread_count} envelope processing workers");
122
123 let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
124 .num_threads(thread_count)
125 .max_concurrency(config.pool_concurrency())
126 .thread_kind(ThreadKind::Worker)
127 .build()?;
128
129 Ok(pool)
130}
131
/// Creates the thread pool used by the store service.
///
/// One worker is started for every 12 CPUs of configured concurrency, rounded up.
///
/// # Errors
///
/// Returns an error if the thread pool cannot be built.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let cpus = config.cpu_concurrency();
    let thread_count = cpus.div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let runtime = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", runtime)
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
149
/// Data shared by all clones of [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    /// The configuration the services were started with.
    config: Arc<Config>,
    /// Memory checker built from the memory stat created in [`ServiceState::start`].
    memory_checker: MemoryChecker,
    /// Handles to the started services.
    registry: Registry,
}
156
/// Clonable handle to the server state.
///
/// All clones share the same inner state through an [`Arc`].
#[derive(Clone, Debug)]
pub struct ServiceState {
    /// Shared configuration, memory checker and service registry.
    inner: Arc<StateInner>,
}
162
impl ServiceState {
    /// Starts the services and returns a state holding handles to them.
    ///
    /// Services are spawned through `services`. Which processor, outcome producer, router and
    /// autoscaling services are started depends on the relay mode and the other settings in
    /// `config`.
    ///
    /// # Errors
    ///
    /// Returns an error if creating the Redis clients, loading the Redis scripts, creating a
    /// thread pool, or creating the store or objectstore service fails.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when Redis is configured and processing is enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Load all Redis scripts on every client before the clients are handed to any service.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // A single `MemoryStat` is cloned into every component below that needs it.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create the processor channel up front so that its address can be passed to services
        // which are started before the processor itself.
        let (processor, processor_rx) = match config.relay_mode() {
            RelayMode::Proxy => channel(ProxyProcessorService::name()),
            RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        // Same for the metrics router: the address is needed before the service is started.
        let (aggregator, aggregator_rx) = channel(RouterService::name());

        // Select the outcome producer: none at all, client reports via the processor (proxy
        // mode), or the regular outcome producer (managed mode).
        let outcome_aggregator = match config.emit_outcomes() {
            EmitOutcomes::None => services.start(NullOutcomeProducerService::new()),
            _ => match config.relay_mode() {
                RelayMode::Proxy => services.start(ClientReportOutcomeProducerService::new(
                    &config,
                    processor.clone(),
                )),
                RelayMode::Managed => services.start(OutcomeProducerService::new(
                    Arc::clone(&config),
                    aggregator.clone(),
                )),
            },
        };

        // The handle must be taken before `services.start` consumes the service.
        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // The store pool is always created in processing builds, while the store service itself
        // is only created and started when processing is enabled.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // `ObjectstoreService::new` may yield no service; if it does yield one, it is wrapped
        // with the configured backlog and concurrency limits before being started.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            // Proxy mode only runs the proxy processor: there is no processor pool, no metrics
            // router and no autoscaling service.
            RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            // Managed mode runs the metrics router, COGS, the envelope processor on its own
            // thread pool, and the autoscaling metrics service.
            RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let router = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                // The handle must be taken before `start_with` consumes the router.
                let router_handle = router.handle();
                services.start_with(router, aggregator_rx);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle.clone(),
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (Some(processor_pool), Some(router_handle), Some(autoscaling))
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        // The address returned by starting `RelayStats` is not kept.
        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
            global_config_handle,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the configuration the services were started with.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker created in [`Self::start`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metrics service.
    ///
    /// This is `None` when running in proxy mode, where that service is not started.
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer which the partitioned buffer selects for `project_key_pair`.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns the handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor.
    ///
    /// Depending on the relay mode, this is backed by the proxy processor or the regular
    /// envelope processor service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global config service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome producer selected in [`Self::start`].
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    /// Returns the address of the objectstore service, if one was created.
    #[cfg(feature = "processing")]
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the upload service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }

    /// Returns the handle to the global config.
    pub fn global_config_handle(&self) -> &GlobalConfigHandle {
        &self.inner.registry.global_config_handle
    }
}
468
/// Creates the Redis clients described by `configs`.
///
/// With a unified configuration, one client named `"unified"` is created and used (via
/// `clone`) for both project configs and quotas. With individual configurations, a separate
/// client is created for each of them, named `"projectconfig"` and `"quotas"` respectively.
///
/// # Errors
///
/// Returns the [`RedisError`] of the first client that fails to be created.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            // The quotas client is built from the quotas configuration, passed by reference
            // like the project config one above.
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                quotas,
            })
        }
    }
}
506
/// Creates an [`AsyncRedisClient`] called `name` from a single-server or cluster configuration.
///
/// # Errors
///
/// Returns the [`RedisError`] produced by the client constructor.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            // The cluster constructor is given the nodes as string slices.
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
522
/// Loads all known Redis scripts on each client in `redis_clients`.
///
/// The project config client is handled first, then the quotas client.
///
/// # Errors
///
/// Stops at and returns the first [`RedisError`] encountered.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    // Destructure exhaustively so that a newly added client cannot be forgotten here.
    let RedisClients {
        project_configs,
        quotas,
    } = redis_clients;

    let all_scripts = RedisScripts::all();

    for client in [project_configs, quotas] {
        initialize_redis_scripts(client, &all_scripts).await?;
    }

    Ok(())
}
539
/// Loads every script in `scripts` over a single connection obtained from `client`.
///
/// # Errors
///
/// Returns an error if the connection cannot be obtained, or if loading a script fails. In the
/// latter case the underlying error is wrapped in [`RedisError::Redis`] and the remaining
/// scripts are not loaded.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    for script in scripts.iter() {
        let invocation = script.prepare_invoke();
        invocation
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
559
560impl FromRequestParts<Self> for ServiceState {
561 type Rejection = Infallible;
562
563 async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
564 Ok(state.clone())
565 }
566}