relay_server/
service.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::{MetricOutcomes, MetricStats};
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
12#[cfg(feature = "processing")]
13use crate::services::global_rate_limits::GlobalRateLimitsService;
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
17use crate::services::outcome_aggregator::OutcomeAggregator;
18use crate::services::processor::{
19    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
20};
21use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
22use crate::services::projects::source::ProjectSource;
23use crate::services::relays::{RelayCache, RelayCacheService};
24use crate::services::stats::RelayStats;
25#[cfg(feature = "processing")]
26use crate::services::store::{StoreService, StoreServicePool};
27use crate::services::test_store::{TestStore, TestStoreService};
28use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
29use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
30#[cfg(feature = "processing")]
31use anyhow::Context;
32use anyhow::Result;
33use axum::extract::FromRequestParts;
34use axum::http::request::Parts;
35use relay_cogs::Cogs;
36use relay_config::Config;
37#[cfg(feature = "processing")]
38use relay_config::{RedisConfigRef, RedisConfigsRef};
39#[cfg(feature = "processing")]
40use relay_redis::AsyncRedisClient;
41#[cfg(feature = "processing")]
42use relay_redis::redis::Script;
43#[cfg(feature = "processing")]
44use relay_redis::{RedisClients, RedisError, RedisScripts};
45use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
46
/// Indicates the type of failure of the server.
///
/// The `Kafka` and `Redis` variants only exist when the `processing` feature is enabled.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// GeoIp construction failed.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// Initializing the Kafka producer failed.
    ///
    /// The contained string is appended to the rendered error message.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// Initializing the Redis client failed.
    ///
    /// During startup this is attached as context both when creating the Redis clients and
    /// when loading the Redis scripts.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
64
/// Addresses and handles of the services started by [`ServiceState::start`].
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the [`HealthCheck`] service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the [`OutcomeProducer`] service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the [`OutcomeAggregator`] service, which accepts [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the [`EnvelopeProcessor`] service.
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the [`TestStore`] service.
    pub test_store: Addr<TestStore>,
    /// Address of the [`RelayCache`] service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the [`GlobalConfigService`].
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the [`UpstreamRelay`] service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// The partitioned envelope buffer.
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    /// Handle to the project cache.
    pub project_cache_handle: ProjectCacheHandle,
    /// Address of the [`AutoscalingMetrics`] service.
    pub autoscaling: Addr<AutoscalingMetrics>,
}
79
80/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
81pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
82    relay_system::Runtime::builder(name)
83        .worker_threads(threads)
84        // Relay uses `spawn_blocking` only for Redis connections within the project
85        // cache, those should never exceed 100 concurrent connections
86        // (limited by connection pool).
87        //
88        // Relay also does not use other blocking operations from Tokio which require
89        // this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
90        //
91        // We limit the maximum amount of threads here, we've seen that Tokio
92        // expands this pool very very aggressively and basically never shrinks it
93        // which leads to a massive resource waste.
94        .max_blocking_threads(150)
95        // We also lower down the default (10s) keep alive timeout for blocking
96        // threads to encourage the runtime to not keep too many idle blocking threads
97        // around.
98        .thread_keep_alive(Duration::from_secs(1))
99        .build()
100}
101
102fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
103    // Adjust thread count for small cpu counts to not have too many idle cores
104    // and distribute workload better.
105    let thread_count = match config.cpu_concurrency() {
106        conc @ 0..=2 => conc.max(1),
107        conc @ 3..=4 => conc - 1,
108        conc => conc - 2,
109    };
110    relay_log::info!("starting {thread_count} envelope processing workers");
111
112    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
113        .num_threads(thread_count)
114        .max_concurrency(config.pool_concurrency())
115        .thread_kind(ThreadKind::Worker)
116        .build()?;
117
118    Ok(pool)
119}
120
/// Creates the thread pool used by the store service.
///
/// One store worker is spawned for every 12 units of [`Config::cpu_concurrency`], rounded
/// up, and never fewer than one. The lower bound matters when the configured concurrency
/// is `0`: the processor pool still gets a worker in that case, so the store pool must not
/// end up empty either.
///
/// Must be called from within a Tokio runtime, since the pool is bound to the current
/// runtime handle.
///
/// # Errors
///
/// Returns an error if the thread pool cannot be built.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // Spawn a store worker for every 12 threads in the processor pool.
    // This ratio was found empirically and may need adjustments in the future.
    //
    // Ideally in the future the store will be single threaded again, after we move
    // all the heavy processing (de- and re-serialization) into the processor.
    //
    // `div_ceil` returns 0 for a concurrency of 0, hence the explicit lower bound.
    let thread_count = config.cpu_concurrency().div_ceil(12).max(1);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
138
/// Data behind the reference-counted pointer held by [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    /// The Relay configuration.
    config: Arc<Config>,
    /// Memory checker built from the memory stat and the configuration.
    memory_checker: MemoryChecker,
    /// Addresses and handles of all started services.
    registry: Registry,
}
145
/// Server state.
///
/// Cloning only clones the inner [`Arc`], so all clones share the same configuration,
/// memory checker and service registry.
#[derive(Clone, Debug)]
pub struct ServiceState {
    /// Shared state, created once in [`ServiceState::start`].
    inner: Arc<StateInner>,
}
151
impl ServiceState {
    /// Starts all services and returns addresses to all of them.
    ///
    /// Services are started on `services`. The `handle` is passed on to the services that
    /// take a runtime handle (metrics router, autoscaling metrics and Relay stats).
    ///
    /// # Errors
    ///
    /// Returns an error if the outcome producer, the store service, or one of the thread
    /// pools cannot be created. With the `processing` feature, it also fails if the Redis
    /// clients cannot be created or the Redis scripts cannot be loaded.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
        let test_store = services.start(TestStoreService::new(config.clone()));

        // Redis clients are only created if Redis is configured and processing is enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // If we have Redis configured, we want to initialize all the scripts by loading them in
        // the scripts cache if not present. Our custom ConnectionLike implementation relies on this
        // initialization to work properly since it assumes that scripts are loaded across all Redis
        // instances.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // We create a single instance of `MemoryStat` which is shared by the services below and
        // combined with the configuration wherever a `MemoryChecker` is needed.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services. The processor itself is started further below on `processor_rx`.
        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        // The global config service must start before dependent services are
        // started. Messages like subscription requests to the global config
        // service fail if the service is not running.
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let aggregator = RouterService::new(
            handle.clone(),
            config.default_aggregator_config().clone(),
            config.secondary_aggregator_configs().clone(),
            Some(processor.clone().recipient()),
            project_cache_handle.clone(),
        );
        // The handle has to be taken before the service is moved into `start`.
        let aggregator_handle = aggregator.handle();
        let aggregator = services.start(aggregator);

        let metric_stats = MetricStats::new(
            config.clone(),
            global_config_handle.clone(),
            aggregator.clone(),
        );

        let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        // The store service is only created and started if processing is enabled.
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        let cogs = CogsService::new(&config);
        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

        // Global rate limits use the quotas Redis client and are only available with Redis.
        #[cfg(feature = "processing")]
        let global_rate_limits = redis_clients
            .as_ref()
            .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

        let processor_pool = create_processor_pool(&config)?;
        // Start the processor on the receiver that belongs to the `processor` address created
        // above.
        services.start_with(
            EnvelopeProcessorService::new(
                processor_pool.clone(),
                config.clone(),
                global_config_handle,
                project_cache_handle.clone(),
                cogs,
                #[cfg(feature = "processing")]
                redis_clients.clone(),
                processor::Addrs {
                    outcome_aggregator: outcome_aggregator.clone(),
                    upstream_relay: upstream_relay.clone(),
                    test_store: test_store.clone(),
                    #[cfg(feature = "processing")]
                    store_forwarder: store.clone(),
                    aggregator: aggregator.clone(),
                    #[cfg(feature = "processing")]
                    global_rate_limits,
                },
                metric_outcomes.clone(),
            ),
            processor_rx,
        );

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            test_store.clone(),
            services,
        );

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        let autoscaling = services.start(AutoscalingMetricService::new(
            memory_stat.clone(),
            envelope_buffer.clone(),
            handle.clone(),
            processor_pool.clone(),
        ));

        // The address of the stats service is not kept in the registry.
        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            test_store,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns a reference to the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
    /// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
    /// thresholds set in the [`Config`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the [`AutoscalingMetrics`] service.
    pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
        &self.inner.registry.autoscaling
    }

    /// Returns the [`ObservableEnvelopeBuffer`] which the partitioned envelope buffer selects
    /// for the given `project_key_pair`.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a [`ProjectCacheHandle`].
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the [`RelayCache`] service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the [`HealthCheck`] service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the [`OutcomeProducer`] service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the [`TestStore`] service.
    pub fn test_store(&self) -> &Addr<TestStore> {
        &self.inner.registry.test_store
    }

    /// Returns the address of the [`UpstreamRelay`] service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the [`EnvelopeProcessor`] service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the [`GlobalConfigService`] service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the [`OutcomeAggregator`] service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }
}
417
/// Builds the set of Redis clients described by `configs`.
///
/// For a [`Unified`](RedisConfigsRef::Unified) configuration a single client is created and
/// its clones are used for project configs, cardinality, and quotas, so all three share the
/// same client.
///
/// For an [`Individual`](RedisConfigsRef::Individual) configuration a dedicated client is
/// created per use case, in the order project configs, cardinality, quotas.
///
/// # Errors
///
/// Returns the first error raised while creating a client.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    let (project_configs, cardinality, quotas) = match configs {
        RedisConfigsRef::Unified(unified) => {
            let shared = create_async_redis_client(&unified)?;
            (shared.clone(), shared.clone(), shared)
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => (
            create_async_redis_client(&project_configs)?,
            create_async_redis_client(&cardinality)?,
            create_async_redis_client(&quotas)?,
        ),
    };

    Ok(RedisClients {
        project_configs,
        cardinality,
        quotas,
    })
}
454
/// Creates one [`AsyncRedisClient`] from a single Redis `config`.
///
/// A [`Single`](RedisConfigRef::Single) configuration yields a client for its server, a
/// [`Cluster`](RedisConfigRef::Cluster) configuration yields a cluster client over its nodes.
#[cfg(feature = "processing")]
fn create_async_redis_client(config: &RedisConfigRef<'_>) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => AsyncRedisClient::single(server, options),
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(nodes, options)
        }
    }
}
465
/// Loads all Redis scripts on the cardinality client and then on the quotas client.
///
/// The project configs client is not touched.
///
/// # Errors
///
/// Stops at and returns the first error raised while loading scripts.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let all_scripts = RedisScripts::all();

    initialize_redis_scripts(&redis_clients.cardinality, &all_scripts).await?;
    initialize_redis_scripts(&redis_clients.quotas, &all_scripts).await?;

    Ok(())
}
479
/// Loads each of the given `scripts` through one connection obtained from `client`.
///
/// # Errors
///
/// Returns an error if no connection can be obtained, or the first error raised while
/// loading a script, wrapped in [`RedisError::Redis`].
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    for script in scripts.iter() {
        // Scripts are always loaded, without first checking whether they are already cached,
        // because of a limitation in the connection implementation.
        let invocation = script.prepare_invoke();
        invocation
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
499
500impl FromRequestParts<Self> for ServiceState {
501    type Rejection = Infallible;
502
503    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
504        Ok(state.clone())
505    }
506}