Skip to main content

relay_server/
service.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::metrics::MetricOutcomes;
6use crate::services::autoscaling::{AutoscalingMetricService, AutoscalingMetrics};
7use crate::services::buffer::{
8    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
9};
10use crate::services::cogs::{CogsService, CogsServiceRecorder};
11use crate::services::global_config::{
12    GlobalConfigHandle, GlobalConfigManager, GlobalConfigService,
13};
14use crate::services::health_check::{HealthCheck, HealthCheckService};
15use crate::services::metrics::RouterService;
16#[cfg(feature = "processing")]
17use crate::services::objectstore::Objectstore;
18#[cfg(feature = "processing")]
19use crate::services::objectstore::ObjectstoreService;
20use crate::services::outcome::{
21    OutcomeAggregator, OutcomeProducer, OutcomeProducerService, TrackOutcome,
22};
23use crate::services::processor::{
24    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
25};
26use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
27use crate::services::projects::source::ProjectSource;
28use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
29use crate::services::relays::{RelayCache, RelayCacheService};
30use crate::services::stats::RelayStats;
31#[cfg(feature = "processing")]
32use crate::services::store::{StoreService, StoreServicePool};
33use crate::services::upload::{self, Upload};
34use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
35use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
36#[cfg(feature = "processing")]
37use anyhow::Context;
38use anyhow::Result;
39use axum::extract::FromRequestParts;
40use axum::http::request::Parts;
41use relay_cogs::Cogs;
42use relay_config::Config;
43#[cfg(feature = "processing")]
44use relay_config::{RedisConfigRef, RedisConfigsRef};
45#[cfg(feature = "processing")]
46use relay_redis::AsyncRedisClient;
47#[cfg(feature = "processing")]
48use relay_redis::redis::Script;
49#[cfg(feature = "processing")]
50use relay_redis::{RedisClients, RedisError, RedisScripts};
51#[cfg(feature = "processing")]
52use relay_system::ConcurrentService;
53use relay_system::{Addr, Service, ServiceSpawn, ServiceSpawnExt as _, channel};
54
55/// Indicates the type of failure of the server.
56#[derive(Debug, thiserror::Error)]
57pub enum ServiceError {
58    /// GeoIp construction failed.
59    #[error("could not load the Geoip Db")]
60    GeoIp,
61
62    /// Initializing the Kafka producer failed.
63    #[cfg(feature = "processing")]
64    #[error("could not initialize kafka producer: {0}")]
65    Kafka(String),
66
67    /// Initializing the Redis client failed.
68    #[cfg(feature = "processing")]
69    #[error("could not initialize redis client during startup")]
70    Redis,
71}
72
73#[derive(Clone, Debug)]
74pub struct Registry {
75    pub health_check: Addr<HealthCheck>,
76    pub outcome_producer: Addr<OutcomeProducer>,
77    pub outcome_aggregator: Addr<TrackOutcome>,
78    pub processor: Addr<EnvelopeProcessor>,
79    pub relay_cache: Addr<RelayCache>,
80    pub global_config: Addr<GlobalConfigManager>,
81    pub upstream_relay: Addr<UpstreamRelay>,
82    pub envelope_buffer: Arc<PartitionedEnvelopeBuffer>,
83    pub project_cache_handle: ProjectCacheHandle,
84    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
85    #[cfg(feature = "processing")]
86    pub objectstore: Option<Addr<Objectstore>>,
87    pub upload: Addr<Upload>,
88    pub global_config_handle: GlobalConfigHandle,
89}
90
91/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
92pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
93    relay_system::Runtime::builder(name)
94        .worker_threads(threads)
95        // Relay uses `spawn_blocking` only for Redis connections within the project
96        // cache, those should never exceed 100 concurrent connections
97        // (limited by connection pool).
98        //
99        // Relay also does not use other blocking operations from Tokio which require
100        // this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
101        //
102        // We limit the maximum amount of threads here, we've seen that Tokio
103        // expands this pool very very aggressively and basically never shrinks it
104        // which leads to a massive resource waste.
105        .max_blocking_threads(150)
106        // We also lower down the default (10s) keep alive timeout for blocking
107        // threads to encourage the runtime to not keep too many idle blocking threads
108        // around.
109        .thread_keep_alive(Duration::from_secs(1))
110        .build()
111}
112
113fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
114    // Adjust thread count for small cpu counts to not have too many idle cores
115    // and distribute workload better.
116    let thread_count = match config.cpu_concurrency() {
117        conc @ 0..=2 => conc.max(1),
118        conc @ 3..=4 => conc - 1,
119        conc => conc - 2,
120    };
121    relay_log::info!("starting {thread_count} envelope processing workers");
122
123    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
124        .num_threads(thread_count)
125        .max_concurrency(config.pool_concurrency())
126        .thread_kind(ThreadKind::Worker)
127        .build()?;
128
129    Ok(pool)
130}
131
/// Builds the worker pool used by the store service.
///
/// One store worker is spawned for every [`CPUS_PER_STORE_WORKER`] units of
/// [`Config::cpu_concurrency`] (rounded up), and never fewer than one worker.
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
    /// CPU concurrency served by a single store worker.
    ///
    /// This ratio was found empirically and may need adjustments in the future.
    const CPUS_PER_STORE_WORKER: usize = 12;

    // A configured CPU concurrency of 0 would otherwise produce a pool without any workers.
    // The processor pool guards against the same case with `max(1)`.
    //
    // Ideally in the future the store will be single threaded again, after we move
    // all the heavy processing (de- and re-serialization) into the processor.
    let thread_count = config
        .cpu_concurrency()
        .div_ceil(CPUS_PER_STORE_WORKER)
        .max(1);
    relay_log::info!("starting {thread_count} store workers");

    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
        .num_threads(thread_count)
        .max_concurrency(config.pool_concurrency())
        .build()?;

    Ok(pool)
}
149
150#[derive(Debug)]
151struct StateInner {
152    config: Arc<Config>,
153    memory_checker: MemoryChecker,
154    registry: Registry,
155}
156
157/// Server state.
158#[derive(Clone, Debug)]
159pub struct ServiceState {
160    inner: Arc<StateInner>,
161}
162
163impl ServiceState {
164    /// Starts all services and returns addresses to all of them.
165    pub async fn start(
166        handle: &relay_system::Handle,
167        services: &dyn ServiceSpawn,
168        config: Arc<Config>,
169    ) -> Result<Self> {
170        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
171
172        #[cfg(feature = "processing")]
173        let redis_clients = config
174            .redis()
175            .filter(|_| config.processing_enabled())
176            .map(create_redis_clients)
177            .transpose()
178            .context(ServiceError::Redis)?;
179
180        // If we have Redis configured, we want to initialize all the scripts by loading them in
181        // the scripts cache if not present. Our custom ConnectionLike implementation relies on this
182        // initialization to work properly since it assumes that scripts are loaded across all Redis
183        // instances.
184        #[cfg(feature = "processing")]
185        if let Some(redis_clients) = &redis_clients {
186            initialize_redis_scripts_for_client(redis_clients)
187                .await
188                .context(ServiceError::Redis)?;
189        }
190
191        // We create an instance of `MemoryStat` which can be supplied composed with any arbitrary
192        // configuration object down the line.
193        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());
194
195        // Create an address for the `EnvelopeProcessor`, which can be injected into the
196        // other services.
197        let (processor, processor_rx) = match config.relay_mode() {
198            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
199            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
200        };
201
202        let (aggregator, aggregator_rx) = channel(RouterService::name());
203
204        let outcome_producer = services.start(OutcomeProducerService::create(
205            config.clone(),
206            processor.clone(),
207            aggregator.clone(),
208        )?);
209        let outcome_aggregator =
210            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
211
212        let (global_config, global_config_rx) =
213            GlobalConfigService::new(config.clone(), upstream_relay.clone());
214        let global_config_handle = global_config.handle();
215        // The global config service must start before dependant services are
216        // started. Messages like subscription requests to the global config
217        // service fail if the service is not running.
218        let global_config = services.start(global_config);
219
220        let project_source = ProjectSource::start_in(
221            services,
222            Arc::clone(&config),
223            upstream_relay.clone(),
224            #[cfg(feature = "processing")]
225            redis_clients.clone(),
226        )
227        .await;
228        let project_cache_handle =
229            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);
230
231        let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());
232
233        #[cfg(feature = "processing")]
234        let store_pool = create_store_pool(&config)?;
235        #[cfg(feature = "processing")]
236        let store = config
237            .processing_enabled()
238            .then(|| {
239                StoreService::create(
240                    store_pool.clone(),
241                    config.clone(),
242                    global_config_handle.clone(),
243                    outcome_aggregator.clone(),
244                    metric_outcomes.clone(),
245                )
246                .map(|s| services.start(s))
247            })
248            .transpose()?;
249
250        #[cfg(feature = "processing")]
251        let objectstore = ObjectstoreService::new(config.objectstore(), store.clone())?.map(|s| {
252            let concurrent = ConcurrentService::new(s)
253                .with_backlog_limit(config.objectstore().max_backlog)
254                .with_concurrency_limit(config.objectstore().max_concurrent_requests);
255            services.start(concurrent)
256        });
257
258        let envelope_buffer = PartitionedEnvelopeBuffer::create(
259            config.spool_partitions(),
260            config.clone(),
261            memory_stat.clone(),
262            global_config_rx.clone(),
263            project_cache_handle.clone(),
264            processor.clone(),
265            outcome_aggregator.clone(),
266            services,
267        );
268
269        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
270            relay_config::RelayMode::Proxy => {
271                services.start_with(
272                    ProxyProcessorService::new(
273                        config.clone(),
274                        project_cache_handle.clone(),
275                        ProxyAddrs {
276                            outcome_aggregator: outcome_aggregator.clone(),
277                            upstream_relay: upstream_relay.clone(),
278                        },
279                    ),
280                    processor_rx,
281                );
282                (None, None, None)
283            }
284            relay_config::RelayMode::Managed => {
285                let processor_pool = create_processor_pool(&config)?;
286
287                let router = RouterService::new(
288                    handle.clone(),
289                    config.default_aggregator_config().clone(),
290                    config.secondary_aggregator_configs().clone(),
291                    Some(processor.clone().recipient()),
292                    project_cache_handle.clone(),
293                );
294                let router_handle = router.handle();
295                services.start_with(router, aggregator_rx);
296
297                let cogs = CogsService::new(&config);
298                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
299
300                services.start_with(
301                    EnvelopeProcessorService::new(
302                        processor_pool.clone(),
303                        config.clone(),
304                        global_config_handle.clone(),
305                        project_cache_handle.clone(),
306                        cogs,
307                        #[cfg(feature = "processing")]
308                        redis_clients.clone(),
309                        processor::Addrs {
310                            outcome_aggregator: outcome_aggregator.clone(),
311                            upstream_relay: upstream_relay.clone(),
312                            #[cfg(feature = "processing")]
313                            objectstore: objectstore.clone(),
314                            #[cfg(feature = "processing")]
315                            store_forwarder: store,
316                            aggregator: aggregator.clone(),
317                        },
318                        metric_outcomes.clone(),
319                    ),
320                    processor_rx,
321                );
322
323                let autoscaling = services.start(AutoscalingMetricService::new(
324                    memory_stat.clone(),
325                    envelope_buffer.clone(),
326                    handle.clone(),
327                    processor_pool.clone(),
328                ));
329
330                (Some(processor_pool), Some(router_handle), Some(autoscaling))
331            }
332        };
333
334        let health_check = services.start(HealthCheckService::new(
335            config.clone(),
336            MemoryChecker::new(memory_stat.clone(), config.clone()),
337            aggregator_handle,
338            upstream_relay.clone(),
339            envelope_buffer.clone(),
340        ));
341
342        services.start(RelayStats::new(
343            config.clone(),
344            handle.clone(),
345            upstream_relay.clone(),
346            #[cfg(feature = "processing")]
347            redis_clients.clone(),
348            processor_pool,
349            #[cfg(feature = "processing")]
350            store_pool,
351        ));
352
353        let relay_cache = services.start(RelayCacheService::new(
354            config.clone(),
355            upstream_relay.clone(),
356        ));
357
358        let upload = services.start(upload::create_service(
359            &config,
360            &upstream_relay,
361            #[cfg(feature = "processing")]
362            &objectstore,
363        ));
364
365        let registry = Registry {
366            processor,
367            health_check,
368            outcome_producer,
369            outcome_aggregator,
370            relay_cache,
371            global_config,
372            project_cache_handle,
373            upstream_relay,
374            envelope_buffer,
375            autoscaling,
376            #[cfg(feature = "processing")]
377            objectstore,
378            upload,
379            global_config_handle,
380        };
381
382        let state = StateInner {
383            config: config.clone(),
384            memory_checker: MemoryChecker::new(memory_stat, config.clone()),
385            registry,
386        };
387
388        Ok(ServiceState {
389            inner: Arc::new(state),
390        })
391    }
392
393    /// Returns a reference to the Relay configuration.
394    pub fn config(&self) -> &Config {
395        &self.inner.config
396    }
397
398    /// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
399    /// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
400    /// thresholds set in the [`Config`].
401    pub fn memory_checker(&self) -> &MemoryChecker {
402        &self.inner.memory_checker
403    }
404
405    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
406        self.inner.registry.autoscaling.as_ref()
407    }
408
409    /// Returns the V2 envelope buffer, if present.
410    pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
411        self.inner.registry.envelope_buffer.buffer(project_key_pair)
412    }
413
414    /// Returns a [`ProjectCacheHandle`].
415    pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
416        &self.inner.registry.project_cache_handle
417    }
418
419    /// Returns the address of the [`RelayCache`] service.
420    pub fn relay_cache(&self) -> &Addr<RelayCache> {
421        &self.inner.registry.relay_cache
422    }
423
424    /// Returns the address of the [`HealthCheck`] service.
425    pub fn health_check(&self) -> &Addr<HealthCheck> {
426        &self.inner.registry.health_check
427    }
428
429    /// Returns the address of the [`OutcomeProducer`] service.
430    pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
431        &self.inner.registry.outcome_producer
432    }
433
434    /// Returns the address of the [`OutcomeProducer`] service.
435    pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
436        &self.inner.registry.upstream_relay
437    }
438
439    /// Returns the address of the [`OutcomeProducer`] service.
440    pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
441        &self.inner.registry.processor
442    }
443
444    /// Returns the address of the [`GlobalConfigService`] service.
445    pub fn global_config(&self) -> &Addr<GlobalConfigManager> {
446        &self.inner.registry.global_config
447    }
448
449    /// Returns the address of the [`OutcomeProducer`] service.
450    pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
451        &self.inner.registry.outcome_aggregator
452    }
453
454    #[cfg(feature = "processing")]
455    /// Returns the address of the [`Objectstore`] service.
456    pub fn objectstore(&self) -> Option<&Addr<Objectstore>> {
457        self.inner.registry.objectstore.as_ref()
458    }
459
460    /// Returns the address of the [`Upload`] service.
461    pub fn upload(&self) -> &Addr<Upload> {
462        &self.inner.registry.upload
463    }
464
465    pub fn global_config_handle(&self) -> &GlobalConfigHandle {
466        &self.inner.registry.global_config_handle
467    }
468}
469
/// Creates Redis clients from the given `configs`.
///
/// If `configs` is [`Unified`](RedisConfigsRef::Unified), a single client is created and then
/// cloned for project configs and quotas, meaning that both really use the same client.
///
/// If it is [`Individual`](RedisConfigsRef::Individual), a separate client is created for each
/// use case.
///
/// # Errors
///
/// Returns the first [`RedisError`] raised while creating a client.
#[cfg(feature = "processing")]
pub fn create_redis_clients(configs: RedisConfigsRef<'_>) -> Result<RedisClients, RedisError> {
    // Names given to the clients created below.
    const PROJECT_CONFIG_REDIS_CLIENT: &str = "projectconfig";
    const QUOTA_REDIS_CLIENT: &str = "quotas";
    const UNIFIED_REDIS_CLIENT: &str = "unified";

    match configs {
        RedisConfigsRef::Unified(unified) => {
            let client = create_async_redis_client(UNIFIED_REDIS_CLIENT, &unified)?;

            Ok(RedisClients {
                project_configs: client.clone(),
                quotas: client,
            })
        }
        RedisConfigsRef::Individual {
            project_configs,
            quotas,
        } => {
            let project_configs =
                create_async_redis_client(PROJECT_CONFIG_REDIS_CLIENT, &project_configs)?;
            let quotas = create_async_redis_client(QUOTA_REDIS_CLIENT, &quotas)?;

            Ok(RedisClients {
                project_configs,
                quotas,
            })
        }
    }
}
507
/// Creates a single named async Redis client for either a standalone server or a cluster.
#[cfg(feature = "processing")]
fn create_async_redis_client(
    name: &'static str,
    config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisClient, RedisError> {
    match config {
        RedisConfigRef::Single { server, options } => {
            AsyncRedisClient::single(name, server, options)
        }
        RedisConfigRef::Cluster {
            cluster_nodes: nodes,
            options,
        } => {
            let node_urls = nodes.iter().map(|node| node.as_str());
            AsyncRedisClient::cluster(name, node_urls, options)
        }
    }
}
523
/// Loads every script returned by [`RedisScripts::all`] into each of the given Redis clients.
///
/// Clients are handled in order (project configs first, then quotas), and the first error
/// aborts the initialization. With a unified configuration both fields are clones of the same
/// client, so the scripts are simply loaded twice.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts_for_client(
    redis_clients: &RedisClients,
) -> Result<(), RedisError> {
    let scripts = RedisScripts::all();

    // Destructure exhaustively so that adding a client to `RedisClients` fails to compile
    // until it is initialized here as well.
    let RedisClients {
        project_configs,
        quotas,
    } = redis_clients;

    for client in [project_configs, quotas] {
        initialize_redis_scripts(client, &scripts).await?;
    }

    Ok(())
}
540
/// Loads each of `scripts` into the script cache through a connection obtained from `client`.
///
/// Stops at the first script that fails to load.
#[cfg(feature = "processing")]
async fn initialize_redis_scripts(
    client: &AsyncRedisClient,
    scripts: &[&Script],
) -> Result<(), RedisError> {
    let mut conn = client.get_connection().await?;

    // Every script is loaded unconditionally, without first checking whether it is already
    // cached, because of a limitation in the connection implementation.
    for script in scripts.iter() {
        let invocation = script.prepare_invoke();
        invocation
            .load_async(&mut conn)
            .await
            .map_err(RedisError::Redis)?;
    }

    Ok(())
}
560
561impl FromRequestParts<Self> for ServiceState {
562    type Rejection = Infallible;
563
564    async fn from_request_parts(_: &mut Parts, state: &Self) -> Result<Self, Self::Rejection> {
565        Ok(state.clone())
566    }
567}