1#[cfg(feature = "processing")]
2use anyhow::Context;
3use anyhow::Result;
4use relay_config::{Config, RelayMode};
5use relay_server::MemoryStat;
6use relay_statsd::MetricsConfig;
7
8fn assert_batch_size_bytes(config: &Config) -> Result<()> {
11 let memory = MemoryStat::current_memory();
13
14 let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
19 let maximum_batch_size_bytes =
20 memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;
21
22 if configured_batch_size_bytes > maximum_batch_size_bytes {
23 anyhow::bail!(
24 "the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
25 configured_batch_size_bytes,
26 maximum_batch_size_bytes
27 )
28 }
29
30 Ok(())
31}
32
33pub fn check_config(config: &Config) -> Result<()> {
34 if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
35 anyhow::bail!(
36 "relay has no credentials, which are required in managed mode. \
37 Generate some with \"relay credentials generate\" first.",
38 );
39 }
40
41 if config.relay_mode() != RelayMode::Managed && config.processing_enabled() {
42 anyhow::bail!("Processing can only be enabled in managed mode.");
43 }
44
45 #[cfg(feature = "processing")]
46 if config.processing_enabled() {
47 for name in config.unused_topic_assignments().names() {
48 relay_log::with_scope(
49 |scope| scope.set_extra("topic", name.as_str().into()),
50 || relay_log::error!("unused topic assignment '{name}'"),
51 );
52 }
53
54 for topic in relay_kafka::KafkaTopic::iter() {
55 let _ = config
56 .kafka_configs(*topic)
57 .with_context(|| format!("invalid kafka configuration for topic '{topic:?}'"))?;
58 }
59 }
60
61 assert_batch_size_bytes(config)?;
62
63 Ok(())
64}
65
66pub fn dump_spawn_infos(config: &Config) {
68 if config.path().as_os_str().is_empty() {
69 relay_log::info!("launching relay without config folder");
70 } else {
71 relay_log::info!(
72 "launching relay from config folder {}",
73 config.path().display()
74 );
75 }
76 relay_log::info!(" relay mode: {}", config.relay_mode());
77
78 match config.relay_id() {
79 Some(id) => relay_log::info!(" relay id: {id}"),
80 None => relay_log::info!(" relay id: -"),
81 };
82 match config.public_key() {
83 Some(key) => relay_log::info!(" public key: {key}"),
84 None => relay_log::info!(" public key: -"),
85 };
86 relay_log::info!(" log level: {}", config.logging().level);
87}
88
89pub fn dump_credentials(config: &Config) {
91 match config.relay_id() {
92 Some(id) => println!(" relay id: {id}"),
93 None => println!(" relay id: -"),
94 };
95 match config.public_key() {
96 Some(key) => println!(" public key: {key}"),
97 None => println!(" public key: -"),
98 };
99}
100
101pub fn init_metrics(config: &Config) -> Result<()> {
103 let Some(host) = config.statsd_addr() else {
104 return Ok(());
105 };
106
107 let mut default_tags = config.metrics_default_tags().clone();
108 if let Some(hostname_tag) = config.metrics_hostname_tag()
109 && let Some(hostname) = hostname::get().ok().and_then(|s| s.into_string().ok())
110 {
111 default_tags.insert(hostname_tag.to_owned(), hostname);
112 }
113 relay_statsd::init(MetricsConfig {
114 prefix: config.metrics_prefix().to_owned(),
115 host: host.to_owned(),
116 buffer_size: config.statsd_buffer_size(),
117 default_tags,
118 })?;
119
120 Ok(())
121}