1#[cfg(feature = "processing")]
2use anyhow::Context;
3use anyhow::Result;
4use relay_config::{Config, RelayMode};
5use relay_server::MemoryStat;
6
7fn assert_batch_size_bytes(config: &Config) -> Result<()> {
10 let memory = MemoryStat::current_memory();
12
13 let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
18 let maximum_batch_size_bytes =
19 memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;
20
21 if configured_batch_size_bytes > maximum_batch_size_bytes {
22 anyhow::bail!(
23 "the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
24 configured_batch_size_bytes,
25 maximum_batch_size_bytes
26 )
27 }
28
29 Ok(())
30}
31
32pub fn check_config(config: &Config) -> Result<()> {
33 if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
34 anyhow::bail!(
35 "relay has no credentials, which are required in managed mode. \
36 Generate some with \"relay credentials generate\" first.",
37 );
38 }
39
40 if config.relay_mode() != RelayMode::Managed && config.processing_enabled() {
41 anyhow::bail!("Processing can only be enabled in managed mode.");
42 }
43
44 #[cfg(feature = "processing")]
45 if config.processing_enabled() {
46 for (name, topic) in config.unused_topic_assignments() {
47 relay_log::with_scope(
48 |scope| scope.set_extra("topic", format!("{topic:?}").into()),
49 || relay_log::error!("unused topic assignment '{name}'"),
50 );
51 }
52
53 for topic in relay_kafka::KafkaTopic::iter() {
54 let _ = config
55 .kafka_config(*topic)
56 .with_context(|| format!("invalid kafka configuration for topic '{topic:?}'"))?;
57 }
58 }
59
60 assert_batch_size_bytes(config)?;
61
62 Ok(())
63}
64
65pub fn dump_spawn_infos(config: &Config) {
67 if config.path().as_os_str().is_empty() {
68 relay_log::info!("launching relay without config folder");
69 } else {
70 relay_log::info!(
71 "launching relay from config folder {}",
72 config.path().display()
73 );
74 }
75 relay_log::info!(" relay mode: {}", config.relay_mode());
76
77 match config.relay_id() {
78 Some(id) => relay_log::info!(" relay id: {id}"),
79 None => relay_log::info!(" relay id: -"),
80 };
81 match config.public_key() {
82 Some(key) => relay_log::info!(" public key: {key}"),
83 None => relay_log::info!(" public key: -"),
84 };
85 relay_log::info!(" log level: {}", config.logging().level);
86}
87
88pub fn dump_credentials(config: &Config) {
90 match config.relay_id() {
91 Some(id) => println!(" relay id: {id}"),
92 None => println!(" relay id: -"),
93 };
94 match config.public_key() {
95 Some(key) => println!(" public key: {key}"),
96 None => println!(" public key: -"),
97 };
98}
99
100pub fn init_metrics(config: &Config) -> Result<()> {
102 let addrs = config.statsd_addrs()?;
103 if addrs.is_empty() {
104 return Ok(());
105 }
106
107 let mut default_tags = config.metrics_default_tags().clone();
108 if let Some(hostname_tag) = config.metrics_hostname_tag() {
109 if let Some(hostname) = hostname::get().ok().and_then(|s| s.into_string().ok()) {
110 default_tags.insert(hostname_tag.to_owned(), hostname);
111 }
112 }
113 relay_statsd::init(
114 config.metrics_prefix(),
115 &addrs[..],
116 default_tags,
117 config.metrics_sample_rate(),
118 config.metrics_aggregate(),
119 );
120
121 Ok(())
122}