1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
// Default for `Http::outage_grace_period` (see `Http` below).
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

// Comment header written at the top of generated YAML config files, pointing
// readers at the relevant documentation.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// Indicates config related errors.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value.
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but the binary
    /// was compiled without the processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
/// Describes where a [`ConfigError`] originated from, for display purposes.
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// The error is not attributed to any particular source.
    #[default]
    None,
    /// The error originated while reading or writing the given file.
    File(PathBuf),
    /// The error originated from a config field override (e.g. CLI/env override).
    FieldOverride(String),
}
81
82impl fmt::Display for ConfigErrorSource {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 ConfigErrorSource::None => Ok(()),
86 ConfigErrorSource::File(file_name) => {
87 write!(f, " (file {})", file_name.display())
88 }
89 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90 }
91 }
92}
93
/// Indicates config related errors.
#[derive(Debug)]
pub struct ConfigError {
    // Where the error originated (file, field override, or nothing).
    source: ConfigErrorSource,
    // The category of the error.
    kind: ConfigErrorKind,
}
100
101impl ConfigError {
102 #[inline]
103 fn new(kind: ConfigErrorKind) -> Self {
104 Self {
105 source: ConfigErrorSource::None,
106 kind,
107 }
108 }
109
110 #[inline]
111 fn field(field: &'static str) -> Self {
112 Self {
113 source: ConfigErrorSource::FieldOverride(field.to_owned()),
114 kind: ConfigErrorKind::InvalidValue,
115 }
116 }
117
118 #[inline]
119 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120 Self {
121 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122 kind,
123 }
124 }
125
126 pub fn kind(&self) -> ConfigErrorKind {
128 self.kind
129 }
130}
131
132impl fmt::Display for ConfigError {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}{}", self.kind(), self.source)
135 }
136}
137
138impl Error for ConfigError {}
139
/// On-disk serialization formats for config objects.
enum ConfigFormat {
    /// YAML, used for the main `config.yml`.
    Yaml,
    /// JSON, used for `credentials.json`.
    Json,
}

impl ConfigFormat {
    /// Returns the file extension associated with this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Yaml => "yml",
            Self::Json => "json",
        }
    }
}
153
/// A config object that can be loaded from and saved to a file in a base directory.
///
/// Implementors only provide the serialization format and file stem; loading and
/// saving (including variable substitution) are provided by default methods.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The format in which this config object is serialized on disk.
    fn format() -> ConfigFormat;

    /// The file name (without extension) of the config file.
    fn name() -> &'static str;

    /// The full file path of this config object inside the base directory.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config object from its file in the base directory.
    ///
    /// Values of the form `${file:...}` are substituted from files relative to
    /// `base`, and `${...}` values are substituted from environment variables,
    /// via `serde_vars` during deserialization.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        // Substitution sources are tried in tuple order: file refs, then env vars.
        let mut source = {
            let file = serde_vars::FileSource::default()
                .with_variable_prefix("${file:")
                .with_variable_suffix("}")
                .with_base_path(base);
            let env = serde_vars::EnvSource::default()
                .with_variable_prefix("${")
                .with_variable_suffix("}");
            (file, env)
        };
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Writes the config object to its file in the base directory.
    ///
    /// The file is created or truncated; on Unix it is created with mode 0600
    /// since it may contain secrets (e.g. credentials).
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Remove all non-user permissions for the newly created file
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML files get a documentation header before the payload.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is best-effort; failure to write it is not an error.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
228
/// Structure used to hold overrides of individual config values, typically
/// sourced from command-line arguments or environment variables.
///
/// All fields are raw strings; they are parsed and validated when applied to
/// the config (see `Config::apply_override`).
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The relay mode (parsed as `RelayMode`).
    pub mode: Option<String>,
    /// The instance type of this relay (parsed as `RelayInstance`).
    pub instance: Option<String>,
    /// The log level.
    pub log_level: Option<String>,
    /// The log format.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN; only the protocol,
    /// public key, and host/port are used.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to.
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// Toggles processing ("true"/"1" or "false"/"0"/"").
    pub processing: Option<String>,
    /// The Kafka bootstrap server(s).
    pub kafka_url: Option<String>,
    /// The Redis server url.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// Outcome source override.
    pub outcome_source: Option<String>,
    /// Shutdown timeout override.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
268
/// The relay credentials.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
279
impl Credentials {
    /// Generates new random credentials (fresh key pair and relay ID).
    pub fn generate() -> Self {
        relay_log::info!("generating new relay credentials");
        let (sk, pk) = generate_key_pair();
        Self {
            secret_key: sk,
            public_key: pk,
            id: generate_relay_id(),
        }
    }

    /// Serializes these credentials to a JSON string.
    pub fn to_json_string(&self) -> anyhow::Result<String> {
        serde_json::to_string(self)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }
}
298
// Credentials are persisted as `credentials.json` in the config directory.
impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}
307
/// Information on a relay, as registered with the upstream or configured statically.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project
    /// configuration. Defaults to `false`.
    #[serde(default)]
    pub internal: bool,
}
319
320impl RelayInfo {
321 pub fn new(public_key: PublicKey) -> Self {
323 Self {
324 public_key,
325 internal: false,
326 }
327 }
328}
329
/// The operation mode of a relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    Proxy,

    /// This relay is configured and authenticated via the upstream.
    Managed,
}
348
349impl<'de> Deserialize<'de> for RelayMode {
350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351 where
352 D: Deserializer<'de>,
353 {
354 let s = String::deserialize(deserializer)?;
355 match s.as_str() {
356 "proxy" => Ok(RelayMode::Proxy),
357 "managed" => Ok(RelayMode::Managed),
358 "static" => Err(serde::de::Error::custom(
359 "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
360 )),
361 other => Err(serde::de::Error::unknown_variant(
362 other,
363 &["proxy", "managed"],
364 )),
365 }
366 }
367}
368
369impl fmt::Display for RelayMode {
370 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371 match self {
372 RelayMode::Proxy => write!(f, "proxy"),
373 RelayMode::Managed => write!(f, "managed"),
374 }
375 }
376}
377
/// The instance type of this relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// A regular relay instance.
    Default,

    /// A canary relay instance.
    Canary,
}
388
389impl RelayInstance {
390 pub fn is_canary(&self) -> bool {
392 matches!(self, RelayInstance::Canary)
393 }
394}
395
396impl fmt::Display for RelayInstance {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 match self {
399 RelayInstance::Default => write!(f, "default"),
400 RelayInstance::Canary => write!(f, "canary"),
401 }
402 }
403}
404
405impl FromStr for RelayInstance {
406 type Err = fmt::Error;
407
408 fn from_str(s: &str) -> Result<Self, Self::Err> {
409 match s {
410 "canary" => Ok(RelayInstance::Canary),
411 _ => Ok(RelayInstance::Default),
412 }
413 }
414}
415
/// Error returned when parsing an invalid [`RelayMode`] from a string.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429 type Err = ParseRelayModeError;
430
431 fn from_str(s: &str) -> Result<Self, Self::Err> {
432 match s {
433 "proxy" => Ok(RelayMode::Proxy),
434 "managed" => Ok(RelayMode::Managed),
435 _ => Err(ParseRelayModeError),
436 }
437 }
438}
439
/// Returns `true` if the value equals its type's default.
///
/// Used in serde `skip_serializing_if` attributes to omit default values.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    T::default() == *t
}
444
/// Checks if we are running in docker.
///
/// Detects either the `/.dockerenv` marker file or a docker entry in the
/// process cgroup file.
fn is_docker() -> bool {
    fs::metadata("/.dockerenv").is_ok()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|contents| contents.contains("/docker"))
}

/// Default value for the "bind to" address.
///
/// Inside docker we bind to all interfaces; otherwise only to loopback.
fn default_host() -> IpAddr {
    let address = if is_docker() { "0.0.0.0" } else { "127.0.0.1" };
    address.parse().unwrap()
}
463
/// Controls when the readiness probe reports ready.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ReadinessCondition {
    /// Relay reports ready only once it is authenticated (the default).
    #[default]
    Authenticated,
    /// Relay reports ready immediately, regardless of authentication state.
    Always,
}
484
/// Relay specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    pub mode: RelayMode,
    /// The instance type of this relay.
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to (network interface).
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: u16,
    /// Optional internal (private network) host override.
    pub internal_host: Option<IpAddr>,
    /// Optional internal (private network) port override.
    pub internal_port: Option<u16>,
    /// Optional port for a TLS listener. Not serialized back to file.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// Optional path to the TLS identity file. Not serialized back to file.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Optional password for the TLS identity. Not serialized back to file.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// When enabled, project IDs in the URL path take precedence over those in
    /// the submitted payload. Omitted from serialization when false.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
533
534impl Default for Relay {
535 fn default() -> Self {
536 Relay {
537 mode: RelayMode::Managed,
538 instance: RelayInstance::Default,
539 upstream: "https://sentry.io/".parse().unwrap(),
540 host: default_host(),
541 port: 3000,
542 internal_host: None,
543 internal_port: None,
544 tls_port: None,
545 tls_identity_path: None,
546 tls_identity_password: None,
547 override_project_ids: false,
548 }
549 }
550}
551
/// Configuration for internal statsd metrics emitted by Relay itself.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// The address of a statsd server; metrics are disabled when `None`.
    pub statsd: Option<String>,
    /// The prefix applied to all emitted metric names.
    pub prefix: String,
    /// Tags added to every emitted metric.
    pub default_tags: BTreeMap<String, String>,
    /// If set, the hostname is added as a tag under this tag name.
    pub hostname_tag: Option<String>,
    /// Sample rate for metrics emission (1.0 = all).
    pub sample_rate: f64,
    /// Interval for periodic metrics, in seconds.
    pub periodic_secs: u64,
    /// Whether emitted metrics are aggregated before sending.
    pub aggregate: bool,
    /// Allows emission of high-cardinality tags when enabled.
    pub allow_high_cardinality_tags: bool,
}
591
592impl Default for Metrics {
593 fn default() -> Self {
594 Metrics {
595 statsd: None,
596 prefix: "sentry.relay".into(),
597 default_tags: BTreeMap::new(),
598 hostname_tag: None,
599 sample_rate: 1.0,
600 periodic_secs: 5,
601 aggregate: true,
602 allow_high_cardinality_tags: false,
603 }
604 }
605}
606
/// Controls various size and concurrency limits.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// Maximum number of concurrent upstream requests.
    pub max_concurrent_requests: usize,
    /// Maximum number of concurrent upstream queries.
    pub max_concurrent_queries: usize,
    /// Maximum size of an event payload.
    pub max_event_size: ByteSize,
    /// Maximum size of a single attachment.
    pub max_attachment_size: ByteSize,
    /// Maximum size of an upload.
    pub max_upload_size: ByteSize,
    /// Maximum combined size of all attachments in an envelope.
    pub max_attachments_size: ByteSize,
    /// Maximum size of a client report payload.
    pub max_client_reports_size: ByteSize,
    /// Maximum size of a check-in payload.
    pub max_check_in_size: ByteSize,
    /// Maximum size of a full envelope.
    pub max_envelope_size: ByteSize,
    /// Maximum number of sessions per envelope.
    pub max_session_count: usize,
    /// Maximum size of an API payload.
    pub max_api_payload_size: ByteSize,
    /// Maximum size of an API file upload.
    pub max_api_file_upload_size: ByteSize,
    /// Maximum size of an API chunk upload.
    pub max_api_chunk_upload_size: ByteSize,
    /// Maximum size of a profile payload.
    pub max_profile_size: ByteSize,
    /// Maximum size of a trace metric payload.
    pub max_trace_metric_size: ByteSize,
    /// Maximum size of a log payload.
    pub max_log_size: ByteSize,
    /// Maximum size of a span payload.
    pub max_span_size: ByteSize,
    /// Maximum size of an item container payload.
    pub max_container_size: ByteSize,
    /// Maximum size of a statsd payload.
    pub max_statsd_size: ByteSize,
    /// Maximum size of a metric buckets payload.
    pub max_metric_buckets_size: ByteSize,
    /// Maximum size of a compressed replay payload.
    pub max_replay_compressed_size: ByteSize,
    /// Maximum size of an uncompressed replay payload.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// Maximum size of a replay message.
    pub max_replay_message_size: ByteSize,
    /// Maximum size of a removed attribute key.
    pub max_removed_attribute_key_size: ByteSize,
    /// Maximum number of threads for worker pools. Defaults to the CPU count.
    pub max_thread_count: usize,
    /// Maximum concurrency per worker pool thread.
    pub max_pool_concurrency: usize,
    /// Timeout for upstream queries, in seconds.
    pub query_timeout: u64,
    /// Timeout for graceful shutdown, in seconds.
    pub shutdown_timeout: u64,
    /// HTTP keep-alive timeout, in seconds.
    pub keepalive_timeout: u64,
    /// Optional idle connection timeout, in seconds.
    pub idle_timeout: Option<u64>,
    /// Optional cap on the number of open connections.
    pub max_connections: Option<usize>,
    /// Backlog size for the TCP listen socket.
    pub tcp_listen_backlog: u32,
}
716
717impl Default for Limits {
718 fn default() -> Self {
719 Limits {
720 max_concurrent_requests: 100,
721 max_concurrent_queries: 5,
722 max_event_size: ByteSize::mebibytes(1),
723 max_attachment_size: ByteSize::mebibytes(200),
724 max_upload_size: ByteSize::mebibytes(1024),
725 max_attachments_size: ByteSize::mebibytes(200),
726 max_client_reports_size: ByteSize::kibibytes(4),
727 max_check_in_size: ByteSize::kibibytes(100),
728 max_envelope_size: ByteSize::mebibytes(200),
729 max_session_count: 100,
730 max_api_payload_size: ByteSize::mebibytes(20),
731 max_api_file_upload_size: ByteSize::mebibytes(40),
732 max_api_chunk_upload_size: ByteSize::mebibytes(100),
733 max_profile_size: ByteSize::mebibytes(50),
734 max_trace_metric_size: ByteSize::kibibytes(2),
735 max_log_size: ByteSize::mebibytes(1),
736 max_span_size: ByteSize::mebibytes(1),
737 max_container_size: ByteSize::mebibytes(12),
738 max_statsd_size: ByteSize::mebibytes(1),
739 max_metric_buckets_size: ByteSize::mebibytes(1),
740 max_replay_compressed_size: ByteSize::mebibytes(10),
741 max_replay_uncompressed_size: ByteSize::mebibytes(100),
742 max_replay_message_size: ByteSize::mebibytes(15),
743 max_thread_count: num_cpus::get(),
744 max_pool_concurrency: 1,
745 query_timeout: 30,
746 shutdown_timeout: 10,
747 keepalive_timeout: 5,
748 idle_timeout: None,
749 max_connections: None,
750 tcp_listen_backlog: 1024,
751 max_removed_attribute_key_size: ByteSize::kibibytes(10),
752 }
753 }
754}
755
/// Controls traffic routing behavior.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Whether items of unknown type are accepted (and forwarded) rather than
    /// rejected. `None` uses the built-in default behavior.
    pub accept_unknown_items: Option<bool>,
}
771
/// Http content encoding for upstream store requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// No content encoding (the default fallback).
    #[default]
    Identity,
    /// Zlib (deflate) compression.
    Deflate,
    /// Gzip compression.
    Gzip,
    /// Brotli compression.
    Br,
    /// Zstandard compression.
    Zstd,
}
800
801impl HttpEncoding {
802 pub fn parse(str: &str) -> Self {
804 let str = str.trim();
805 if str.eq_ignore_ascii_case("zstd") {
806 Self::Zstd
807 } else if str.eq_ignore_ascii_case("br") {
808 Self::Br
809 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
810 Self::Gzip
811 } else if str.eq_ignore_ascii_case("deflate") {
812 Self::Deflate
813 } else {
814 Self::Identity
815 }
816 }
817
818 pub fn name(&self) -> Option<&'static str> {
822 match self {
823 Self::Identity => None,
824 Self::Deflate => Some("deflate"),
825 Self::Gzip => Some("gzip"),
826 Self::Br => Some("br"),
827 Self::Zstd => Some("zstd"),
828 }
829 }
830}
831
/// Controls upstream HTTP communication.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests (presumably seconds, matching the other
    /// interval fields here — TODO confirm against usage).
    pub timeout: u32,
    /// Timeout for establishing upstream connections.
    pub connection_timeout: u32,
    /// Maximum interval between retries.
    pub max_retry_interval: u32,
    /// Optional override for the HTTP `Host` header sent upstream.
    pub host_header: Option<String>,
    /// Optional interval for re-authentication with the upstream.
    pub auth_interval: Option<u64>,
    /// Grace period before reporting a network outage; defaults to
    /// `DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD`.
    pub outage_grace_period: u64,
    /// Delay before retrying failed upstream requests.
    pub retry_delay: u64,
    /// Interval after which project fetches are considered failed.
    pub project_failure_interval: u64,
    /// Content encoding used for upstream store requests.
    pub encoding: HttpEncoding,
    /// Whether metrics are sent via the global endpoint.
    pub global_metrics: bool,
    /// Whether unhandled items are forwarded upstream.
    pub forward: bool,
}
902
903impl Default for Http {
904 fn default() -> Self {
905 Http {
906 timeout: 5,
907 connection_timeout: 3,
908 max_retry_interval: 60, host_header: None,
910 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
912 retry_delay: default_retry_delay(),
913 project_failure_interval: default_project_failure_interval(),
914 encoding: HttpEncoding::Zstd,
915 global_metrics: false,
916 forward: true,
917 }
918 }
919}
920
/// Default for [`Http::retry_delay`].
fn default_retry_delay() -> u64 {
    1
}

/// Default for [`Http::project_failure_interval`].
fn default_project_failure_interval() -> u64 {
    90
}

/// Default for `EnvelopeSpool::max_disk_size`.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

/// Default for `EnvelopeSpool::batch_size_bytes`.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

/// Default for `EnvelopeSpool::max_envelope_delay_secs` (one day).
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

/// Default for `EnvelopeSpool::disk_usage_refresh_frequency_ms`.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

/// Default for `EnvelopeSpool::max_backpressure_memory_percent`.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.8
}

/// Default for `EnvelopeSpool::partitions`.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}
959
/// Configuration for the envelope spool (on-disk buffering of envelopes).
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// Path to the spool directory/file; spooling to disk is disabled when `None`.
    pub path: Option<PathBuf>,
    /// Maximum disk space the spool may occupy.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of batches written to disk.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time, in seconds, an envelope may be delayed in the spool.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// How often disk usage is refreshed, in milliseconds.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// Memory usage fraction above which backpressure is applied.
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of spool partitions.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1038
1039impl Default for EnvelopeSpool {
1040 fn default() -> Self {
1041 Self {
1042 path: None,
1043 max_disk_size: spool_envelopes_max_disk_size(),
1044 batch_size_bytes: spool_envelopes_batch_size_bytes(),
1045 max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1046 disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1047 max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1048 partitions: spool_envelopes_partitions(),
1049 }
1050 }
1051}
1052
/// Persistent spooling configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1060
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// Whether full project configs are requested (as opposed to restricted ones).
    pub project_request_full_config: bool,
    /// Expiry of cached project configs, in seconds.
    pub project_expiry: u32,
    /// Grace period during which an expired project config is still used, in seconds.
    pub project_grace_period: u32,
    /// Optional interval at which project configs are proactively refreshed.
    pub project_refresh_interval: Option<u32>,
    /// Expiry of cached relay infos, in seconds.
    pub relay_expiry: u32,
    /// Expiry of buffered envelopes, in seconds. `event_expiry` is the legacy name.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// Capacity of the envelope buffer. `event_buffer_size` is the legacy name.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// Expiry of cached misses, in seconds.
    pub miss_expiry: u32,
    /// Interval for batching upstream queries, in milliseconds.
    pub batch_interval: u32,
    /// Interval for batching queries from downstream relays, in milliseconds.
    pub downstream_relays_batch_interval: u32,
    /// Maximum number of items per batched query.
    pub batch_size: usize,
    /// Interval for reloading files (e.g. project configs from disk), in seconds.
    pub file_interval: u32,
    /// Interval for fetching the global config, in seconds.
    pub global_config_fetch_interval: u32,
}
1107
1108impl Default for Cache {
1109 fn default() -> Self {
1110 Cache {
1111 project_request_full_config: false,
1112 project_expiry: 300, project_grace_period: 120, project_refresh_interval: None,
1115 relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1118 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1122 file_interval: 10, global_config_fetch_interval: 10, }
1125 }
1126}
1127
/// Default for `Processing::max_secs_in_future` (one minute).
fn default_max_secs_in_future() -> u32 {
    60
}

/// Default for `Processing::max_session_secs_in_past` (five days).
fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600
}
1135
/// Default for `Processing::attachment_chunk_size`.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1139
/// Default key prefix for the project config cache.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}

/// Default for `Processing::max_rate_limit` (five minutes).
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1148
/// Controls processing-mode features (Kafka production, Redis, etc.).
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// Whether processing features are enabled.
    pub enabled: bool,
    /// Optional path to a GeoIP database.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum number of seconds event timestamps may lie in the future.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of session timestamps, in seconds.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configuration parameters.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional named Kafka configurations referenced by topic assignments.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Assignment of item types to Kafka topics.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether Kafka topics are validated on startup.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis configuration; required for some processing features.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Chunk size for attachments that are produced in chunks.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Key prefix for the project config cache.
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Optional upper bound for rate limits, in seconds.
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    /// Optional cache ratio for quotas.
    pub quota_cache_ratio: Option<f32>,
    /// Optional cache maximum for quotas.
    pub quota_cache_max: Option<f32>,
    /// Configuration of the upload service.
    #[serde(default)]
    pub upload: UploadServiceConfig,
}
1226
1227impl Default for Processing {
1228 fn default() -> Self {
1230 Self {
1231 enabled: false,
1232 geoip_path: None,
1233 max_secs_in_future: default_max_secs_in_future(),
1234 max_session_secs_in_past: default_max_session_secs_in_past(),
1235 kafka_config: Vec::new(),
1236 secondary_kafka_configs: BTreeMap::new(),
1237 topics: TopicAssignments::default(),
1238 kafka_validate_topics: false,
1239 redis: None,
1240 attachment_chunk_size: default_chunk_size(),
1241 projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1242 max_rate_limit: default_max_rate_limit(),
1243 quota_cache_ratio: None,
1244 quota_cache_max: None,
1245 upload: UploadServiceConfig::default(),
1246 }
1247 }
1248}
1249
/// Configuration for event normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization applied to events.
    #[serde(default)]
    pub level: NormalizationLevel,
}

/// Configuration for the level of normalization this Relay applies.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// The default normalization behavior.
    #[default]
    Default,
    /// Full normalization is applied.
    Full,
}
1274
1275#[derive(Serialize, Deserialize, Debug)]
1277#[serde(default)]
1278pub struct OutcomeAggregatorConfig {
1279 pub bucket_interval: u64,
1281 pub flush_interval: u64,
1283}
1284
1285impl Default for OutcomeAggregatorConfig {
1286 fn default() -> Self {
1287 Self {
1288 bucket_interval: 60,
1289 flush_interval: 120,
1290 }
1291 }
1292}
1293
1294#[derive(Serialize, Deserialize, Debug)]
1296#[serde(default)]
1297pub struct UploadServiceConfig {
1298 pub objectstore_url: Option<String>,
1303
1304 pub max_concurrent_requests: usize,
1306
1307 pub timeout: u64,
1309}
1310
1311impl Default for UploadServiceConfig {
1312 fn default() -> Self {
1313 Self {
1314 objectstore_url: None,
1315 max_concurrent_requests: 10,
1316 timeout: 60,
1317 }
1318 }
1319}
1320
/// Controls whether and how outcomes are emitted.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Outcomes are not emitted at all.
    None,
    /// Outcomes are emitted as client reports.
    AsClientReports,
    /// Outcomes are emitted as regular outcomes.
    AsOutcomes,
}

impl EmitOutcomes {
    /// Returns `true` if any form of outcome emission is enabled.
    pub fn any(&self) -> bool {
        *self != EmitOutcomes::None
    }
}
1340
impl Serialize for EmitOutcomes {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // `None` and `AsOutcomes` serialize as booleans for backward
        // compatibility; only `AsClientReports` uses a string.
        match self {
            Self::None => serializer.serialize_bool(false),
            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
            Self::AsOutcomes => serializer.serialize_bool(true),
        }
    }
}
1354
// Visitor accepting the mixed bool/string encoding of `EmitOutcomes`.
struct EmitOutcomesVisitor;

impl Visitor<'_> for EmitOutcomesVisitor {
    type Value = EmitOutcomes;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("true, false, or 'as_client_reports'")
    }

    // `true` maps to full outcomes, `false` to no emission.
    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        Ok(if v {
            EmitOutcomes::AsOutcomes
        } else {
            EmitOutcomes::None
        })
    }

    // The only accepted string value is "as_client_reports".
    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        if v == "as_client_reports" {
            Ok(EmitOutcomes::AsClientReports)
        } else {
            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
        }
    }
}
1386
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` is required since the value can be a bool or a string.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1395
/// Outcome emission configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Whether and how outcomes are emitted by this Relay.
    pub emit_outcomes: EmitOutcomes,
    /// Whether client-reported outcomes are emitted.
    pub emit_client_outcomes: bool,
    /// Maximum number of outcomes per emitted batch.
    pub batch_size: usize,
    /// Interval for batching outcomes, in milliseconds.
    pub batch_interval: u64,
    /// Optional custom source string attached to emitted outcomes.
    pub source: Option<String>,
    /// Configuration of the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1418
1419impl Default for Outcomes {
1420 fn default() -> Self {
1421 Outcomes {
1422 emit_outcomes: EmitOutcomes::AsClientReports,
1423 emit_client_outcomes: true,
1424 batch_size: 1000,
1425 batch_interval: 500,
1426 source: None,
1427 aggregator: OutcomeAggregatorConfig::default(),
1428 }
1429 }
1430}
1431
/// Minimal version of the config, holding only the relay section.
///
/// Used when initializing a new config folder without the full set of options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1438
1439impl MinimalConfig {
1440 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1442 let path = p.as_ref();
1443 if fs::metadata(path).is_err() {
1444 fs::create_dir_all(path)
1445 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1446 }
1447 self.save(path)
1448 }
1449}
1450
// The minimal config is stored as `config.yml`, the same file as the full config.
impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1460
/// Alternative serialization of `RelayInfo` for static relay configuration,
/// used via `#[serde(with = "config_relay_info")]` on `AuthConfig::static_relays`.
mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    // Mirror of `RelayInfo` with the wire layout used inside the config file.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    /// Deserializes a map of relay IDs to relay infos.
    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    /// Serializes a map of relay IDs to relay infos.
    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}
1514
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls when this relay reports ready (omitted when default).
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically configured relays, keyed by relay ID.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// Maximum age, in seconds, of accepted request signatures.
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}

/// Default for `AuthConfig::signature_max_age` (five minutes).
fn default_max_age() -> u64 {
    300
}
1536
/// GeoIp database configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// Path to the GeoIP database; lookups are disabled when `None`.
    pub path: Option<PathBuf>,
}
1543
1544#[derive(Serialize, Deserialize, Debug)]
1546#[serde(default)]
1547pub struct CardinalityLimiter {
1548 pub cache_vacuum_interval: u64,
1554}
1555
1556impl Default for CardinalityLimiter {
1557 fn default() -> Self {
1558 Self {
1559 cache_vacuum_interval: 180,
1560 }
1561 }
1562}
1563
/// Settings for the health check probes and system resource monitoring.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval at which health state is refreshed, in milliseconds.
    pub refresh_interval_ms: u64,
    /// Optional absolute memory limit; exceeding it reports unhealthy.
    pub max_memory_bytes: Option<ByteSize>,
    /// Relative memory limit as a fraction of total memory.
    pub max_memory_percent: f32,
    /// Timeout for internal health probes, in milliseconds.
    pub probe_timeout_ms: u64,
    /// Refresh frequency of memory statistics, in milliseconds.
    pub memory_stat_refresh_frequency_ms: u64,
}

impl Default for Health {
    fn default() -> Self {
        Self {
            refresh_interval_ms: 3000,
            max_memory_bytes: None,
            max_memory_percent: 0.95,
            probe_timeout_ms: 900,
            memory_stat_refresh_frequency_ms: 100,
        }
    }
}
1613
1614#[derive(Serialize, Deserialize, Debug)]
1616#[serde(default)]
1617pub struct Cogs {
1618 pub max_queue_size: u64,
1624 pub relay_resource_id: String,
1630}
1631
1632impl Default for Cogs {
1633 fn default() -> Self {
1634 Self {
1635 max_queue_size: 10_000,
1636 relay_resource_id: "relay_service".to_owned(),
1637 }
1638 }
1639}
1640
/// The full set of config values loaded from `config.yml`.
///
/// Each section defaults independently, so a partial config file is valid.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1682
// The full config is stored as `config.yml` in the config directory.
impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1692
/// Config struct.
pub struct Config {
    // The values loaded from `config.yml` (plus any applied overrides).
    values: ConfigValues,
    // Credentials loaded from `credentials.json`, if present.
    credentials: Option<Credentials>,
    // The directory this config was loaded from.
    path: PathBuf,
}
1699
1700impl fmt::Debug for Config {
1701 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1702 f.debug_struct("Config")
1703 .field("path", &self.path)
1704 .field("values", &self.values)
1705 .finish()
1706 }
1707}
1708
1709impl Config {
1710 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1712 let path = env::current_dir()
1713 .map(|x| x.join(path.as_ref()))
1714 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1715
1716 let config = Config {
1717 values: ConfigValues::load(&path)?,
1718 credentials: if Credentials::path(&path).exists() {
1719 Some(Credentials::load(&path)?)
1720 } else {
1721 None
1722 },
1723 path: path.clone(),
1724 };
1725
1726 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1727 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1728 }
1729
1730 Ok(config)
1731 }
1732
    /// Creates a config from a JSON value.
    ///
    /// The resulting config has no credentials and an empty path; it is not
    /// backed by any file on disk.
    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }
1744
    /// Applies command-line / environment overrides on top of the loaded
    /// configuration, mutating `self` in place.
    ///
    /// String-valued overrides are parsed into their typed counterparts; a
    /// parse failure is reported as a [`ConfigError`] naming the offending
    /// field. Returns `&mut Self` so calls can be chained.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // A full upstream URL takes precedence over a DSN-derived upstream.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            // Accepts boolean-ish strings; the empty string counts as "off".
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            // Update an existing `bootstrap.servers` entry in place, or
            // append a new one if none is present.
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        // Credential components are parsed up front and applied below, either
        // merged into existing credentials or used to create a fresh set.
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Credentials already exist: each provided component overrides
            // its counterpart individually.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No credentials on file: either all three components must be
            // provided, or none. A partial set is rejected.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // NOTE(review): an unparsable shutdown_timeout override is silently
        // ignored here rather than reported — presumably intentional; confirm.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
1898
1899 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1901 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1902 }
1903
    /// Returns the path this configuration was loaded from.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Serializes the configuration values into a YAML string.
    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    /// Generates a fresh set of credentials and installs them on this config.
    ///
    /// If `save` is set, the new credentials are also written to disk at the
    /// config path.
    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    /// Returns the relay credentials, if any are configured.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
1931
    /// Replaces the stored credentials and persists the change to disk.
    ///
    /// Passing `None` removes the credentials file if it exists. Returns
    /// `Ok(false)` when the new credentials equal the current ones (nothing
    /// is written), `Ok(true)` otherwise.
    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }
1960
    /// Returns `true` if credentials are configured.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key, if credentials are configured.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key, if credentials are configured.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID, if credentials are configured.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    /// Returns the configured relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the configured relay instance type.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the upstream this relay forwards to.
    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    /// Returns the custom HTTP `Host` header to send upstream, if configured.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the socket address this relay listens on.
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }
2005
2006 pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
2014 match (
2015 self.values.relay.internal_host,
2016 self.values.relay.internal_port,
2017 ) {
2018 (Some(host), None) => Some((host, self.values.relay.port).into()),
2019 (None, Some(port)) => Some((self.values.relay.host, port).into()),
2020 (Some(host), Some(port)) => Some((host, port).into()),
2021 (None, None) => None,
2022 }
2023 }
2024
    /// Returns the TLS listen address, if a TLS identity is configured.
    ///
    /// Falls back to port 3443 when no explicit TLS port is set.
    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
        if self.values.relay.tls_identity_path.is_some() {
            let port = self.values.relay.tls_port.unwrap_or(3443);
            Some((self.values.relay.host, port).into())
        } else {
            None
        }
    }

    /// Returns the path to the TLS identity file, if configured.
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the TLS identity, if configured.
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns whether project IDs from the URL override those in envelopes.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    /// Returns whether this relay must authenticate before reporting ready.
    ///
    /// Only managed relays with the `Authenticated` readiness condition
    /// require authentication.
    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }
2061
2062 pub fn http_auth_interval(&self) -> Option<Duration> {
2066 if self.processing_enabled() {
2067 return None;
2068 }
2069
2070 match self.values.http.auth_interval {
2071 None | Some(0) => None,
2072 Some(secs) => Some(Duration::from_secs(secs)),
2073 }
2074 }
2075
    /// Returns the grace period before a network outage is acted upon.
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Returns the delay before retrying failed upstream requests.
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Returns the interval after which project fetch failures are reported.
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Returns the content encoding used for upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns whether metrics are sent through the global endpoint.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns whether unknown requests are forwarded to the upstream.
    pub fn http_forward(&self) -> bool {
        self.values.http.forward
    }

    /// Returns how outcomes should be emitted.
    ///
    /// Processing relays always emit full outcomes regardless of the
    /// configured value.
    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    /// Returns whether client outcomes (client reports) are emitted.
    pub fn emit_client_outcomes(&self) -> bool {
        self.values.outcomes.emit_client_outcomes
    }

    /// Returns the maximum number of outcomes per batch.
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the maximum time outcomes are batched before sending.
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// Returns the source tag attached to emitted outcomes, if configured.
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the outcome aggregator configuration.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    /// Returns the logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry (internal error reporting) configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    /// Resolves the configured statsd address into socket addresses.
    ///
    /// Returns an empty list when no statsd address is configured; fails if
    /// the address cannot be resolved.
    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
        if let Some(ref addr) = self.values.metrics.statsd {
            let addrs = addr
                .as_str()
                .to_socket_addrs()
                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
                .collect();
            Ok(addrs)
        } else {
            Ok(vec![])
        }
    }

    /// Returns the prefix prepended to all statsd metric names.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns the default tags attached to all statsd metrics.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the tag name used for the hostname, if configured.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the sample rate applied to emitted statsd metrics.
    pub fn metrics_sample_rate(&self) -> f64 {
        self.values.metrics.sample_rate
    }

    /// Returns whether statsd metrics are pre-aggregated before sending.
    pub fn metrics_aggregate(&self) -> bool {
        self.values.metrics.aggregate
    }

    /// Returns whether high-cardinality tags are allowed on statsd metrics.
    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
        self.values.metrics.allow_high_cardinality_tags
    }

    /// Returns the interval for periodically emitted metrics, or `None` if
    /// disabled (interval of zero).
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }
2219
    /// Returns the overall timeout for upstream HTTP requests.
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the timeout for establishing upstream connections.
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the maximum backoff interval between upstream retries.
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns how long project states are cached.
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns whether the full project config is requested from upstream.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns how long downstream relay info is cached.
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    /// Returns the maximum number of buffered envelopes, saturating to
    /// `usize::MAX` on narrower platforms.
    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    /// Returns how long cache misses are remembered before retrying.
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period during which expired project states may
    /// still be used.
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the interval for proactively refreshing project states, if
    /// configured.
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns how long upstream queries are batched before sending.
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns how long downstream relay queries are batched before sending.
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the polling interval for project configs on the file system.
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the interval for fetching the global config from upstream.
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }
2301
2302 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2307 let mut path = self
2308 .values
2309 .spool
2310 .envelopes
2311 .path
2312 .as_ref()
2313 .map(|path| path.to_owned())?;
2314
2315 if partition_id == 0 {
2316 return Some(path);
2317 }
2318
2319 let file_name = path.file_name().and_then(|f| f.to_str())?;
2320 let new_file_name = format!("{file_name}.{partition_id}");
2321 path.set_file_name(new_file_name);
2322
2323 Some(path)
2324 }
2325
    /// Returns the maximum on-disk size of the envelope spool in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Returns the spool write-batch size in bytes.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the maximum age of a spooled envelope before it is dropped.
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns how often spool disk usage is re-measured.
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the memory usage percentage above which backpressure applies.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of spool partitions.
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }
2356
    /// Returns the maximum size of an event payload in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size of a single attachment in bytes.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum size of an upload in bytes.
    pub fn max_upload_size(&self) -> usize {
        self.values.limits.max_upload_size.as_bytes()
    }

    /// Returns the maximum combined size of all attachments in an envelope.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum size of a client report payload in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum size of a monitor check-in payload in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum size of a log payload in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum size of a span payload in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum size of an item container payload in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum size for logs-integration payloads; currently the
    /// same as [`Self::max_container_size`].
    pub fn max_logs_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum size for spans-integration payloads; currently
    /// the same as [`Self::max_container_size`].
    pub fn max_spans_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum size of a full envelope in bytes.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum size of a statsd payload in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum size of a metric buckets payload in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    /// Returns the maximum size of an API payload in bytes.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum size of an API file upload in bytes.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum size of an API chunk upload in bytes.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum size of a profile payload in bytes.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum size of a trace metric payload in bytes.
    pub fn max_trace_metric_size(&self) -> usize {
        self.values.limits.max_trace_metric_size.as_bytes()
    }

    /// Returns the maximum size of a compressed replay payload in bytes.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum size of an uncompressed replay payload in bytes.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum size of a replay message in bytes.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    /// Returns the maximum number of concurrent upstream requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of concurrent upstream queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// Returns the maximum size of a removed attribute key in bytes.
    pub fn max_removed_attribute_key_size(&self) -> usize {
        self.values.limits.max_removed_attribute_key_size.as_bytes()
    }

    /// Returns the timeout for upstream queries.
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// Returns how long a graceful shutdown may take before forcing exit.
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    /// Returns the HTTP keep-alive timeout for incoming connections.
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    /// Returns the idle timeout for incoming connections, if configured.
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of incoming connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// Returns the TCP listen backlog size.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the number of CPU-bound worker threads (the configured
    /// `max_thread_count`).
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the maximum concurrency per worker pool.
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    /// Returns the maximum number of queries per upstream batch.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Returns the path of the directory holding static project configs.
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }
2548
    /// Returns `true` if processing mode is enabled.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Returns the configured event normalization level.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    /// Returns the path to the GeoIP database, preferring the `geoip`
    /// section over the legacy `processing.geoip_path` setting.
    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    /// Returns how many seconds event timestamps may lie in the future.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Returns how many seconds session timestamps may lie in the past.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    /// Resolves the Kafka configuration for the given topic, combining the
    /// primary and secondary Kafka cluster configs.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Returns whether Kafka topics are validated on startup.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// Returns topic assignments that did not map to a known topic.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Returns the upload service configuration.
    pub fn upload(&self) -> &UploadServiceConfig {
        &self.values.processing.upload
    }
2605
    /// Returns the Redis configurations with pool sizes derived from the
    /// configured concurrency, or `None` if Redis is not configured.
    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
            self.pool_concurrency() as u32,
        ))
    }

    /// Returns the chunk size used when writing attachments, in bytes.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Returns the maximum size of a flushed metrics batch in bytes (the
    /// aggregator's `max_flush_bytes`).
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Returns the prefix for project config cache keys.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Returns the maximum rate limit duration in seconds, if configured.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Returns the quota cache ratio, if configured.
    pub fn quota_cache_ratio(&self) -> Option<f32> {
        self.values.processing.quota_cache_ratio
    }

    /// Returns the maximum quota cache duration, if configured.
    pub fn quota_cache_max(&self) -> Option<f32> {
        self.values.processing.quota_cache_max
    }

    /// Returns the interval for vacuuming the cardinality limiter cache.
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Returns the interval at which health status is refreshed.
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Returns the absolute memory watermark in bytes; unlimited
    /// (`u64::MAX`) if not configured.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    /// Returns the relative memory watermark as a percentage.
    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    /// Returns the timeout for individual health probes.
    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    /// Returns how often memory statistics are refreshed, in milliseconds.
    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    /// Returns the maximum queue size of the COGS service.
    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    /// Returns the COGS resource identifier for this relay.
    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    /// Returns the default metrics aggregator configuration.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    /// Returns the secondary (per-namespace) aggregator configurations.
    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }
2704
2705 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2707 for entry in &self.values.secondary_aggregators {
2708 if entry.condition.matches(Some(namespace)) {
2709 return &entry.config;
2710 }
2711 }
2712 &self.values.aggregator
2713 }
2714
    /// Returns the statically configured downstream relays, keyed by ID.
    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    /// Returns the maximum accepted age of request signatures.
    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }

    /// Returns whether envelope items of unknown type are accepted.
    ///
    /// Defaults to accepting unknown items unless processing is enabled.
    pub fn accept_unknown_items(&self) -> bool {
        let forward = self.values.routing.accept_unknown_items;
        forward.unwrap_or_else(|| !self.processing_enabled())
    }
2730}
2731
2732impl Default for Config {
2733 fn default() -> Self {
2734 Self {
2735 values: ConfigValues::default(),
2736 credentials: None,
2737 path: PathBuf::new(),
2738 }
2739 }
2740}
2741
#[cfg(test)]
mod tests {

    use super::*;

    // The legacy `event_buffer_size`/`event_expiry` keys still deserialize
    // into the envelope-based cache fields (presumably via serde aliases on
    // `CacheConfig` — defined elsewhere in this file).
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    // `EmitOutcomes` round-trips through its custom serialization: booleans
    // for on/off and a string for the client-report variant.
    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }

    // Anything other than the accepted forms must be rejected.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}