relay_server/
service.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
12use crate::services::health_check::{HealthCheck, HealthCheckService};
13use crate::services::metrics::RouterService;
14use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
15use crate::services::outcome_aggregator::OutcomeAggregator;
16use crate::services::processor::{
17    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
18};
19use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
20use crate::services::projects::source::ProjectSource;
21use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
22use crate::services::relays::{RelayCache, RelayCacheService};
23use crate::services::stats::RelayStats;
24#[cfg(feature = "processing")]
25use crate::services::store::{StoreService, StoreServicePool};
26#[cfg(feature = "processing")]
27use crate::services::upload::UploadService;
28use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
29use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
30#[cfg(feature = "processing")]
31use anyhow::Context;
32use anyhow::Result;
33use axum::extract::FromRequestParts;
34use axum::http::request::Parts;
35use relay_cogs::Cogs;
36use relay_config::Config;
37#[cfg(feature = "processing")]
38use relay_config::{RedisConfigRef, RedisConfigsRef};
39#[cfg(feature = "processing")]
40use relay_redis::AsyncRedisClient;
41#[cfg(feature = "processing")]
42use relay_redis::redis::Script;
43#[cfg(feature = "processing")]
44use relay_redis::{RedisClients, RedisError, RedisScripts};
45use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
46
47/// Indicates the type of failure of the server.
48#[derive(Debug, thiserror::Error)]
49pub enum ServiceError {
50    /// GeoIp construction failed.
51    #[error("could not load the Geoip Db")]
52    GeoIp,
53
54    /// Initializing the Kafka producer failed.
55    #[cfg(feature = "processing")]
56    #[error("could not initialize kafka producer: {0}")]
57    Kafka(String),
58
59    /// Initializing the Redis client failed.
60    #[cfg(feature = "processing")]
61    #[error("could not initialize redis client during startup")]
62    Redis,
63}
64
65#[derive(Clone, Debug)]
66pub struct Registry {
67    pub health_check: Addr<HealthCheck>,
68    pub outcome_producer: Addr<OutcomeProducer>,
69    pub outcome_aggregator: Addr<TrackOutcome>,
70    pub processor: Addr<EnvelopeProcessor>,
71    pub relay_cache: Addr<RelayCache>,
72    pub global_config: Addr<GlobalConfigManager>,
73    pub upstream_relay: Addr<UpstreamRelay>,
74    pub envelope_buffer: PartitionedEnvelopeBuffer,
75    pub project_cache_handle: ProjectCacheHandle,
76    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
77}
78
79/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
80pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
81    relay_system::Runtime::builder(name)
82        .worker_threads(threads)
83        // Relay uses `spawn_blocking` only for Redis connections within the project
84        // cache, those should never exceed 100 concurrent connections
85        // (limited by connection pool).
86        //
87        // Relay also does not use other blocking operations from Tokio which require
88        // this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
89        //
90        // We limit the maximum amount of threads here, we've seen that Tokio
91        // expands this pool very very aggressively and basically never shrinks it
92        // which leads to a massive resource waste.
93        .max_blocking_threads(150)
94        // We also lower down the default (10s) keep alive timeout for blocking
95        // threads to encourage the runtime to not keep too many idle blocking threads
96        // around.
97        .thread_keep_alive(Duration::from_secs(1))
98        .build()
99}
100
101fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
102    // Adjust thread count for small cpu counts to not have too many idle cores
103    // and distribute workload better.
104    let thread_count = match config.cpu_concurrency() {
105        conc @ 0..=2 => conc.max(1),
106        conc @ 3..=4 => conc - 1,
107        conc => conc - 2,
108    };
109    relay_log::info!("starting {thread_count} envelope processing workers");
110
111    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
112        .num_threads(thread_count)
113        .max_concurrency(config.pool_concurrency())
114        .thread_kind(ThreadKind::Worker)
115        .build()?;
116
117    Ok(pool)
118}
119
/// Creates the thread pool used by the store service.
///
/// # Errors
///
/// Returns an error if the thread pool cannot be built.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // One store worker is spawned per this many CPUs, rounded up. The ratio was found
    // empirically and may need adjustments in the future.
    //
    // Ideally the store becomes single threaded again once all the heavy processing
    // (de- and re-serialization) has moved into the processor.
    let cpus_per_store_worker = 12;

    let thread_count = config.cpu_concurrency().div_ceil(cpus_per_store_worker);
    relay_log::info!("starting {thread_count} store workers");

    let runtime = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", runtime)
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
137
138#[derive(Debug)]
139struct StateInner {
140    config: Arc<Config>,
141    memory_checker: MemoryChecker,
142    registry: Registry,
143}
144
145/// Server state.
146#[derive(Clone, Debug)]
147pub struct ServiceState {
148    inner: Arc<StateInner>,
149}
150
151impl ServiceState {
152    /// Starts all services and returns addresses to all of them.
153    pub async fn start(
154        handle: &relay_system::Handle,
155        services: &dyn ServiceSpawn,
156        config: Arc<Config>,
157    ) -> Result<Self> {
158        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
159
160        #[cfg(feature = "processing")]
161        let redis_clients = config
162            .redis()
163            .filter(|_| config.processing_enabled())
164            .map(create_redis_clients)
165            .transpose()
166            .context(ServiceError::Redis)?;
167
168        // If we have Redis configured, we want to initialize all the scripts by loading them in
169        // the scripts cache if not present. Our custom ConnectionLike implementation relies on this
170        // initialization to work properly since it assumes that scripts are loaded across all Redis
171        // instances.
172        #[cfg(feature = "processing")]
173        if let Some(redis_clients) = &redis_clients {
174            initialize_redis_scripts_for_client(redis_clients)
175                .await
176                .context(ServiceError::Redis)?;
177        }
178
179        // We create an instance of `MemoryStat` which can be supplied composed with any arbitrary
180        // configuration object down the line.
181        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());
182
183        // Create an address for the `EnvelopeProcessor`, which can be injected into the
184        // other services.
185        let (processor, processor_rx) = match config.relay_mode() {
186            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
187            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
188        };
189
190        let outcome_producer = services.start(OutcomeProducerService::create(
191            config.clone(),
192            upstream_relay.clone(),
193            processor.clone(),
194        )?);
195        let outcome_aggregator =
196            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
197
198        let (global_config, global_config_rx) =
199            GlobalConfigService::new(config.clone(), upstream_relay.clone());
200        let global_config_handle = global_config.handle();
201        // The global config service must start before dependant services are
202        // started. Messages like subscription requests to the global config
203        // service fail if the service is not running.
204        let global_config = services.start(global_config);
205
206        let project_source = ProjectSource::start_in(
207            services,
208            Arc::clone(&config),
209            upstream_relay.clone(),
210            #[cfg(feature = "processing")]
211            redis_clients.clone(),
212        )
213        .await;
214        let project_cache_handle =
215            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);
216
217        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());
218
219        #[cfg(feature = "processing")]
220        let store_pool = create_store_pool(&config)?;
221        #[cfg(feature = "processing")]
222        let store = config
223            .processing_enabled()
224            .then(|| {
225                StoreService::create(
226                    store_pool.clone(),
227                    config.clone(),
228                    global_config_handle.clone(),
229                    outcome_aggregator.clone(),
230                    metric_outcomes.clone(),
231                )
232                .map(|s| services.start(s))
233            })
234            .transpose()?;
235
236        #[cfg(feature = "processing")]
237        let upload = UploadService::new(config.upload(), store.clone())?.map(|s| services.start(s));
238
239        let envelope_buffer = PartitionedEnvelopeBuffer::create(
240            config.spool_partitions(),
241            config.clone(),
242            memory_stat.clone(),
243            global_config_rx.clone(),
244            project_cache_handle.clone(),
245            processor.clone(),
246            outcome_aggregator.clone(),
247            services,
248        );
249
250        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
251            relay_config::RelayMode::Proxy => {
252                services.start_with(
253                    ProxyProcessorService::new(
254                        config.clone(),
255                        project_cache_handle.clone(),
256                        ProxyAddrs {
257                            outcome_aggregator: outcome_aggregator.clone(),
258                            upstream_relay: upstream_relay.clone(),
259                        },
260                    ),
261                    processor_rx,
262                );
263                (None, None, None)
264            }
265            relay_config::RelayMode::Managed => {
266                let processor_pool = create_processor_pool(&config)?;
267
268                let aggregator = RouterService::new(
269                    handle.clone(),
270                    config.default_aggregator_config().clone(),
271                    config.secondary_aggregator_configs().clone(),
272                    Some(processor.clone().recipient()),
273                    project_cache_handle.clone(),
274                );
275                let aggregator_handle = aggregator.handle();
276                let aggregator = services.start(aggregator);
277
278                let cogs = CogsService::new(&config);
279                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
280
281                services.start_with(
282                    EnvelopeProcessorService::new(
283                        processor_pool.clone(),
284                        config.clone(),
285                        global_config_handle,
286                        project_cache_handle.clone(),
287                        cogs,
288                        #[cfg(feature = "processing")]
289                        redis_clients.clone(),
290                        processor::Addrs {
291                            outcome_aggregator: outcome_aggregator.clone(),
292                            upstream_relay: upstream_relay.clone(),
293                            #[cfg(feature = "processing")]
294                            upload,
295                            #[cfg(feature = "processing")]
296                            store_forwarder: store,
297                            aggregator: aggregator.clone(),
298                        },
299                        metric_outcomes.clone(),
300                    ),
301                    processor_rx,
302                );
303
304                let autoscaling = services.start(AutoscalingMetricService::new(
305                    memory_stat.clone(),
306                    envelope_buffer.clone(),
307                    handle.clone(),
308                    processor_pool.clone(),
309                ));
310
311                (
312                    Some(processor_pool),
313                    Some(aggregator_handle),
314                    Some(autoscaling),
315                )
316            }
317        };
318
319        let health_check = services.start(HealthCheckService::new(
320            config.clone(),
321            MemoryChecker::new(memory_stat.clone(), config.clone()),
322            aggregator_handle,
323            upstream_relay.clone(),
324            envelope_buffer.clone(),
325        ));
326
327        services.start(RelayStats::new(
328            config.clone(),
329            handle.clone(),
330            upstream_relay.clone(),
331            #[cfg(feature = "processing")]
332            redis_clients.clone(),
333            processor_pool,
334            #[cfg(feature = "processing")]
335            store_pool,
336        ));
337
338        let relay_cache = services.start(RelayCacheService::new(
339            config.clone(),
340            upstream_relay.clone(),
341        ));
342
343        let registry = Registry {
344            processor,
345            health_check,
346            outcome_producer,
347            outcome_aggregator,
348            relay_cache,
349            global_config,
350            project_cache_handle,
351            upstream_relay,
352            envelope_buffer,
353            autoscaling,
354        };
355
356        let state = StateInner {
357            config: config.clone(),
358            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
359            registry,
360        };
361
362        Ok(ServiceState {
363            inner: Arc::new(state),
364        })
365    }
366
367    /// Returns a reference to the Relay configuration.
368    pub fn config(&self) -> &Config {
369        &self.inner.config
370    }
371
372    /// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
373    /// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
374    /// thresholds set in the [`Config`].
375    pub fn memory_checker(&self) -> &MemoryChecker {
376        &self.inner.memory_checker
377    }
378
379    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
380        self.inner.registry.autoscaling.as_ref()
381    }
382
383    /// Returns the V2 envelope buffer, if present.
384    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
385        self.inner.registry.envelope_buffer.buffer(project_key_pair)
386    }
387
388    /// Returns a [`ProjectCacheHandle`].
389    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
390        &self.inner.registry.project_cache_handle
391    }
392
393    /// Returns the address of the [`RelayCache`] service.
394    pub fn relay_cache(&self) -> &Addr<RelayCache> {
395        &self.inner.registry.relay_cache
396    }
397
398    /// Returns the address of the [`HealthCheck`] service.
399    pub fn health_check(&self) -> &Addr<HealthCheck> {
400        &self.inner.registry.health_check
401    }
402
403    /// Returns the address of the [`OutcomeProducer`] service.
404    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
405        &self.inner.registry.outcome_producer
406    }
407
408    /// Returns the address of the [`OutcomeProducer`] service.
409    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
410        &self.inner.registry.upstream_relay
411    }
412
413    /// Returns the address of the [`OutcomeProducer`] service.
414    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
415        &self.inner.registry.processor
416    }
417
418    /// Returns the address of the [`GlobalConfigService`] service.
419    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
420        &self.inner.registry.global_config
421    }
422
423    /// Returns the address of the [`OutcomeProducer`] service.
424    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
425        &self.inner.registry.outcome_aggregator
426    }
427}
428
/// Builds the set of Redis clients described by `configs`.
///
/// For [`Unified`](RedisConfigsRef::Unified) configs a single client is created and shared (by
/// cloning it) between project configs, cardinality, and quotas.
///
/// For [`Individual`](RedisConfigsRef::Individual) configs every use case gets its own,
/// separately created client.
///
/// # Errors
///
/// Returns the first [`RedisError`] raised while creating a client.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    let clients = match configs {
        RedisConfigsRef::Unified(unified) => {
            let shared = create_async_redis_client("unified", &unified)?;

            RedisClients {
                project_configs: shared.clone(),
                cardinality: shared.clone(),
                quotas: shared,
            }
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => RedisClients {
            // Struct fields are evaluated top to bottom, so the clients are created in the
            // order project configs, cardinality, quotas.
            project_configs: create_async_redis_client("projectconfig", &project_configs)?,
            cardinality: create_async_redis_client("cardinality", &cardinality)?,
            quotas: create_async_redis_client("quotas", &quotas)?,
        },
    };

    Ok(clients)
}
471
/// Creates a single [`AsyncRedisClient`] called `name` from `config`.
///
/// A [`Single`](RedisConfigRef::Single) config yields a client for one server, a
/// [`Cluster`](RedisConfigRef::Cluster) config yields a cluster client for the listed nodes.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
487
/// Loads all [`RedisScripts`] through the cardinality client and then the quotas client.
///
/// # Errors
///
/// Stops at and returns the first [`RedisError`] encountered.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    for client in [&redis_clients.cardinality, &redis_clients.quotas] {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}
501
/// Loads every script in `scripts` using a connection obtained from `client`.
///
/// `scripts` is a slice so that this function does not depend on how many scripts exist;
/// a reference to a fixed-size array coerces to it at the call site.
///
/// # Errors
///
/// Returns a [`RedisError`] if no connection can be obtained or if loading a script fails.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        // We load on all instances without checking if the script is already in cache because of a
        // limitation in the connection implementation.
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
521
522impl FromRequestParts<Self> for ServiceState {
523    type Rejection = Infallible;
524
525    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
526        Ok(state.clone())
527    }
528}