relay_server/service.rs

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::MetricOutcomes;
use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
use crate::services::buffer::{
    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
#[cfg(feature = "processing")]
use crate::services::global_rate_limits::GlobalRateLimitsService;
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{
    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
#[cfg(feature = "processing")]
use crate::services::upload::UploadService;
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
use relay_config::{RedisConfigRef, RedisConfigsRef};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{RedisClients, RedisError, RedisScripts};
use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};

/// Indicates the type of failure of the server.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// GeoIp construction failed.
    #[error("could not load the GeoIP database")]
    GeoIp,

    /// Initializing the Kafka producer failed.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// Initializing the Redis client failed.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}

/// Addresses and handles of the long-running services started by [`ServiceState::start`].
#[derive(Clone, Debug)]
pub struct Registry {
    pub health_check: Addr<HealthCheck>,
    pub outcome_producer: Addr<OutcomeProducer>,
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub processor: Addr<EnvelopeProcessor>,
    pub relay_cache: Addr<RelayCache>,
    pub global_config: Addr<GlobalConfigManager>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    pub project_cache_handle: ProjectCacheHandle,
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
}

/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
    relay_system::Runtime::builder(name)
        .worker_threads(threads)
        // Relay uses `spawn_blocking` only for Redis connections within the project
        // cache; those should never exceed 100 concurrent connections
        // (limited by the connection pool).
        //
        // Relay also does not use other blocking Tokio operations that require this
        // pool, i.e. no usage of `tokio::fs` or `tokio::io::{Stdin, Stdout, Stderr}`.
        //
        // We limit the maximum number of blocking threads here because we have seen
        // Tokio expand this pool very aggressively and essentially never shrink it,
        // which leads to massive resource waste.
        .max_blocking_threads(150)
        // We also lower the default (10s) keep-alive timeout for blocking threads
        // to encourage the runtime not to keep too many idle blocking threads around.
        .thread_keep_alive(Duration::from_secs(1))
        .build()
}
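
// A minimal usage sketch (hypothetical caller, for illustration only; the real entry
// point lives outside this module):
//
//     let runtime = create_runtime("main", config.cpu_concurrency());
//     // ... hand the runtime to the service system and start `ServiceState` on it.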

fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
    // Adjust the thread count for small CPU counts, so that we don't end up with too
    // many idle cores and distribute the workload better.
    let thread_count = match config.cpu_concurrency() {
        conc @ 0..=2 => conc.max(1),
        conc @ 3..=4 => conc - 1,
        conc => conc - 2,
    };
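    // For reference, the mapping above yields (values derived from the match arms):
    //     cores:   1  2  3  4  6  8  16  32
    //     workers: 1  2  2  3  4  6  14  30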
    relay_log::info!("starting {thread_count} envelope processing workers");

    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .thread_kind(ThreadKind::Worker)
        .build()?;

    Ok(pool)
}

#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // Spawn a store worker for every 12 threads in the processor pool.
    // This ratio was found empirically and may need adjustments in the future.
    //
    // Ideally, in the future, the store will be single-threaded again, after we move
    // all the heavy processing (de- and re-serialization) into the processor.
    let thread_count = config.cpu_concurrency().div_ceil(12);
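    // For reference: `div_ceil(12)` yields one store worker for 1-12 cores, two for
    // 13-24 cores, and so on.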
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}

#[derive(Debug)]
struct StateInner {
    config: Arc<Config>,
    memory_checker: MemoryChecker,
    registry: Registry,
}

/// Server state.
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}

impl ServiceState {
    /// Starts all services and returns addresses to all of them.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // If Redis is configured, initialize all scripts by loading them into the script
        // cache if they are not already present. Our custom ConnectionLike implementation
        // relies on this initialization, since it assumes that the scripts are loaded on
        // all Redis instances.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // We create an instance of `MemoryStat` which can be composed with any arbitrary
        // configuration object down the line.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        // The global config service must start before dependent services are
        // started. Messages like subscription requests to the global config
        // service fail if the service is not running.
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        #[cfg(feature = "processing")]
        let upload = UploadService::new(config.upload(), store.clone())?.map(|s| services.start(s));

        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

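        // The processor setup below depends on the Relay mode: in proxy mode, only the
        // `ProxyProcessorService` consumes `processor_rx`, and there is no processor
        // thread pool, metrics aggregator, or autoscaling service; in managed mode, the
        // `EnvelopeProcessorService` runs on a dedicated thread pool and is accompanied
        // by the metrics router, COGS recording, and autoscaling metrics.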
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            upload,
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                            #[cfg(feature = "processing")]
                            global_rate_limits,
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns a reference to the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns a reference to the [`MemoryChecker`], a [`Config`]-aware wrapper around
    /// [`MemoryStat`] that provides utility methods to determine whether memory usage
    /// is above the thresholds set in the [`Config`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the [`AutoscalingMetrics`] service, if it is running.
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer for the given project key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a [`ProjectCacheHandle`].
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the [`RelayCache`] service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the [`HealthCheck`] service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the [`OutcomeProducer`] service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the [`UpstreamRelay`] service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the [`EnvelopeProcessor`] service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the [`GlobalConfigService`] service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the [`OutcomeAggregator`] service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}

/// Creates Redis clients from the given `configs`.
///
/// If `configs` is [`Unified`](RedisConfigsRef::Unified), one client is created and then cloned
/// for project configs, cardinality, and quotas, meaning they all share the same client.
///
/// If it is [`Individual`](RedisConfigsRef::Individual), a separate client
/// is created for each use case.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const CARDINALITY_REDIS_CLIENT: &str = "cardinality";
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                cardinality: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let cardinality = create_async_redis_client(CARDINALITY_REDIS_CLIENT, &cardinality)?;
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                cardinality,
                quotas,
            })
        }
    }
}
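
// Sketch of the resulting client layout (derived from the match arms above):
//
//   Unified    { .. } => project_configs, cardinality, and quotas all clone one client,
//                        so all three use the same client.
//   Individual { .. } => "projectconfig", "cardinality", and "quotas" each get their own
//                        independently configured client.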

#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => AsyncRedisClient::cluster(name, cluster_nodes.iter().map(|s| s.as_str()), options),
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
    }
}

#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    let clients = [&redis_clients.cardinality, &redis_clients.quotas];
    for client in clients {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}

#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut connection = client.get_connection().await?;

    for script in scripts {
        // We load the script on all instances without checking whether it is already in
        // the cache, because of a limitation in the connection implementation.
        script
            .prepare_invoke()
            .load_async(&mut connection)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}

impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}
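
// With the `FromRequestParts` impl above, any axum handler on a router whose state is
// `ServiceState` can take the state as an extractor argument. A minimal sketch (the
// handler and route below are hypothetical, for illustration only):
//
//     async fn healthcheck(state: ServiceState) -> &'static str {
//         let _ = state.health_check();
//         "ok"
//     }
//
//     let app = axum::Router::new()
//         .route("/health", axum::routing::get(healthcheck))
//         .with_state(service_state);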