// relay_server/service.rs
1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{
12    GlobalConfigHandle, GlobalConfigManager, GlobalConfigService,
13};
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::Objectstore;
18#[cfg(feature = "processing")]
19use crate::services::objectstore::ObjectstoreService;
20use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
21use crate::services::outcome_aggregator::OutcomeAggregator;
22use crate::services::processor::{
23    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
24};
25use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
26use crate::services::projects::source::ProjectSource;
27use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
28use crate::services::relays::{RelayCache, RelayCacheService};
29use crate::services::stats::RelayStats;
30#[cfg(feature = "processing")]
31use crate::services::store::{StoreService, StoreServicePool};
32use crate::services::upload::{self, Upload};
33use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
34use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
35#[cfg(feature = "processing")]
36use anyhow::Context;
37use anyhow::Result;
38use axum::extract::FromRequestParts;
39use axum::http::request::Parts;
40use relay_cogs::Cogs;
41use relay_config::Config;
42#[cfg(feature = "processing")]
43use relay_config::{RedisConfigRef, RedisConfigsRef};
44#[cfg(feature = "processing")]
45use relay_redis::AsyncRedisClient;
46#[cfg(feature = "processing")]
47use relay_redis::redis::Script;
48#[cfg(feature = "processing")]
49use relay_redis::{RedisClients, RedisError, RedisScripts};
50#[cfg(feature = "processing")]
51use relay_system::ConcurrentService;
52use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
53
/// Indicates the type of failure of the server.
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    /// GeoIp construction failed.
    #[error("could not load the Geoip Db")]
    GeoIp,

    /// Initializing the Kafka producer failed.
    ///
    /// Only present when the `processing` feature is enabled.
    #[cfg(feature = "processing")]
    #[error("could not initialize kafka producer: {0}")]
    Kafka(String),

    /// Initializing the Redis client failed.
    ///
    /// Only present when the `processing` feature is enabled.
    #[cfg(feature = "processing")]
    #[error("could not initialize redis client during startup")]
    Redis,
}
71
/// Addresses and handles of the long-running services started in
/// [`ServiceState::start`].
#[derive(Clone, Debug)]
pub struct Registry {
    /// Address of the [`HealthCheck`] service.
    pub health_check: Addr<HealthCheck>,
    /// Address of the [`OutcomeProducer`] service.
    pub outcome_producer: Addr<OutcomeProducer>,
    /// Address of the outcome aggregator, accepting [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the envelope processor (proxy or managed variant, depending on
    /// the configured relay mode).
    pub processor: Addr<EnvelopeProcessor>,
    /// Address of the [`RelayCache`] service.
    pub relay_cache: Addr<RelayCache>,
    /// Address of the [`GlobalConfigManager`] service.
    pub global_config: Addr<GlobalConfigManager>,
    /// Address of the [`UpstreamRelay`] service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Partitioned envelope spool buffer shared across services.
    pub envelope_buffer: Arc<PartitionedEnvelopeBuffer>,
    /// Handle to the project cache.
    pub project_cache_handle: ProjectCacheHandle,
    /// Address of the autoscaling metric service; `None` unless started
    /// (only started in managed mode).
    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
    /// Address of the [`Objectstore`] service, if one was configured.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the [`Upload`] service.
    pub upload: Addr<Upload>,
    /// Handle to read the current global config.
    pub global_config_handle: GlobalConfigHandle,
}
89
90/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
91pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
92    relay_system::Runtime::builder(name)
93        .worker_threads(threads)
94        // Relay uses `spawn_blocking` only for Redis connections within the project
95        // cache, those should never exceed 100 concurrent connections
96        // (limited by connection pool).
97        //
98        // Relay also does not use other blocking operations from Tokio which require
99        // this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
100        //
101        // We limit the maximum amount of threads here, we've seen that Tokio
102        // expands this pool very very aggressively and basically never shrinks it
103        // which leads to a massive resource waste.
104        .max_blocking_threads(150)
105        // We also lower down the default (10s) keep alive timeout for blocking
106        // threads to encourage the runtime to not keep too many idle blocking threads
107        // around.
108        .thread_keep_alive(Duration::from_secs(1))
109        .build()
110}
111
112fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
113    // Adjust thread count for small cpu counts to not have too many idle cores
114    // and distribute workload better.
115    let thread_count = match config.cpu_concurrency() {
116        conc @ 0..=2 => conc.max(1),
117        conc @ 3..=4 => conc - 1,
118        conc => conc - 2,
119    };
120    relay_log::info!("starting {thread_count} envelope processing workers");
121
122    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
123        .num_threads(thread_count)
124        .max_concurrency(config.pool_concurrency())
125        .thread_kind(ThreadKind::Worker)
126        .build()?;
127
128    Ok(pool)
129}
130
/// Builds the worker thread pool backing the store service.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    // One store worker per 12 threads of CPU concurrency. The ratio was found
    // empirically and may need adjustments in the future.
    //
    // Ideally the store becomes single threaded again once the heavy work
    // (de- and re-serialization) moves into the processor.
    let thread_count = config.cpu_concurrency().div_ceil(12);
    relay_log::info!("starting {thread_count} store workers");

    let runtime = tokio::runtime::Handle::current();
    let pool = crate::utils::ThreadPoolBuilder::new("store", runtime)
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
148
/// Shared, reference-counted payload of [`ServiceState`].
#[derive(Debug)]
struct StateInner {
    /// The Relay configuration all services were started with.
    config: Arc<Config>,
    /// Config-aware wrapper around `MemoryStat` for memory threshold checks.
    memory_checker: MemoryChecker,
    /// Addresses and handles of all started services.
    registry: Registry,
}
155
/// Server state.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Clone, Debug)]
pub struct ServiceState {
    // Shared inner state; every clone points at the same data.
    inner: Arc<StateInner>,
}
161
impl ServiceState {
    /// Starts all services and returns addresses to all of them.
    ///
    /// Services are spawned on the given `services` spawner; `handle` is passed
    /// on to the services that need it (metrics router, stats, autoscaling).
    /// Start order matters: the global config service is started before its
    /// dependents, and the processor channel is created before the services that
    /// send to it.
    pub async fn start(
        handle: &relay_system::Handle,
        services: &dyn ServiceSpawn,
        config: Arc<Config>,
    ) -> Result<Self> {
        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));

        // Redis clients are only created when Redis is configured AND processing
        // is enabled; otherwise this stays `None`.
        #[cfg(feature = "processing")]
        let redis_clients = config
            .redis()
            .filter(|_| config.processing_enabled())
            .map(create_redis_clients)
            .transpose()
            .context(ServiceError::Redis)?;

        // If we have Redis configured, we want to initialize all the scripts by loading them in
        // the scripts cache if not present. Our custom ConnectionLike implementation relies on this
        // initialization to work properly since it assumes that scripts are loaded across all Redis
        // instances.
        #[cfg(feature = "processing")]
        if let Some(redis_clients) = &redis_clients {
            initialize_redis_scripts_for_client(redis_clients)
                .await
                .context(ServiceError::Redis)?;
        }

        // We create an instance of `MemoryStat` which can be composed with any arbitrary
        // configuration object down the line.
        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

        // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services. The channel name depends on which processor variant will
        // consume it (proxy vs. managed).
        let (processor, processor_rx) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
        };

        let outcome_producer = services.start(OutcomeProducerService::create(
            config.clone(),
            upstream_relay.clone(),
            processor.clone(),
        )?);
        let outcome_aggregator =
            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

        let (global_config, global_config_rx) =
            GlobalConfigService::new(config.clone(), upstream_relay.clone());
        let global_config_handle = global_config.handle();
        // The global config service must start before dependent services are
        // started. Messages like subscription requests to the global config
        // service fail if the service is not running.
        let global_config = services.start(global_config);

        let project_source = ProjectSource::start_in(
            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
        )
        .await;
        let project_cache_handle =
            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

        // The store (Kafka forwarder) only runs when processing is enabled.
        #[cfg(feature = "processing")]
        let store_pool = create_store_pool(&config)?;
        #[cfg(feature = "processing")]
        let store = config
            .processing_enabled()
            .then(|| {
                StoreService::create(
                    store_pool.clone(),
                    config.clone(),
                    global_config_handle.clone(),
                    outcome_aggregator.clone(),
                    metric_outcomes.clone(),
                )
                .map(|s| services.start(s))
            })
            .transpose()?;

        // The objectstore is wrapped in a `ConcurrentService` to bound both its
        // backlog and the number of concurrently processed requests.
        #[cfg(feature = "processing")]
        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
            let concurrent = ConcurrentService::new(s)
                .with_backlog_limit(config.objectstore().max_backlog)
                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
            services.start(concurrent)
        });

        let envelope_buffer = PartitionedEnvelopeBuffer::create(
            config.spool_partitions(),
            config.clone(),
            memory_stat.clone(),
            global_config_rx.clone(),
            project_cache_handle.clone(),
            processor.clone(),
            outcome_aggregator.clone(),
            services,
        );

        // Depending on the relay mode, either a lightweight proxy processor or the
        // full managed processing pipeline (pool, aggregator, cogs, autoscaling) is
        // started. Proxy mode has no pool, aggregator handle, or autoscaling.
        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
            relay_config::RelayMode::Proxy => {
                services.start_with(
                    ProxyProcessorService::new(
                        config.clone(),
                        project_cache_handle.clone(),
                        ProxyAddrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                        },
                    ),
                    processor_rx,
                );
                (None, None, None)
            }
            relay_config::RelayMode::Managed => {
                let processor_pool = create_processor_pool(&config)?;

                let aggregator = RouterService::new(
                    handle.clone(),
                    config.default_aggregator_config().clone(),
                    config.secondary_aggregator_configs().clone(),
                    Some(processor.clone().recipient()),
                    project_cache_handle.clone(),
                );
                let aggregator_handle = aggregator.handle();
                let aggregator = services.start(aggregator);

                let cogs = CogsService::new(&config);
                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

                services.start_with(
                    EnvelopeProcessorService::new(
                        processor_pool.clone(),
                        config.clone(),
                        global_config_handle.clone(),
                        project_cache_handle.clone(),
                        cogs,
                        #[cfg(feature = "processing")]
                        redis_clients.clone(),
                        processor::Addrs {
                            outcome_aggregator: outcome_aggregator.clone(),
                            upstream_relay: upstream_relay.clone(),
                            #[cfg(feature = "processing")]
                            objectstore: objectstore.clone(),
                            #[cfg(feature = "processing")]
                            store_forwarder: store,
                            aggregator: aggregator.clone(),
                        },
                        metric_outcomes.clone(),
                    ),
                    processor_rx,
                );

                let autoscaling = services.start(AutoscalingMetricService::new(
                    memory_stat.clone(),
                    envelope_buffer.clone(),
                    handle.clone(),
                    processor_pool.clone(),
                ));

                (
                    Some(processor_pool),
                    Some(aggregator_handle),
                    Some(autoscaling),
                )
            }
        };

        let health_check = services.start(HealthCheckService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator_handle,
            upstream_relay.clone(),
            envelope_buffer.clone(),
        ));

        services.start(RelayStats::new(
            config.clone(),
            handle.clone(),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_clients.clone(),
            processor_pool,
            #[cfg(feature = "processing")]
            store_pool,
        ));

        let relay_cache = services.start(RelayCacheService::new(
            config.clone(),
            upstream_relay.clone(),
        ));

        let upload = services.start(upload::create_service(
            &config,
            &upstream_relay,
            #[cfg(feature = "processing")]
            &objectstore,
        ));

        let registry = Registry {
            processor,
            health_check,
            outcome_producer,
            outcome_aggregator,
            relay_cache,
            global_config,
            project_cache_handle,
            upstream_relay,
            envelope_buffer,
            autoscaling,
            #[cfg(feature = "processing")]
            objectstore,
            upload,
            global_config_handle,
        };

        let state = StateInner {
            config: config.clone(),
            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
            registry,
        };

        Ok(ServiceState {
            inner: Arc::new(state),
        })
    }

    /// Returns a reference to the Relay configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
    /// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
    /// thresholds set in the [`Config`].
    pub fn memory_checker(&self) -> &MemoryChecker {
        &self.inner.memory_checker
    }

    /// Returns the address of the autoscaling metric service, if it was started
    /// (only in managed mode).
    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
        self.inner.registry.autoscaling.as_ref()
    }

    /// Returns the V2 envelope buffer, if present.
    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        self.inner.registry.envelope_buffer.buffer(project_key_pair)
    }

    /// Returns a [`ProjectCacheHandle`].
    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
        &self.inner.registry.project_cache_handle
    }

    /// Returns the address of the [`RelayCache`] service.
    pub fn relay_cache(&self) -> &Addr<RelayCache> {
        &self.inner.registry.relay_cache
    }

    /// Returns the address of the [`HealthCheck`] service.
    pub fn health_check(&self) -> &Addr<HealthCheck> {
        &self.inner.registry.health_check
    }

    /// Returns the address of the [`OutcomeProducer`] service.
    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
        &self.inner.registry.outcome_producer
    }

    /// Returns the address of the [`UpstreamRelay`] service.
    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
        &self.inner.registry.upstream_relay
    }

    /// Returns the address of the [`EnvelopeProcessor`] service.
    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
        &self.inner.registry.processor
    }

    /// Returns the address of the [`GlobalConfigService`] service.
    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
        &self.inner.registry.global_config
    }

    /// Returns the address of the [`OutcomeAggregator`] service.
    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.inner.registry.outcome_aggregator
    }

    #[cfg(feature = "processing")]
    /// Returns the address of the [`Objectstore`] service.
    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
        self.inner.registry.objectstore.as_ref()
    }

    /// Returns the address of the [`Upload`] service.
    pub fn upload(&self) -> &Addr<Upload> {
        &self.inner.registry.upload
    }

    /// Returns the [`GlobalConfigHandle`] for reading the current global config.
    pub fn global_config_handle(&self) -> &GlobalConfigHandle {
        &self.inner.registry.global_config_handle
    }
}
470
/// Creates Redis clients from the given `configs`.
///
/// If `configs` is [`Unified`](RedisConfigsRef::Unified), a single client is created
/// and cloned for project configs and quotas, so both use cases share one client.
///
/// If it is [`Individual`](RedisConfigsRef::Individual), a dedicated client is
/// created for each use case.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            // One client, shared by both use cases via a cheap clone.
            let shared = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;
            Ok(RedisClients {
                project_configs: shared.clone(),
                quotas: shared,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            quotas,
        } => {
            // Separate clients per use case.
            Ok(RedisClients {
                project_configs: create_async_redis_client(
                    PROJECT_CONFIG_REDIS_CLIENT,
                    &project_configs,
                )?,
                quotas: create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?,
            })
        }
    }
}
508
/// Builds an [`AsyncRedisClient`] for a single-node or cluster configuration.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes,
            options,
        } => {
            let nodes = cluster_nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, nodes, options)
        }
    }
}
524
/// Loads all Redis scripts on both clients (project configs and quotas).
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    // Both use-case clients must have the scripts loaded; in the unified
    // configuration these are clones of the same client.
    initialize_redis_scripts(&redis_clients.project_configs, &scripts).await?;
    initialize_redis_scripts(&redis_clients.quotas, &scripts).await?;

    Ok(())
}
541
/// Loads the given scripts into the script cache of the client's Redis instances.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    // Load on all instances without checking whether a script is already cached:
    // a limitation in the connection implementation prevents that check.
    for script in scripts {
        script
            .prepare_invoke()
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
561
/// Lets axum handlers extract [`ServiceState`] directly from the router state.
impl FromRequestParts<Self> for ServiceState {
    // Extraction can never fail.
    type Rejection = Infallible;

    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
        // Cheap clone: only bumps the inner `Arc` refcount.
        Ok(state.clone())
    }
}