relay_server/service.rs

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::MetricOutcomes;
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
#[cfg(feature = "processing")]
use crate::services::global_rate_limits::GlobalRateLimitsService;
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

/// Indicates the type of failure of the server.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// GeoIp construction failed.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// Initializing the Kafka producer failed.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// Initializing the Redis client failed.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

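/// Addresses and handles of the services started by Relay.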
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Addr<AutoscalingMetrics>,
}

/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
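///
/// A minimal sketch of a call site (illustrative only; in Relay the name and
/// thread count come from elsewhere, e.g. [`Config`]):
///
/// ```ignore
/// let runtime = create_runtime("processor", config.cpu_concurrency());
/// ```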
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        // Relay uses `spawn_blocking` only for Redis connections within the project
        // cache; those should never exceed 100 concurrent connections
        // (limited by the connection pool).
        //
        // Relay also does not use other blocking operations from Tokio which require
        // this pool: there is no usage of `tokio::fs` or `tokio::io::{Stdin, Stdout, Stderr}`.
        //
        // We limit the maximum number of threads here; we've seen that Tokio
        // expands this pool very aggressively and essentially never shrinks it,
        // which leads to massive resource waste.
        .max_blocking_threads(150)
        // We also lower the default (10s) keep-alive timeout for blocking
        // threads to encourage the runtime not to keep too many idle blocking
        // threads around.
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}

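/// Creates the worker thread pool for the [`EnvelopeProcessorService`].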
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    // Adjust the thread count for small CPU counts to not have too many idle cores
    // and to distribute the workload better.
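    //
    // For example (illustrative): 2 cores -> 2 workers, 4 cores -> 3 workers,
    // 16 cores -> 14 workers.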
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

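/// Creates the worker thread pool for the [`StoreService`].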
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // Spawn a store worker for every 12 threads in the processor pool.
    // This ratio was found empirically and may need adjustment in the future.
    //
    // Ideally, the store will become single-threaded again in the future, once we
    // move all the heavy processing (de- and re-serialization) into the processor.
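    //
    // For example (illustrative): 1 to 12 cores -> 1 store worker,
    // 13 to 24 cores -> 2 store workers.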
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

/// Server state.
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
    /// Starts all services and returns addresses to all of them.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // If Redis is configured, initialize all scripts by loading them into the
        // script cache if they are not already present. Our custom ConnectionLike
        // implementation relies on this initialization to work properly, since it
        // assumes that scripts are loaded across all Redis instances.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // We create an instance of `MemoryStat` which can be composed with any
        // arbitrary configuration object down the line.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services.
        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        // The global config service must start before dependent services are
        // started. Messages like subscription requests to the global config
        // service fail if the service is not running.
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let aggregator = RouterService::new(
            handle.clone(),
            config.default_aggregator_config().clone(),
            config.secondary_aggregator_configs().clone(),
            Some(processor.clone().recipient()),
            project_cache_handle.clone(),
        );
        let aggregator_handle = aggregator.handle();
        let aggregator = services.start(aggregator);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        let cogs = CogsService::new(&config);
        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

        let processor_pool = create_processor_pool(&config)?;
        services.start_with(
            EnvelopeProcessorService::new(
                processor_pool.clone(),
                config.clone(),
                global_config_handle,
                project_cache_handle.clone(),
                cogs,
                #[cfg(feature = "processing")]
                redis_clients.clone(),
                processor::Addrs {
                    outcome_aggregator: outcome_aggregator.clone(),
                    upstream_relay: upstream_relay.clone(),
                    #[cfg(feature = "processing")]
                    store_forwarder: store.clone(),
                    aggregator: aggregator.clone(),
                    #[cfg(feature = "processing")]
                    global_rate_limits,
                },
                metric_outcomes.clone(),
            ),
            processor_rx,
        );

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        let autoscaling = services.start(AutoscalingMetricService::new(
            memory_stat.clone(),
            envelope_buffer.clone(),
            handle.clone(),
            processor_pool.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns a reference to the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns a reference to the [`MemoryChecker`], a [`Config`]-aware wrapper around
    /// [`MemoryStat`] that provides utility methods to determine whether memory usage
    /// is above the thresholds set in the [`Config`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

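    /// Returns the address of the [`AutoscalingMetrics`] service.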
    pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
        &self.inner.registry.autoscaling
    }

    /// Returns the [`ObservableEnvelopeBuffer`] for the given [`ProjectKeyPair`].
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a [`ProjectCacheHandle`].
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the [`RelayCache`] service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the [`HealthCheck`] service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the [`OutcomeProducer`] service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the [`UpstreamRelay`] service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the [`EnvelopeProcessor`] service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the [`GlobalConfigService`] service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the [`OutcomeAggregator`] service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

/// Creates Redis clients from the given `configs`.
///
/// If `configs` is [`Unified`](RedisConfigsRef::Unified), one client is created and then cloned
/// for project configs, cardinality, and quotas, meaning they all share the same client.
///
/// If it is [`Individual`](RedisConfigsRef::Individual), a separate client is created
/// for each use case.
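///
/// A minimal sketch of the unified case (config construction elided; illustrative only):
///
/// ```ignore
/// let clients = create_redis_clients(RedisConfigsRef::Unified(unified))?;
/// // `clients.project_configs`, `clients.cardinality`, and `clients.quotas`
/// // are all clones of the same underlying client.
/// ```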
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(&unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs = create_async_redis_client(&project_configs)?;
            let cardinality = create_async_redis_client(&cardinality)?;
            let quotas = create_async_redis_client(&quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}

#[cfg(feature = "processing")]
fn create_async_redis_client(config: &RedisConfigRef<'_>) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => AsyncRedisClient::single(server, options),
    }
}

#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        // We load the scripts on all instances without checking whether they are
        // already cached, because of a limitation in the connection implementation.
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

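// Allows `ServiceState` to be used as an axum extractor wherever the router's
// state type is `ServiceState`: extraction is infallible and simply clones the
// `Arc`-backed state.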
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}