1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8 ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{
12 GlobalConfigHandle, GlobalConfigManager, GlobalConfigService,
13};
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::Objectstore;
18#[cfg(feature = "processing")]
19use crate::services::objectstore::ObjectstoreService;
20use crate::services::outcome::{
21 OutcomeAggregator, OutcomeProducer, OutcomeProducerService, TrackOutcome,
22};
23use crate::services::processor::{
24 self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
25};
26use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
27use crate::services::projects::source::ProjectSource;
28use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
29use crate::services::relays::{RelayCache, RelayCacheService};
30use crate::services::stats::RelayStats;
31#[cfg(feature = "processing")]
32use crate::services::store::{StoreService, StoreServicePool};
33use crate::services::upload::{self, Upload};
34use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
35use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
36#[cfg(feature = "processing")]
37use anyhow::Context;
38use anyhow::Result;
39use axum::extract::FromRequestParts;
40use axum::http::request::Parts;
41use relay_cogs::Cogs;
42use relay_config::Config;
43#[cfg(feature = "processing")]
44use relay_config::{RedisConfigRef, RedisConfigsRef};
45#[cfg(feature = "processing")]
46use relay_redis::AsyncRedisClient;
47#[cfg(feature = "processing")]
48use relay_redis::redis::Script;
49#[cfg(feature = "processing")]
50use relay_redis::{RedisClients, RedisError, RedisScripts};
51#[cfg(feature = "processing")]
52use relay_system::ConcurrentService;
53use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
54
55#[derive(Debug, thiserror::Error)]
57pub enum ServiceError {
58 #[error("could not load the Geoip Db")]
60 GeoIp,
61
62 #[cfg(feature = "processing")]
64 #[error("could not initialize kafka producer: {0}")]
65 Kafka(String),
66
67 #[cfg(feature = "processing")]
69 #[error("could not initialize redis client during startup")]
70 Redis,
71}
72
73#[derive(Clone, Debug)]
74pub struct Registry {
75 pub health_check: Addr<HealthCheck>,
76 pub outcome_producer: Addr<OutcomeProducer>,
77 pub outcome_aggregator: Addr<TrackOutcome>,
78 pub processor: Addr<EnvelopeProcessor>,
79 pub relay_cache: Addr<RelayCache>,
80 pub global_config: Addr<GlobalConfigManager>,
81 pub upstream_relay: Addr<UpstreamRelay>,
82 pub envelope_buffer: Arc<PartitionedEnvelopeBuffer>,
83 pub project_cache_handle: ProjectCacheHandle,
84 pub autoscaling: Option<Addr<AutoscalingMetrics>>,
85 #[cfg(feature = "processing")]
86 pub objectstore: Option<Addr<Objectstore>>,
87 pub upload: Addr<Upload>,
88 pub global_config_handle: GlobalConfigHandle,
89}
90
91pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
93 relay_system::Runtime::builder(name)
94 .worker_threads(threads)
95 .max_blocking_threads(150)
106 .thread_keep_alive(Duration::from_secs(1))
110 .build()
111}
112
113fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
114 let thread_count = match config.cpu_concurrency() {
117 conc @ 0..=2 => conc.max(1),
118 conc @ 3..=4 => conc - 1,
119 conc => conc - 2,
120 };
121 relay_log::info!("starting {thread_count} envelope processing workers");
122
123 let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
124 .num_threads(thread_count)
125 .max_concurrency(config.pool_concurrency())
126 .thread_kind(ThreadKind::Worker)
127 .build()?;
128
129 Ok(pool)
130}
131
/// Builds the thread pool backing the store service.
///
/// One worker is started per 12 units of configured CPU concurrency (rounded up), and
/// never fewer than one worker.
///
/// # Errors
///
/// Returns an error if the thread pool cannot be built.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // A configured concurrency of 0 would otherwise produce an empty pool;
    // `create_processor_pool` clamps its worker count to at least one the same way.
    let thread_count = config.cpu_concurrency().div_ceil(12).max(1);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
149
150#[derive(Debug)]
151struct StateInner {
152 config: Arc<Config>,
153 memory_checker: MemoryChecker,
154 registry: Registry,
155}
156
157#[derive(Clone, Debug)]
159pub struct ServiceState {
160 inner: Arc<StateInner>,
161}
162
impl ServiceState {
    /// Starts all Relay services and returns the state that gives access to them.
    ///
    /// Every service is spawned through `services`. `handle` is passed on to the
    /// services that take it (metrics router, autoscaling metrics, and relay stats).
    /// The relay mode decides which envelope processor runs:
    /// - proxy mode starts a [`ProxyProcessorService`] and creates no processor pool,
    ///   metrics aggregator, or autoscaling service;
    /// - managed mode starts the full [`EnvelopeProcessorService`].
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - creating the Redis clients or loading their scripts (with [`ServiceError::Redis`]
    ///   as context);
    /// - creating the outcome producer;
    /// - building a thread pool;
    /// - creating the store service or the objectstore service.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when a Redis config exists and processing is enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // Load the Redis scripts on every client before any service uses them.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // The processor channel is created up front so that its address can be handed to
        // services started before the processor itself (outcome producer, envelope buffer,
        // metrics router). The receiving end is attached further below via `start_with`.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        // Take the handle before the service is moved into `services.start`.
        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // The store pool is built whenever the processing feature is compiled in (it is
        // also reported to `RelayStats`), but the store service itself is only started when
        // processing is enabled in the config.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // `ObjectstoreService::new` yields an optional service. When present, it is wrapped
        // in a `ConcurrentService` bounded by the configured backlog and concurrency limits.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            // Proxy mode: only the proxy processor is attached to the processor channel;
            // no processor pool, metrics aggregator, or autoscaling service is created.
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                // The metrics router sends to the processor channel created above.
                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                // Take the handle (used by the health check) before the router is moved.
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle.clone(),
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        // `aggregator_handle` is `None` in proxy mode.
        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
            global_config_handle,
        };

        // A separate `MemoryChecker` over the same `memory_stat` is kept in the state and
        // exposed through `memory_checker()`.
        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns the configuration the services were started with.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns the memory checker built from the memory statistics created in `start`.
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the autoscaling metrics service, which only exists in managed mode.
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer that `PartitionedEnvelopeBuffer::buffer` selects for
    /// the given project key pair (presumably its partition — confirm in the buffer module).
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns the handle to the project cache.
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the relay cache service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the health check service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the outcome producer.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the upstream relay service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the envelope processor (proxy or managed, depending on mode).
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the global config manager.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the outcome aggregator.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    /// Returns the objectstore service, if one was created.
    #[cfg(feature = "processing")]
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the upload service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }

    /// Returns the handle to the global config service.
    pub fn global_config_handle(&self) -> &GlobalConfigHandle {
        &self.inner.registry.global_config_handle
    }
}
471
/// Creates the Redis clients from the given Redis configuration.
///
/// - With a unified configuration, a single client named `"unified"` is created and
///   cloned into both the project-config and the quota slot.
/// - With individual configurations, one client is created per use case, named
///   `"projectconfig"` and `"quotas"`.
///
/// # Errors
///
/// Returns the [`RedisError`] of the first client that cannot be created.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            // The config is passed by reference, like in the other calls above.
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                quotas,
            })
        }
    }
}
509
/// Creates a single named async Redis client for either a standalone server or a
/// cluster configuration.
///
/// # Errors
///
/// Propagates the [`RedisError`] returned by the client constructor.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
525
/// Loads every script from `RedisScripts::all()` on each Redis client, starting with the
/// project-config client and then the quota client.
///
/// # Errors
///
/// Returns the first [`RedisError`] encountered; later clients are not initialized.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    // Destructuring without `..` makes any new field on `RedisClients` a compile error
    // here until it is handled as well.
    let RedisClients {
        project_configs,
        quotas,
    } = redis_clients;

    let scripts = RedisScripts::all();
    for client in [project_configs, quotas] {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}
542
/// Loads each of `scripts` on one connection taken from `client`.
///
/// # Errors
///
/// Fails if no connection can be obtained or if loading any script fails. Scripts after
/// the failing one are not loaded.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    for &script in scripts {
        let invocation = script.prepare_invoke();
        invocation
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
562
563impl FromRequestParts<Self> for ServiceState {
564 type Rejection = Infallible;
565
566 async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
567 Ok(state.clone())
568 }
569}