relay/
setup.rs

1#[cfg(feature = "processing")]
2use anyhow::Context;
3use anyhow::Result;
4use relay_config::{Config, RelayMode};
5use relay_server::MemoryStat;
6use relay_statsd::MetricsConfig;
7
8/// Validates that the `batch_size_bytes` of the configuration is correct and doesn't lead to
9/// deadlocks in the buffer.
10fn assert_batch_size_bytes(config: &Config) -> Result<()> {
11    // We create a temporary memory reading used just for the config check.
12    let memory = MemoryStat::current_memory();
13
14    // We expect the batch size for the spooler to be 10% of the memory threshold over which the
15    // buffer stops unspooling.
16    //
17    // The 10% threshold was arbitrarily chosen to give the system leeway when spooling.
18    let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
19    let maximum_batch_size_bytes =
20        memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;
21
22    if configured_batch_size_bytes > maximum_batch_size_bytes {
23        anyhow::bail!(
24            "the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
25            configured_batch_size_bytes,
26            maximum_batch_size_bytes
27        )
28    }
29
30    Ok(())
31}
32
33pub fn check_config(config: &Config) -> Result<()> {
34    if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
35        anyhow::bail!(
36            "relay has no credentials, which are required in managed mode. \
37             Generate some with \"relay credentials generate\" first.",
38        );
39    }
40
41    if config.relay_mode() != RelayMode::Managed && config.processing_enabled() {
42        anyhow::bail!("Processing can only be enabled in managed mode.");
43    }
44
45    #[cfg(feature = "processing")]
46    if config.processing_enabled() {
47        for name in config.unused_topic_assignments().names() {
48            relay_log::with_scope(
49                |scope| scope.set_extra("topic", name.as_str().into()),
50                || relay_log::error!("unused topic assignment '{name}'"),
51            );
52        }
53
54        for topic in relay_kafka::KafkaTopic::iter() {
55            let _ = config
56                .kafka_configs(*topic)
57                .with_context(|| format!("invalid kafka configuration for topic '{topic:?}'"))?;
58        }
59    }
60
61    assert_batch_size_bytes(config)?;
62
63    Ok(())
64}
65
66/// Print spawn infos to the log.
67pub fn dump_spawn_infos(config: &Config) {
68    if config.path().as_os_str().is_empty() {
69        relay_log::info!("launching relay without config folder");
70    } else {
71        relay_log::info!(
72            "launching relay from config folder {}",
73            config.path().display()
74        );
75    }
76    relay_log::info!("  relay mode: {}", config.relay_mode());
77
78    match config.relay_id() {
79        Some(id) => relay_log::info!("  relay id: {id}"),
80        None => relay_log::info!("  relay id: -"),
81    };
82    match config.public_key() {
83        Some(key) => relay_log::info!("  public key: {key}"),
84        None => relay_log::info!("  public key: -"),
85    };
86    relay_log::info!("  log level: {}", config.logging().level);
87}
88
89/// Dumps out credential info.
90pub fn dump_credentials(config: &Config) {
91    match config.relay_id() {
92        Some(id) => println!("  relay id: {id}"),
93        None => println!("  relay id: -"),
94    };
95    match config.public_key() {
96        Some(key) => println!("  public key: {key}"),
97        None => println!("  public key: -"),
98    };
99}
100
101/// Initialize the metric system.
102pub fn init_metrics(config: &Config) -> Result<()> {
103    let Some(host) = config.statsd_addr() else {
104        return Ok(());
105    };
106
107    let mut default_tags = config.metrics_default_tags().clone();
108    if let Some(hostname_tag) = config.metrics_hostname_tag()
109        && let Some(hostname) = hostname::get().ok().and_then(|s| s.into_string().ok())
110    {
111        default_tags.insert(hostname_tag.to_owned(), hostname);
112    }
113    relay_statsd::init(MetricsConfig {
114        prefix: config.metrics_prefix().to_owned(),
115        host: host.to_owned(),
116        buffer_size: config.statsd_buffer_size(),
117        default_tags,
118    })?;
119
120    Ok(())
121}