relay/
setup.rs

1#[cfg(feature = "processing")]
2use anyhow::Context;
3use anyhow::Result;
4use relay_config::{Config, RelayMode};
5use relay_server::MemoryStat;
6
7/// Validates that the `batch_size_bytes` of the configuration is correct and doesn't lead to
8/// deadlocks in the buffer.
9fn assert_batch_size_bytes(config: &Config) -> Result<()> {
10    // We create a temporary memory reading used just for the config check.
11    let memory = MemoryStat::current_memory();
12
13    // We expect the batch size for the spooler to be 10% of the memory threshold over which the
14    // buffer stops unspooling.
15    //
16    // The 10% threshold was arbitrarily chosen to give the system leeway when spooling.
17    let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
18    let maximum_batch_size_bytes =
19        memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;
20
21    if configured_batch_size_bytes > maximum_batch_size_bytes {
22        anyhow::bail!(
23            "the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
24            configured_batch_size_bytes,
25            maximum_batch_size_bytes
26        )
27    }
28
29    Ok(())
30}
31
32pub fn check_config(config: &Config) -> Result<()> {
33    if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
34        anyhow::bail!(
35            "relay has no credentials, which are required in managed mode. \
36             Generate some with \"relay credentials generate\" first.",
37        );
38    }
39
40    if config.relay_mode() != RelayMode::Managed && config.processing_enabled() {
41        anyhow::bail!("Processing can only be enabled in managed mode.");
42    }
43
44    #[cfg(feature = "processing")]
45    if config.processing_enabled() {
46        for (name, topic) in config.unused_topic_assignments() {
47            relay_log::with_scope(
48                |scope| scope.set_extra("topic", format!("{topic:?}").into()),
49                || relay_log::error!("unused topic assignment '{name}'"),
50            );
51        }
52
53        for topic in relay_kafka::KafkaTopic::iter() {
54            let _ = config
55                .kafka_config(*topic)
56                .with_context(|| format!("invalid kafka configuration for topic '{topic:?}'"))?;
57        }
58    }
59
60    assert_batch_size_bytes(config)?;
61
62    Ok(())
63}
64
65/// Print spawn infos to the log.
66pub fn dump_spawn_infos(config: &Config) {
67    if config.path().as_os_str().is_empty() {
68        relay_log::info!("launching relay without config folder");
69    } else {
70        relay_log::info!(
71            "launching relay from config folder {}",
72            config.path().display()
73        );
74    }
75    relay_log::info!("  relay mode: {}", config.relay_mode());
76
77    match config.relay_id() {
78        Some(id) => relay_log::info!("  relay id: {id}"),
79        None => relay_log::info!("  relay id: -"),
80    };
81    match config.public_key() {
82        Some(key) => relay_log::info!("  public key: {key}"),
83        None => relay_log::info!("  public key: -"),
84    };
85    relay_log::info!("  log level: {}", config.logging().level);
86}
87
88/// Dumps out credential info.
89pub fn dump_credentials(config: &Config) {
90    match config.relay_id() {
91        Some(id) => println!("  relay id: {id}"),
92        None => println!("  relay id: -"),
93    };
94    match config.public_key() {
95        Some(key) => println!("  public key: {key}"),
96        None => println!("  public key: -"),
97    };
98}
99
100/// Initialize the metric system.
101pub fn init_metrics(config: &Config) -> Result<()> {
102    let addrs = config.statsd_addrs()?;
103    if addrs.is_empty() {
104        return Ok(());
105    }
106
107    let mut default_tags = config.metrics_default_tags().clone();
108    if let Some(hostname_tag) = config.metrics_hostname_tag() {
109        if let Some(hostname) = hostname::get().ok().and_then(|s| s.into_string().ok()) {
110            default_tags.insert(hostname_tag.to_owned(), hostname);
111        }
112    }
113    relay_statsd::init(
114        config.metrics_prefix(),
115        &addrs[..],
116        default_tags,
117        config.metrics_sample_rate(),
118        config.metrics_aggregate(),
119    );
120
121    Ok(())
122}