// relay_server/service.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
12use crate::services::health_check::{HealthCheck, HealthCheckService};
13use crate::services::metrics::RouterService;
14#[cfg(feature = "processing")]
15use crate::services::objectstore::Objectstore;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::ObjectstoreService;
18use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
19use crate::services::outcome_aggregator::OutcomeAggregator;
20use crate::services::processor::{
21    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
22};
23use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
24use crate::services::projects::source::ProjectSource;
25use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
26use crate::services::relays::{RelayCache, RelayCacheService};
27use crate::services::stats::RelayStats;
28#[cfg(feature = "processing")]
29use crate::services::store::{StoreService, StoreServicePool};
30use crate::services::upload::{self, Upload};
31use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
32use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
33#[cfg(feature = "processing")]
34use anyhow::Context;
35use anyhow::Result;
36use axum::extract::FromRequestParts;
37use axum::http::request::Parts;
38use relay_cogs::Cogs;
39use relay_config::Config;
40#[cfg(feature = "processing")]
41use relay_config::{RedisConfigRef, RedisConfigsRef};
42#[cfg(feature = "processing")]
43use relay_redis::AsyncRedisClient;
44#[cfg(feature = "processing")]
45use relay_redis::redis::Script;
46#[cfg(feature = "processing")]
47use relay_redis::{RedisClients, RedisError, RedisScripts};
48#[cfg(feature = "processing")]
49use relay_system::ConcurrentService;
50use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
51
/// Indicates the type of failure of the server.
///
/// Returned from service startup when one of the infrastructure dependencies
/// cannot be initialized.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// GeoIp construction failed.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// Initializing the Kafka producer failed.
    ///
    /// Carries a human-readable description of the underlying producer error.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// Initializing the Redis client failed.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
69
/// Addresses and handles of the long-running services started by [`ServiceState::start`].
///
/// Cloning the registry is cheap: it only copies service addresses and handles.
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the [`HealthCheck`] service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the [`OutcomeProducer`] service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the outcome aggregator, accepting [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the processor: the envelope processor in managed mode, or the
    /// proxy processor in proxy mode (see [`ServiceState::start`]).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the [`RelayCache`] service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the [`GlobalConfigManager`] service.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the [`UpstreamRelay`] service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// The partitioned envelope spool buffer.
    pub envelope_buffer: PartitionedEnvelopeBuffer,
    /// Handle to the project cache.
    pub project_cache_handle: ProjectCacheHandle,
    /// Address of the autoscaling metric service; only started in managed mode.
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
    /// Address of the objectstore service, if one was configured and started.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the [`Upload`] service.
    pub upload: Addr<Upload>,
}
86
87/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
88pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
89    relay_system::Runtime::builder(name)
90        .worker_threads(threads)
91        // Relay uses `spawn_blocking` only for Redis connections within the project
92        // cache, those should never exceed 100 concurrent connections
93        // (limited by connection pool).
94        //
95        // Relay also does not use other blocking operations from Tokio which require
96        // this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
97        //
98        // We limit the maximum amount of threads here, we've seen that Tokio
99        // expands this pool very very aggressively and basically never shrinks it
100        // which leads to a massive resource waste.
101        .max_blocking_threads(150)
102        // We also lower down the default (10s) keep alive timeout for blocking
103        // threads to encourage the runtime to not keep too many idle blocking threads
104        // around.
105        .thread_keep_alive(Duration::from_secs(1))
106        .build()
107}
108
109fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
110    // Adjust thread count for small cpu counts to not have too many idle cores
111    // and distribute workload better.
112    let thread_count = match config.cpu_concurrency() {
113        conc @ 0..=2 => conc.max(1),
114        conc @ 3..=4 => conc - 1,
115        conc => conc - 2,
116    };
117    relay_log::info!("starting {thread_count} envelope processing workers");
118
119    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
120        .num_threads(thread_count)
121        .max_concurrency(config.pool_concurrency())
122        .thread_kind(ThreadKind::Worker)
123        .build()?;
124
125    Ok(pool)
126}
127
/// Builds the thread pool used by the store service.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // One store worker per 12 processor-pool threads. The ratio was found
    // empirically and may need adjustments in the future.
    //
    // Ideally the store becomes single threaded again once all heavy work
    // (de- and re-serialization) has moved into the processor.
    let workers = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {workers} store workers");

    let runtime = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", runtime)
        .num_threads(workers)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
145
/// Shared, immutable interior of [`ServiceState`], reference-counted via `Arc`.
#[derive(Debug)]
struct StateInner {
    /// The Relay configuration.
    config: Arc<Config>,
    /// Config-aware wrapper around the shared memory statistics.
    memory_checker: MemoryChecker,
    /// Addresses and handles of all started services.
    registry: Registry,
}
152
/// Server state.
///
/// A cheaply cloneable handle around the shared [`StateInner`]; all clones
/// refer to the same configuration and service registry.
#[derive(Clone, Debug)]
pub struct ServiceState {
    inner: Arc<StateInner>,
}
158
impl ServiceState {
    /// Starts all services and returns addresses to all of them.
    ///
    /// Startup order matters: the global config service must run before any
    /// service that subscribes to it, and the processor address is created
    /// up-front so it can be injected into services started before the
    /// processor itself.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when Redis is configured AND
        // processing is enabled.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // If we have Redis configured, we want to initialize all the scripts by loading them in
        // the scripts cache if not present. Our custom ConnectionLike implementation relies on this
        // initialization to work properly since it assumes that scripts are loaded across all Redis
        // instances.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // We create an instance of `MemoryStat` which can be supplied composed with any arbitrary
        // configuration object down the line.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services. The actual service behind this address is started further
        // down, depending on the Relay mode.
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        // The global config service must start before dependant services are
        // started. Messages like subscription requests to the global config
        // service fail if the service is not running.
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        // The store (Kafka forwarder) only runs when processing is enabled.
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // The objectstore service is wrapped in a `ConcurrentService` to bound
        // its backlog and its number of concurrent requests.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // Start the processing pipeline behind the `processor` address created
        // above: a lightweight proxy processor in proxy mode, or the full
        // envelope processor plus aggregation, COGS and autoscaling in managed
        // mode.
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle,
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns a reference to the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
    /// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
    /// thresholds set in the [`Config`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metric service, if it was started
    /// (managed mode only).
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the envelope buffer partition responsible for the given project key pair.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a [`ProjectCacheHandle`].
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the [`RelayCache`] service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the [`HealthCheck`] service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the [`OutcomeProducer`] service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the [`UpstreamRelay`] service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the [`EnvelopeProcessor`] service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the [`GlobalConfigService`] service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the [`OutcomeAggregator`] service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    #[cfg(feature = "processing")]
    /// Returns the address of the [`Objectstore`] service, if it was started.
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the [`Upload`] service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }
}
462
/// Creates Redis clients from the given `configs`.
///
/// For a [`Unified`](RedisConfigsRef::Unified) configuration, a single client
/// is built and shared (cloned) between project configs, cardinality, and
/// quotas, so all three really use the same client.
///
/// For an [`Individual`](RedisConfigsRef::Individual) configuration, a
/// dedicated client is built for each use case.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    match configs {
        RedisConfigsRef::Unified(unified) => {
            let shared = create_async_redis_client("unified", &unified)?;

            Ok(RedisClients {
                project_configs: shared.clone(),
                cardinality: shared.clone(),
                quotas: shared,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            cardinality,
            quotas,
        } => Ok(RedisClients {
            project_configs: create_async_redis_client("projectconfig", &project_configs)?,
            cardinality: create_async_redis_client("cardinality", &cardinality)?,
            quotas: create_async_redis_client("quotas", &quotas)?,
        }),
    }
}
505
/// Builds a single named [`AsyncRedisClient`] from one Redis configuration,
/// connecting either to a single server or to a cluster.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
521
/// Preloads all Relay Redis scripts on the clients that execute them.
///
/// Scripts are loaded for the cardinality and quota clients; the
/// project-config client is not included here.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    initialize_redis_scripts(&redis_clients.cardinality, &scripts).await?;
    initialize_redis_scripts(&redis_clients.quotas, &scripts).await?;

    Ok(())
}
535
/// Loads the given scripts into the script cache of a single Redis client.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script; 3],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    // A limitation in the connection implementation prevents checking whether a
    // script is already cached, so every script is loaded unconditionally on
    // all instances.
    for script in scripts.iter() {
        let prepared = script.prepare_invoke();
        prepared
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
555
/// Allows axum handlers to extract the shared [`ServiceState`] from the router state.
impl FromRequestParts<Self> for ServiceState {
    type Rejection = Infallible;

    // Extraction is a cheap clone of the `Arc`-backed state and can never fail.
    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        Ok(state.clone())
    }
}