1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8 ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
12use crate::services::health_check::{HealthCheck, HealthCheckService};
13use crate::services::metrics::RouterService;
14#[cfg(feature = "processing")]
15use crate::services::objectstore::Objectstore;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::ObjectstoreService;
18use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
19use crate::services::outcome_aggregator::OutcomeAggregator;
20use crate::services::processor::{
21 self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
22};
23use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
24use crate::services::projects::source::ProjectSource;
25use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
26use crate::services::relays::{RelayCache, RelayCacheService};
27use crate::services::stats::RelayStats;
28#[cfg(feature = "processing")]
29use crate::services::store::{StoreService, StoreServicePool};
30use crate::services::upload::{self, Upload};
31use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
32use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
33#[cfg(feature = "processing")]
34use anyhow::Context;
35use anyhow::Result;
36use axum::extract::FromRequestParts;
37use axum::http::request::Parts;
38use relay_cogs::Cogs;
39use relay_config::Config;
40#[cfg(feature = "processing")]
41use relay_config::{RedisConfigRef, RedisConfigsRef};
42#[cfg(feature = "processing")]
43use relay_redis::AsyncRedisClient;
44#[cfg(feature = "processing")]
45use relay_redis::redis::Script;
46#[cfg(feature = "processing")]
47use relay_redis::{RedisClients, RedisError, RedisScripts};
48#[cfg(feature = "processing")]
49use relay_system::ConcurrentService;
50use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
51
/// All errors that can occur while starting Relay's services.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// The GeoIP database could not be loaded.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// A Kafka producer failed to initialize; carries the underlying error message.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// A Redis client could not be created, or its scripts could not be
    /// preloaded during startup.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
69
/// Addresses of the long-running services started by [`ServiceState::start`].
///
/// Cloning the registry clones the addresses, not the services themselves.
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the health check service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the outcome producer service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the outcome aggregator (accepts `TrackOutcome` messages).
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the envelope processor (proxy or managed, depending on mode).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the relay cache service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the global config manager.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Partitioned spool buffer for incoming envelopes.
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    /// Handle to the project cache.
    pub project_cache_handle: ProjectCacheHandle,
    /// Autoscaling metrics service; only started in managed mode (see `start`).
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
    /// Object store service; only present when one is configured.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the upload service.
    pub upload: Addr<Upload>,
}
86
87pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
89 relay_system::Runtime::builder(name)
90 .worker_threads(threads)
91 .max_blocking_threads(150)
102 .thread_keep_alive(Duration::from_secs(1))
106 .build()
107}
108
109fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
110 let thread_count = match config.cpu_concurrency() {
113 conc @ 0..=2 => conc.max(1),
114 conc @ 3..=4 => conc - 1,
115 conc => conc - 2,
116 };
117 relay_log::info!("starting {thread_count} envelope processing workers");
118
119 let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
120 .num_threads(thread_count)
121 .max_concurrency(config.pool_concurrency())
122 .thread_kind(ThreadKind::Worker)
123 .build()?;
124
125 Ok(pool)
126}
127
/// Creates the thread pool used by the store service.
///
/// The store pool is deliberately small relative to the processor pool:
/// roughly one worker per twelve cores, rounded up (so always at least one).
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let handle = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", handle)
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
145
/// Data shared by all request handlers, held behind the `Arc` in
/// [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    // The static configuration this Relay was started with.
    config: Arc<Config>,
    // Memory checker bound to this state's memory statistics.
    memory_checker: MemoryChecker,
    // Addresses of all started services.
    registry: Registry,
}
152
/// Cloneable handle to the shared service state.
///
/// Cheap to clone: all data lives behind a single [`Arc`].
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}
158
impl ServiceState {
    /// Starts all services and returns addresses to them in the registry.
    ///
    /// Services are spawned on `services`; the returned [`ServiceState`] holds
    /// their addresses so request handlers can message them. Startup order
    /// matters: the upstream relay is started first since later services hold
    /// its address, and the processor channel is created before the processor
    /// service itself so other services can be wired to it.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when Redis is configured AND this
        // Relay has processing enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Preload Lua scripts so later invocations find them cached on the
        // Redis server.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create the processor channel up-front; the receiving service (proxy
        // or managed processor) is only started further below, after its
        // dependencies exist. The channel name matches the implementation that
        // will consume it.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        // Keep a handle to the global config before handing the service off to
        // the spawner; the handle is needed by the processor and store below.
        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // The store pool is always created in processing builds, but the store
        // service itself only runs when processing is enabled.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // The object store service is wrapped in a concurrency limiter so a
        // flood of requests cannot exhaust the backend.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // Mode-specific wiring: in proxy mode a lightweight proxy processor
        // consumes `processor_rx`; in managed mode the full envelope processor
        // runs, together with the metrics aggregator, COGS recording, and
        // autoscaling metrics. Only managed mode yields a processor pool,
        // aggregator handle, and autoscaling address.
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the static configuration of this Relay.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker bound to this state's memory statistics.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the autoscaling metrics service, if started (managed mode only).
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the spool buffer partition responsible for the given project
    /// key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global config manager.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    /// Returns the object store service, if one is configured.
    #[cfg(feature = "processing")]
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the upload service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }
}
462
/// Creates the Redis clients used by Relay in processing mode.
///
/// With a unified configuration a single client is shared by all consumers;
/// with individual configurations a dedicated client is created per consumer.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    // Client names, used for identification (e.g. in metrics/logs).
    const CARDINALITY_REDIS_CLIENT: &str = "cardinality";
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let cardinality = create_async_redis_client(CARDINALITY_REDIS_CLIENT, &cardinality)?;
            // Fix: this argument was garbled (`"as`); it must borrow `quotas`
            // like the sibling calls borrow their respective configs.
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}
505
/// Creates a single named async Redis client for the given configuration,
/// either in single-server or cluster mode.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|s| s.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
521
/// Preloads all Redis Lua scripts on the clients that invoke them.
///
/// Only the cardinality and quotas clients execute scripts; the project
/// config client does not need them.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    initialize_redis_scripts(&redis_clients.cardinality, &scripts).await?;
    initialize_redis_scripts(&redis_clients.quotas, &scripts).await?;

    Ok(())
}
535
/// Loads the given Lua scripts into the script cache of the Redis server
/// behind `client`, so later invocations can run them by hash.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        let load = script.prepare_invoke().load_async(&mut connection);
        load.await.map_err(RedisError::Redis)?;
    }

    Ok(())
}
555
556impl FromRequestParts<Self> for ServiceState {
557 type Rejection = Infallible;
558
559 async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
560 Ok(state.clone())
561 }
562}