1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
28const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
30static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
31# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
32# All config options: https://docs.sentry.io/product/relay/options/
33"###;
34
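/// Kinds of failures that can occur while loading, saving, or overriding the
/// configuration. The human-readable message for each kind is defined in the
/// `Display` implementation below.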
35#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum ConfigErrorKind {
39 CouldNotOpenFile,
41 CouldNotWriteFile,
43 BadYaml,
45 BadJson,
47 InvalidValue,
49 ProcessingNotAvailable,
52}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
70#[derive(Debug, Default)]
72enum ConfigErrorSource {
73 #[default]
75 None,
76 File(PathBuf),
78 FieldOverride(String),
80}
81
82impl fmt::Display for ConfigErrorSource {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 ConfigErrorSource::None => Ok(()),
86 ConfigErrorSource::File(file_name) => {
87 write!(f, " (file {})", file_name.display())
88 }
89 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90 }
91 }
92}
93
94#[derive(Debug)]
96pub struct ConfigError {
97 source: ConfigErrorSource,
98 kind: ConfigErrorKind,
99}
100
101impl ConfigError {
102 #[inline]
103 fn new(kind: ConfigErrorKind) -> Self {
104 Self {
105 source: ConfigErrorSource::None,
106 kind,
107 }
108 }
109
110 #[inline]
111 fn field(field: &'static str) -> Self {
112 Self {
113 source: ConfigErrorSource::FieldOverride(field.to_owned()),
114 kind: ConfigErrorKind::InvalidValue,
115 }
116 }
117
118 #[inline]
119 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120 Self {
121 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122 kind,
123 }
124 }
125
126 pub fn kind(&self) -> ConfigErrorKind {
128 self.kind
129 }
130}
131
132impl fmt::Display for ConfigError {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}{}", self.kind(), self.source)
135 }
136}
137
138impl Error for ConfigError {}
139
140enum ConfigFormat {
141 Yaml,
142 Json,
143}
144
145impl ConfigFormat {
146 pub fn extension(&self) -> &'static str {
147 match self {
148 ConfigFormat::Yaml => "yml",
149 ConfigFormat::Json => "json",
150 }
151 }
152}
153
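/// A configuration fragment persisted as its own file (`<name>.<ext>`) inside the
/// config folder. Loading runs through `serde_vars` with an `EnvSource`, so values
/// can be substituted from environment variables.
///
/// A minimal sketch of how a fragment is loaded (the folder path is hypothetical):
///
/// ```ignore
/// // Reads `<base>/credentials.json`, since `Credentials` uses the JSON format.
/// let credentials = Credentials::load(Path::new("/etc/relay"))?;
/// ```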
154trait ConfigObject: DeserializeOwned + Serialize {
155 fn format() -> ConfigFormat;
157
158 fn name() -> &'static str;
160
161 fn path(base: &Path) -> PathBuf {
163 base.join(format!("{}.{}", Self::name(), Self::format().extension()))
164 }
165
166 fn load(base: &Path) -> anyhow::Result<Self> {
168 let path = Self::path(base);
169
170 let f = fs::File::open(&path)
171 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
172 let f = io::BufReader::new(f);
173
174 let mut source = serde_vars::EnvSource::default();
175 match Self::format() {
176 ConfigFormat::Yaml => {
177 serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
178 .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
179 }
180 ConfigFormat::Json => {
181 serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
182 .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
183 }
184 }
185 }
186
187 fn save(&self, base: &Path) -> anyhow::Result<()> {
189 let path = Self::path(base);
190 let mut options = fs::OpenOptions::new();
191 options.write(true).truncate(true).create(true);
192
193 #[cfg(unix)]
195 {
196 use std::os::unix::fs::OpenOptionsExt;
197 options.mode(0o600);
198 }
199
200 let mut f = options
201 .open(&path)
202 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
203
204 match Self::format() {
205 ConfigFormat::Yaml => {
206 f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
207 serde_yaml::to_writer(&mut f, self)
208 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
209 }
210 ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
211 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
212 }
213
214 f.write_all(b"\n").ok();
215
216 Ok(())
217 }
218}
219
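/// Alternative configuration values, typically sourced from the command line or
/// environment, that take precedence over the values loaded from the config file.
/// They are applied through `Config::apply_override`.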
220#[derive(Debug, Default)]
223pub struct OverridableConfig {
224 pub mode: Option<String>,
226 pub instance: Option<String>,
228 pub log_level: Option<String>,
230 pub log_format: Option<String>,
232 pub upstream: Option<String>,
234 pub upstream_dsn: Option<String>,
236 pub host: Option<String>,
238 pub port: Option<String>,
240 pub processing: Option<String>,
242 pub kafka_url: Option<String>,
244 pub redis_url: Option<String>,
246 pub id: Option<String>,
248 pub secret_key: Option<String>,
250 pub public_key: Option<String>,
252 pub outcome_source: Option<String>,
254 pub shutdown_timeout: Option<String>,
256 pub server_name: Option<String>,
258}
259
260#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
262pub struct Credentials {
263 pub secret_key: SecretKey,
265 pub public_key: PublicKey,
267 pub id: RelayId,
269}
270
271impl Credentials {
272 pub fn generate() -> Self {
274 relay_log::info!("generating new relay credentials");
275 let (sk, pk) = generate_key_pair();
276 Self {
277 secret_key: sk,
278 public_key: pk,
279 id: generate_relay_id(),
280 }
281 }
282
283 pub fn to_json_string(&self) -> anyhow::Result<String> {
285 serde_json::to_string(self)
286 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
287 }
288}
289
290impl ConfigObject for Credentials {
291 fn format() -> ConfigFormat {
292 ConfigFormat::Json
293 }
294 fn name() -> &'static str {
295 "credentials"
296 }
297}
298
299#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
301#[serde(rename_all = "camelCase")]
302pub struct RelayInfo {
303 pub public_key: PublicKey,
305
306 #[serde(default)]
308 pub internal: bool,
309}
310
311impl RelayInfo {
312 pub fn new(public_key: PublicKey) -> Self {
314 Self {
315 public_key,
316 internal: false,
317 }
318 }
319}
320
321#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
323#[serde(rename_all = "camelCase")]
324pub enum RelayMode {
325 Proxy,
331
332 Managed,
338}
339
340impl<'de> Deserialize<'de> for RelayMode {
341 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
342 where
343 D: Deserializer<'de>,
344 {
345 let s = String::deserialize(deserializer)?;
346 match s.as_str() {
347 "proxy" => Ok(RelayMode::Proxy),
348 "managed" => Ok(RelayMode::Managed),
349 "static" => Err(serde::de::Error::custom(
350 "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
351 )),
352 other => Err(serde::de::Error::unknown_variant(
353 other,
354 &["proxy", "managed"],
355 )),
356 }
357 }
358}
359
360impl fmt::Display for RelayMode {
361 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
362 match self {
363 RelayMode::Proxy => write!(f, "proxy"),
364 RelayMode::Managed => write!(f, "managed"),
365 }
366 }
367}
368
369#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
371#[serde(rename_all = "camelCase")]
372pub enum RelayInstance {
373 Default,
375
376 Canary,
378}
379
380impl RelayInstance {
381 pub fn is_canary(&self) -> bool {
383 matches!(self, RelayInstance::Canary)
384 }
385}
386
387impl fmt::Display for RelayInstance {
388 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389 match self {
390 RelayInstance::Default => write!(f, "default"),
391 RelayInstance::Canary => write!(f, "canary"),
392 }
393 }
394}
395
396impl FromStr for RelayInstance {
397 type Err = fmt::Error;
398
399 fn from_str(s: &str) -> Result<Self, Self::Err> {
400 match s {
401 "canary" => Ok(RelayInstance::Canary),
402 _ => Ok(RelayInstance::Default),
403 }
404 }
405}
406
407#[derive(Clone, Copy, Debug, Eq, PartialEq)]
409pub struct ParseRelayModeError;
410
411impl fmt::Display for ParseRelayModeError {
412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413 write!(f, "Relay mode must be one of: managed or proxy")
414 }
415}
416
417impl Error for ParseRelayModeError {}
418
419impl FromStr for RelayMode {
420 type Err = ParseRelayModeError;
421
422 fn from_str(s: &str) -> Result<Self, Self::Err> {
423 match s {
424 "proxy" => Ok(RelayMode::Proxy),
425 "managed" => Ok(RelayMode::Managed),
426 _ => Err(ParseRelayModeError),
427 }
428 }
429}
430
431fn is_default<T: Default + PartialEq>(t: &T) -> bool {
433 *t == T::default()
434}
435
436fn is_docker() -> bool {
438 if fs::metadata("/.dockerenv").is_ok() {
439 return true;
440 }
441
442 fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
443}
444
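/// Returns the default listen host: `0.0.0.0` when running inside Docker (detected
/// via `/.dockerenv` or the cgroup file), and `127.0.0.1` otherwise.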
445fn default_host() -> IpAddr {
447 if is_docker() {
448 "0.0.0.0".parse().unwrap()
450 } else {
451 "127.0.0.1".parse().unwrap()
452 }
453}
454
455#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
459#[serde(rename_all = "lowercase")]
460#[derive(Default)]
461pub enum ReadinessCondition {
462 #[default]
471 Authenticated,
472 Always,
474}
475
476#[derive(Serialize, Deserialize, Debug)]
478#[serde(default)]
479pub struct Relay {
480 pub mode: RelayMode,
482 pub instance: RelayInstance,
484 pub upstream: UpstreamDescriptor<'static>,
486 pub host: IpAddr,
488 pub port: u16,
490 #[serde(skip_serializing)]
492 pub tls_port: Option<u16>,
493 #[serde(skip_serializing)]
495 pub tls_identity_path: Option<PathBuf>,
496 #[serde(skip_serializing)]
498 pub tls_identity_password: Option<String>,
499 #[serde(skip_serializing_if = "is_default")]
504 pub override_project_ids: bool,
505}
506
507impl Default for Relay {
508 fn default() -> Self {
509 Relay {
510 mode: RelayMode::Managed,
511 instance: RelayInstance::Default,
512 upstream: "https://sentry.io/".parse().unwrap(),
513 host: default_host(),
514 port: 3000,
515 tls_port: None,
516 tls_identity_path: None,
517 tls_identity_password: None,
518 override_project_ids: false,
519 }
520 }
521}
522
523#[derive(Serialize, Deserialize, Debug)]
525#[serde(default)]
526pub struct Metrics {
527 pub statsd: Option<String>,
531 pub prefix: String,
535 pub default_tags: BTreeMap<String, String>,
537 pub hostname_tag: Option<String>,
539 pub sample_rate: f32,
544 pub periodic_secs: u64,
549 pub aggregate: bool,
553 pub allow_high_cardinality_tags: bool,
561}
562
563impl Default for Metrics {
564 fn default() -> Self {
565 Metrics {
566 statsd: None,
567 prefix: "sentry.relay".into(),
568 default_tags: BTreeMap::new(),
569 hostname_tag: None,
570 sample_rate: 1.0,
571 periodic_secs: 5,
572 aggregate: true,
573 allow_high_cardinality_tags: false,
574 }
575 }
576}
577
578#[derive(Serialize, Deserialize, Debug)]
580#[serde(default)]
581pub struct Limits {
582 pub max_concurrent_requests: usize,
585 pub max_concurrent_queries: usize,
590 pub max_event_size: ByteSize,
592 pub max_attachment_size: ByteSize,
594 pub max_attachments_size: ByteSize,
596 pub max_client_reports_size: ByteSize,
598 pub max_check_in_size: ByteSize,
600 pub max_envelope_size: ByteSize,
602 pub max_session_count: usize,
604 pub max_span_count: usize,
606 pub max_log_count: usize,
608 pub max_trace_metric_count: usize,
610 pub max_api_payload_size: ByteSize,
612 pub max_api_file_upload_size: ByteSize,
614 pub max_api_chunk_upload_size: ByteSize,
616 pub max_profile_size: ByteSize,
618 pub max_trace_metric_size: ByteSize,
620 pub max_log_size: ByteSize,
622 pub max_span_size: ByteSize,
624 pub max_container_size: ByteSize,
626 pub max_statsd_size: ByteSize,
628 pub max_metric_buckets_size: ByteSize,
630 pub max_replay_compressed_size: ByteSize,
632 #[serde(alias = "max_replay_size")]
634 max_replay_uncompressed_size: ByteSize,
635 pub max_replay_message_size: ByteSize,
637 pub max_thread_count: usize,
642 pub max_pool_concurrency: usize,
649 pub query_timeout: u64,
652 pub shutdown_timeout: u64,
655 pub keepalive_timeout: u64,
659 pub idle_timeout: Option<u64>,
666 pub max_connections: Option<usize>,
672 pub tcp_listen_backlog: u32,
680}
681
682impl Default for Limits {
683 fn default() -> Self {
684 Limits {
685 max_concurrent_requests: 100,
686 max_concurrent_queries: 5,
687 max_event_size: ByteSize::mebibytes(1),
688 max_attachment_size: ByteSize::mebibytes(100),
689 max_attachments_size: ByteSize::mebibytes(100),
690 max_client_reports_size: ByteSize::kibibytes(4),
691 max_check_in_size: ByteSize::kibibytes(100),
692 max_envelope_size: ByteSize::mebibytes(100),
693 max_session_count: 100,
694 max_span_count: 1000,
695 max_log_count: 1000,
696 max_trace_metric_count: 1000,
697 max_api_payload_size: ByteSize::mebibytes(20),
698 max_api_file_upload_size: ByteSize::mebibytes(40),
699 max_api_chunk_upload_size: ByteSize::mebibytes(100),
700 max_profile_size: ByteSize::mebibytes(50),
701 max_trace_metric_size: ByteSize::kibibytes(2),
702 max_log_size: ByteSize::mebibytes(1),
703 max_span_size: ByteSize::mebibytes(1),
704 max_container_size: ByteSize::mebibytes(3),
705 max_statsd_size: ByteSize::mebibytes(1),
706 max_metric_buckets_size: ByteSize::mebibytes(1),
707 max_replay_compressed_size: ByteSize::mebibytes(10),
708 max_replay_uncompressed_size: ByteSize::mebibytes(100),
709 max_replay_message_size: ByteSize::mebibytes(15),
710 max_thread_count: num_cpus::get(),
711 max_pool_concurrency: 1,
712 query_timeout: 30,
713 shutdown_timeout: 10,
714 keepalive_timeout: 5,
715 idle_timeout: None,
716 max_connections: None,
717 tcp_listen_backlog: 1024,
718 }
719 }
720}
721
722#[derive(Debug, Default, Deserialize, Serialize)]
724#[serde(default)]
725pub struct Routing {
726 pub accept_unknown_items: Option<bool>,
736}
737
738#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
740#[serde(rename_all = "lowercase")]
741pub enum HttpEncoding {
742 #[default]
747 Identity,
748 Deflate,
754 Gzip,
761 Br,
763 Zstd,
765}
766
767impl HttpEncoding {
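    /// Parses a `Content-Encoding`-style value into an encoding, falling back to
    /// `Identity` for unknown values.
    ///
    /// A small sketch of the mapping:
    ///
    /// ```ignore
    /// assert_eq!(HttpEncoding::parse("x-gzip").name(), Some("gzip"));
    /// assert_eq!(HttpEncoding::parse(" zstd ").name(), Some("zstd")); // input is trimmed
    /// assert_eq!(HttpEncoding::parse("unknown").name(), None);        // Identity
    /// ```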
768 pub fn parse(str: &str) -> Self {
770 let str = str.trim();
771 if str.eq_ignore_ascii_case("zstd") {
772 Self::Zstd
773 } else if str.eq_ignore_ascii_case("br") {
774 Self::Br
775 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
776 Self::Gzip
777 } else if str.eq_ignore_ascii_case("deflate") {
778 Self::Deflate
779 } else {
780 Self::Identity
781 }
782 }
783
784 pub fn name(&self) -> Option<&'static str> {
788 match self {
789 Self::Identity => None,
790 Self::Deflate => Some("deflate"),
791 Self::Gzip => Some("gzip"),
792 Self::Br => Some("br"),
793 Self::Zstd => Some("zstd"),
794 }
795 }
796}
797
798#[derive(Serialize, Deserialize, Debug)]
800#[serde(default)]
801pub struct Http {
802 pub timeout: u32,
808 pub connection_timeout: u32,
813 pub max_retry_interval: u32,
815 pub host_header: Option<String>,
817 pub auth_interval: Option<u64>,
825 pub outage_grace_period: u64,
831 pub retry_delay: u64,
835 pub project_failure_interval: u64,
840 pub encoding: HttpEncoding,
856 pub global_metrics: bool,
863}
864
865impl Default for Http {
866 fn default() -> Self {
867 Http {
868 timeout: 5,
869 connection_timeout: 3,
            max_retry_interval: 60,
            host_header: None,
            auth_interval: Some(600),
            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
874 retry_delay: default_retry_delay(),
875 project_failure_interval: default_project_failure_interval(),
876 encoding: HttpEncoding::Zstd,
877 global_metrics: false,
878 }
879 }
880}
881
882fn default_retry_delay() -> u64 {
884 1
885}
886
887fn default_project_failure_interval() -> u64 {
889 90
890}
891
892fn spool_envelopes_max_disk_size() -> ByteSize {
894 ByteSize::mebibytes(500)
895}
896
897fn spool_envelopes_batch_size_bytes() -> ByteSize {
899 ByteSize::kibibytes(10)
900}
901
902fn spool_envelopes_max_envelope_delay_secs() -> u64 {
903 24 * 60 * 60
904}
905
906fn spool_disk_usage_refresh_frequency_ms() -> u64 {
908 100
909}
910
911fn spool_max_backpressure_envelopes() -> usize {
913 500
914}
915
916fn spool_max_backpressure_memory_percent() -> f32 {
918 0.9
919}
920
921fn spool_envelopes_partitions() -> NonZeroU8 {
923 NonZeroU8::new(1).unwrap()
924}
925
926#[derive(Debug, Serialize, Deserialize)]
928pub struct EnvelopeSpool {
929 pub path: Option<PathBuf>,
935 #[serde(default = "spool_envelopes_max_disk_size")]
941 pub max_disk_size: ByteSize,
942 #[serde(default = "spool_envelopes_batch_size_bytes")]
949 pub batch_size_bytes: ByteSize,
950 #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
957 pub max_envelope_delay_secs: u64,
958 #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
963 pub disk_usage_refresh_frequency_ms: u64,
964 #[serde(default = "spool_max_backpressure_envelopes")]
968 pub max_backpressure_envelopes: usize,
969 #[serde(default = "spool_max_backpressure_memory_percent")]
999 pub max_backpressure_memory_percent: f32,
1000 #[serde(default = "spool_envelopes_partitions")]
1007 pub partitions: NonZeroU8,
1008}
1009
1010impl Default for EnvelopeSpool {
1011 fn default() -> Self {
1012 Self {
1013 path: None,
1014 max_disk_size: spool_envelopes_max_disk_size(),
1015 batch_size_bytes: spool_envelopes_batch_size_bytes(),
1016 max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1017 disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1018 max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1019 max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1020 partitions: spool_envelopes_partitions(),
1021 }
1022 }
1023}
1024
1025#[derive(Debug, Serialize, Deserialize, Default)]
1027pub struct Spool {
1028 #[serde(default)]
1030 pub envelopes: EnvelopeSpool,
1031}
1032
1033#[derive(Serialize, Deserialize, Debug)]
1035#[serde(default)]
1036pub struct Cache {
1037 pub project_request_full_config: bool,
1039 pub project_expiry: u32,
1041 pub project_grace_period: u32,
1046 pub project_refresh_interval: Option<u32>,
1052 pub relay_expiry: u32,
1054 #[serde(alias = "event_expiry")]
1060 envelope_expiry: u32,
1061 #[serde(alias = "event_buffer_size")]
1063 envelope_buffer_size: u32,
1064 pub miss_expiry: u32,
1066 pub batch_interval: u32,
1068 pub downstream_relays_batch_interval: u32,
1070 pub batch_size: usize,
1074 pub file_interval: u32,
1076 pub global_config_fetch_interval: u32,
1078}
1079
1080impl Default for Cache {
1081 fn default() -> Self {
1082 Cache {
1083 project_request_full_config: false,
            project_expiry: 300,       // 5 minutes
            project_grace_period: 120, // 2 minutes
            project_refresh_interval: None,
            relay_expiry: 3600, // 1 hour
            envelope_expiry: 600,
            envelope_buffer_size: 1000,
            miss_expiry: 60,     // 1 minute
            batch_interval: 100, // 100ms
            downstream_relays_batch_interval: 100, // 100ms
            batch_size: 500,
            file_interval: 10,                // 10 seconds
            global_config_fetch_interval: 10, // 10 seconds
        }
1097 }
1098}
1099
fn default_max_secs_in_future() -> u32 {
    60 // 1 minute
}

fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600 // 5 days
}
1107
1108fn default_chunk_size() -> ByteSize {
1109 ByteSize::mebibytes(1)
1110}
1111
1112fn default_projectconfig_cache_prefix() -> String {
1113 "relayconfig".to_owned()
1114}
1115
1116#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300) // 5 minutes
}
1120
1121#[derive(Serialize, Deserialize, Debug)]
1123pub struct Processing {
1124 pub enabled: bool,
1126 #[serde(default)]
1128 pub geoip_path: Option<PathBuf>,
1129 #[serde(default = "default_max_secs_in_future")]
1131 pub max_secs_in_future: u32,
1132 #[serde(default = "default_max_session_secs_in_past")]
1134 pub max_session_secs_in_past: u32,
1135 pub kafka_config: Vec<KafkaConfigParam>,
1137 #[serde(default)]
1157 pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
1158 #[serde(default)]
1160 pub topics: TopicAssignments,
1161 #[serde(default)]
1163 pub kafka_validate_topics: bool,
1164 #[serde(default)]
1166 pub redis: Option<RedisConfigs>,
1167 #[serde(default = "default_chunk_size")]
1169 pub attachment_chunk_size: ByteSize,
1170 #[serde(default = "default_projectconfig_cache_prefix")]
1172 pub projectconfig_cache_prefix: String,
1173 #[serde(default = "default_max_rate_limit")]
1175 pub max_rate_limit: Option<u32>,
1176}
1177
1178impl Default for Processing {
1179 fn default() -> Self {
1181 Self {
1182 enabled: false,
1183 geoip_path: None,
1184 max_secs_in_future: default_max_secs_in_future(),
1185 max_session_secs_in_past: default_max_session_secs_in_past(),
1186 kafka_config: Vec::new(),
1187 secondary_kafka_configs: BTreeMap::new(),
1188 topics: TopicAssignments::default(),
1189 kafka_validate_topics: false,
1190 redis: None,
1191 attachment_chunk_size: default_chunk_size(),
1192 projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1193 max_rate_limit: default_max_rate_limit(),
1194 }
1195 }
1196}
1197
1198#[derive(Debug, Default, Serialize, Deserialize)]
1200#[serde(default)]
1201pub struct Normalization {
1202 #[serde(default)]
1204 pub level: NormalizationLevel,
1205}
1206
1207#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
1209#[serde(rename_all = "lowercase")]
1210pub enum NormalizationLevel {
1211 #[default]
1215 Default,
1216 Full,
1221}
1222
1223#[derive(Serialize, Deserialize, Debug)]
1225#[serde(default)]
1226pub struct OutcomeAggregatorConfig {
1227 pub bucket_interval: u64,
1229 pub flush_interval: u64,
1231}
1232
1233impl Default for OutcomeAggregatorConfig {
1234 fn default() -> Self {
1235 Self {
1236 bucket_interval: 60,
1237 flush_interval: 120,
1238 }
1239 }
1240}
1241
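/// Controls whether outcomes are emitted as real outcomes, as client reports, or not
/// at all.
///
/// In configuration files this value is written as `true`, `false`, or the string
/// `"as_client_reports"`, matching the custom `Serialize`/`Deserialize`
/// implementations below.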
1242#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1246pub enum EmitOutcomes {
1247 None,
1249 AsClientReports,
1251 AsOutcomes,
1253}
1254
1255impl EmitOutcomes {
1256 pub fn any(&self) -> bool {
1258 !matches!(self, EmitOutcomes::None)
1259 }
1260}
1261
1262impl Serialize for EmitOutcomes {
1263 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1264 where
1265 S: Serializer,
1266 {
1267 match self {
1269 Self::None => serializer.serialize_bool(false),
1270 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1271 Self::AsOutcomes => serializer.serialize_bool(true),
1272 }
1273 }
1274}
1275
1276struct EmitOutcomesVisitor;
1277
1278impl Visitor<'_> for EmitOutcomesVisitor {
1279 type Value = EmitOutcomes;
1280
1281 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1282 formatter.write_str("true, false, or 'as_client_reports'")
1283 }
1284
1285 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1286 where
1287 E: serde::de::Error,
1288 {
1289 Ok(if v {
1290 EmitOutcomes::AsOutcomes
1291 } else {
1292 EmitOutcomes::None
1293 })
1294 }
1295
1296 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1297 where
1298 E: serde::de::Error,
1299 {
1300 if v == "as_client_reports" {
1301 Ok(EmitOutcomes::AsClientReports)
1302 } else {
1303 Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1304 }
1305 }
1306}
1307
1308impl<'de> Deserialize<'de> for EmitOutcomes {
1309 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1310 where
1311 D: Deserializer<'de>,
1312 {
1313 deserializer.deserialize_any(EmitOutcomesVisitor)
1314 }
1315}
1316
1317#[derive(Serialize, Deserialize, Debug)]
1319#[serde(default)]
1320pub struct Outcomes {
1321 pub emit_outcomes: EmitOutcomes,
1325 pub emit_client_outcomes: bool,
1327 pub batch_size: usize,
1330 pub batch_interval: u64,
1333 pub source: Option<String>,
1336 pub aggregator: OutcomeAggregatorConfig,
1338}
1339
1340impl Default for Outcomes {
1341 fn default() -> Self {
1342 Outcomes {
1343 emit_outcomes: EmitOutcomes::AsClientReports,
1344 emit_client_outcomes: true,
1345 batch_size: 1000,
1346 batch_interval: 500,
1347 source: None,
1348 aggregator: OutcomeAggregatorConfig::default(),
1349 }
1350 }
1351}
1352
1353#[derive(Serialize, Deserialize, Debug, Default)]
1355pub struct MinimalConfig {
1356 pub relay: Relay,
1358}
1359
1360impl MinimalConfig {
1361 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1363 let path = p.as_ref();
1364 if fs::metadata(path).is_err() {
1365 fs::create_dir_all(path)
1366 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1367 }
1368 self.save(path)
1369 }
1370}
1371
1372impl ConfigObject for MinimalConfig {
1373 fn format() -> ConfigFormat {
1374 ConfigFormat::Yaml
1375 }
1376
1377 fn name() -> &'static str {
1378 "config"
1379 }
1380}
1381
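/// (De-)serialization helpers for the `auth.static_relays` map, which stores
/// statically configured relays keyed by relay id.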
1382mod config_relay_info {
1384 use serde::ser::SerializeMap;
1385
1386 use super::*;
1387
1388 #[derive(Debug, Serialize, Deserialize, Clone)]
1390 struct RelayInfoConfig {
1391 public_key: PublicKey,
1392 #[serde(default)]
1393 internal: bool,
1394 }
1395
1396 impl From<RelayInfoConfig> for RelayInfo {
1397 fn from(v: RelayInfoConfig) -> Self {
1398 RelayInfo {
1399 public_key: v.public_key,
1400 internal: v.internal,
1401 }
1402 }
1403 }
1404
1405 impl From<RelayInfo> for RelayInfoConfig {
1406 fn from(v: RelayInfo) -> Self {
1407 RelayInfoConfig {
1408 public_key: v.public_key,
1409 internal: v.internal,
1410 }
1411 }
1412 }
1413
1414 pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1415 where
1416 D: Deserializer<'de>,
1417 {
1418 let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1419 Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1420 }
1421
1422 pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1423 where
1424 S: Serializer,
1425 {
1426 let mut map = ser.serialize_map(Some(elm.len()))?;
1427
1428 for (k, v) in elm {
1429 map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1430 }
1431
1432 map.end()
1433 }
1434}
1435
1436#[derive(Serialize, Deserialize, Debug, Default)]
1438pub struct AuthConfig {
1439 #[serde(default, skip_serializing_if = "is_default")]
1441 pub ready: ReadinessCondition,
1442
1443 #[serde(default, with = "config_relay_info")]
1445 pub static_relays: HashMap<RelayId, RelayInfo>,
1446
1447 #[serde(default = "default_max_age")]
1451 pub signature_max_age: u64,
1452}
1453
1454fn default_max_age() -> u64 {
1455 300
1456}
1457
1458#[derive(Serialize, Deserialize, Debug, Default)]
1460pub struct GeoIpConfig {
1461 pub path: Option<PathBuf>,
1463}
1464
1465#[derive(Serialize, Deserialize, Debug)]
1467#[serde(default)]
1468pub struct CardinalityLimiter {
1469 pub cache_vacuum_interval: u64,
1475}
1476
1477impl Default for CardinalityLimiter {
1478 fn default() -> Self {
1479 Self {
1480 cache_vacuum_interval: 180,
1481 }
1482 }
1483}
1484
1485#[derive(Serialize, Deserialize, Debug)]
1490#[serde(default)]
1491pub struct Health {
1492 pub refresh_interval_ms: u64,
1499 pub max_memory_bytes: Option<ByteSize>,
1504 pub max_memory_percent: f32,
1508 pub probe_timeout_ms: u64,
1515 pub memory_stat_refresh_frequency_ms: u64,
1521}
1522
1523impl Default for Health {
1524 fn default() -> Self {
1525 Self {
1526 refresh_interval_ms: 3000,
1527 max_memory_bytes: None,
1528 max_memory_percent: 0.95,
1529 probe_timeout_ms: 900,
1530 memory_stat_refresh_frequency_ms: 100,
1531 }
1532 }
1533}
1534
1535#[derive(Serialize, Deserialize, Debug)]
1537#[serde(default)]
1538pub struct Cogs {
1539 pub max_queue_size: u64,
1545 pub relay_resource_id: String,
1551}
1552
1553impl Default for Cogs {
1554 fn default() -> Self {
1555 Self {
1556 max_queue_size: 10_000,
1557 relay_resource_id: "relay_service".to_owned(),
1558 }
1559 }
1560}
1561
1562#[derive(Serialize, Deserialize, Debug, Default)]
1563struct ConfigValues {
1564 #[serde(default)]
1565 relay: Relay,
1566 #[serde(default)]
1567 http: Http,
1568 #[serde(default)]
1569 cache: Cache,
1570 #[serde(default)]
1571 spool: Spool,
1572 #[serde(default)]
1573 limits: Limits,
1574 #[serde(default)]
1575 logging: relay_log::LogConfig,
1576 #[serde(default)]
1577 routing: Routing,
1578 #[serde(default)]
1579 metrics: Metrics,
1580 #[serde(default)]
1581 sentry: relay_log::SentryConfig,
1582 #[serde(default)]
1583 processing: Processing,
1584 #[serde(default)]
1585 outcomes: Outcomes,
1586 #[serde(default)]
1587 aggregator: AggregatorServiceConfig,
1588 #[serde(default)]
1589 secondary_aggregators: Vec<ScopedAggregatorConfig>,
1590 #[serde(default)]
1591 auth: AuthConfig,
1592 #[serde(default)]
1593 geoip: GeoIpConfig,
1594 #[serde(default)]
1595 normalization: Normalization,
1596 #[serde(default)]
1597 cardinality_limiter: CardinalityLimiter,
1598 #[serde(default)]
1599 health: Health,
1600 #[serde(default)]
1601 cogs: Cogs,
1602}
1603
1604impl ConfigObject for ConfigValues {
1605 fn format() -> ConfigFormat {
1606 ConfigFormat::Yaml
1607 }
1608
1609 fn name() -> &'static str {
1610 "config"
1611 }
1612}
1613
1614pub struct Config {
1616 values: ConfigValues,
1617 credentials: Option<Credentials>,
1618 path: PathBuf,
1619}
1620
1621impl fmt::Debug for Config {
1622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1623 f.debug_struct("Config")
1624 .field("path", &self.path)
1625 .field("values", &self.values)
1626 .finish()
1627 }
1628}
1629
1630impl Config {
1631 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1633 let path = env::current_dir()
1634 .map(|x| x.join(path.as_ref()))
1635 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1636
1637 let config = Config {
1638 values: ConfigValues::load(&path)?,
1639 credentials: if Credentials::path(&path).exists() {
1640 Some(Credentials::load(&path)?)
1641 } else {
1642 None
1643 },
1644 path: path.clone(),
1645 };
1646
1647 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1648 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1649 }
1650
1651 Ok(config)
1652 }
1653
1654 pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1658 Ok(Config {
1659 values: serde_json::from_value(value)
1660 .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1661 credentials: None,
1662 path: PathBuf::new(),
1663 })
1664 }
1665
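    /// Applies non-empty fields of `overrides` on top of the loaded configuration.
    ///
    /// A minimal sketch, assuming a default config and an override coming from the
    /// command line:
    ///
    /// ```ignore
    /// let mut config = Config::default();
    /// config.apply_override(OverridableConfig {
    ///     port: Some("4000".to_owned()),
    ///     ..Default::default()
    /// })?;
    /// assert_eq!(config.listen_addr().port(), 4000);
    /// ```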
1666 pub fn apply_override(
1669 &mut self,
1670 mut overrides: OverridableConfig,
1671 ) -> anyhow::Result<&mut Self> {
1672 let relay = &mut self.values.relay;
1673
1674 if let Some(mode) = overrides.mode {
1675 relay.mode = mode
1676 .parse::<RelayMode>()
1677 .with_context(|| ConfigError::field("mode"))?;
1678 }
1679
1680 if let Some(deployment) = overrides.instance {
1681 relay.instance = deployment
1682 .parse::<RelayInstance>()
1683 .with_context(|| ConfigError::field("deployment"))?;
1684 }
1685
1686 if let Some(log_level) = overrides.log_level {
1687 self.values.logging.level = log_level.parse()?;
1688 }
1689
1690 if let Some(log_format) = overrides.log_format {
1691 self.values.logging.format = log_format.parse()?;
1692 }
1693
1694 if let Some(upstream) = overrides.upstream {
1695 relay.upstream = upstream
1696 .parse::<UpstreamDescriptor>()
1697 .with_context(|| ConfigError::field("upstream"))?;
1698 } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1699 relay.upstream = upstream_dsn
1700 .parse::<Dsn>()
1701 .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1702 .with_context(|| ConfigError::field("upstream_dsn"))?;
1703 }
1704
1705 if let Some(host) = overrides.host {
1706 relay.host = host
1707 .parse::<IpAddr>()
1708 .with_context(|| ConfigError::field("host"))?;
1709 }
1710
1711 if let Some(port) = overrides.port {
1712 relay.port = port
1713 .as_str()
1714 .parse()
1715 .with_context(|| ConfigError::field("port"))?;
1716 }
1717
1718 let processing = &mut self.values.processing;
1719 if let Some(enabled) = overrides.processing {
1720 match enabled.to_lowercase().as_str() {
1721 "true" | "1" => processing.enabled = true,
1722 "false" | "0" | "" => processing.enabled = false,
1723 _ => return Err(ConfigError::field("processing").into()),
1724 }
1725 }
1726
1727 if let Some(redis) = overrides.redis_url {
1728 processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1729 }
1730
1731 if let Some(kafka_url) = overrides.kafka_url {
1732 let existing = processing
1733 .kafka_config
1734 .iter_mut()
1735 .find(|e| e.name == "bootstrap.servers");
1736
1737 if let Some(config_param) = existing {
1738 config_param.value = kafka_url;
1739 } else {
1740 processing.kafka_config.push(KafkaConfigParam {
1741 name: "bootstrap.servers".to_owned(),
1742 value: kafka_url,
1743 })
1744 }
1745 }
1746 let id = if let Some(id) = overrides.id {
1748 let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1749 Some(id)
1750 } else {
1751 None
1752 };
1753 let public_key = if let Some(public_key) = overrides.public_key {
1754 let public_key = public_key
1755 .parse::<PublicKey>()
1756 .with_context(|| ConfigError::field("public_key"))?;
1757 Some(public_key)
1758 } else {
1759 None
1760 };
1761
1762 let secret_key = if let Some(secret_key) = overrides.secret_key {
1763 let secret_key = secret_key
1764 .parse::<SecretKey>()
1765 .with_context(|| ConfigError::field("secret_key"))?;
1766 Some(secret_key)
1767 } else {
1768 None
1769 };
1770 let outcomes = &mut self.values.outcomes;
1771 if overrides.outcome_source.is_some() {
1772 outcomes.source = overrides.outcome_source.take();
1773 }
1774
1775 if let Some(credentials) = &mut self.credentials {
1776 if let Some(id) = id {
1778 credentials.id = id;
1779 }
1780 if let Some(public_key) = public_key {
1781 credentials.public_key = public_key;
1782 }
1783 if let Some(secret_key) = secret_key {
1784 credentials.secret_key = secret_key
1785 }
1786 } else {
1787 match (id, public_key, secret_key) {
1789 (Some(id), Some(public_key), Some(secret_key)) => {
1790 self.credentials = Some(Credentials {
1791 secret_key,
1792 public_key,
1793 id,
1794 })
1795 }
                (None, None, None) => {
                    // no credential overrides were provided, nothing to do
                }
1800 _ => {
1801 return Err(ConfigError::field("incomplete credentials").into());
1802 }
1803 }
1804 }
1805
1806 let limits = &mut self.values.limits;
1807 if let Some(shutdown_timeout) = overrides.shutdown_timeout
1808 && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
1809 {
1810 limits.shutdown_timeout = shutdown_timeout;
1811 }
1812
1813 if let Some(server_name) = overrides.server_name {
1814 self.values.sentry.server_name = Some(server_name.into());
1815 }
1816
1817 Ok(self)
1818 }
1819
1820 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1822 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1823 }
1824
1825 pub fn path(&self) -> &Path {
1827 &self.path
1828 }
1829
1830 pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1832 serde_yaml::to_string(&self.values)
1833 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1834 }
1835
1836 pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1840 let creds = Credentials::generate();
1841 if save {
1842 creds.save(&self.path)?;
1843 }
1844 self.credentials = Some(creds);
1845 Ok(())
1846 }
1847
1848 pub fn credentials(&self) -> Option<&Credentials> {
1850 self.credentials.as_ref()
1851 }
1852
1853 pub fn replace_credentials(
1857 &mut self,
1858 credentials: Option<Credentials>,
1859 ) -> anyhow::Result<bool> {
1860 if self.credentials == credentials {
1861 return Ok(false);
1862 }
1863
1864 match credentials {
1865 Some(ref creds) => {
1866 creds.save(&self.path)?;
1867 }
1868 None => {
1869 let path = Credentials::path(&self.path);
1870 if fs::metadata(&path).is_ok() {
1871 fs::remove_file(&path).with_context(|| {
1872 ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1873 })?;
1874 }
1875 }
1876 }
1877
1878 self.credentials = credentials;
1879 Ok(true)
1880 }
1881
1882 pub fn has_credentials(&self) -> bool {
1884 self.credentials.is_some()
1885 }
1886
1887 pub fn secret_key(&self) -> Option<&SecretKey> {
1889 self.credentials.as_ref().map(|x| &x.secret_key)
1890 }
1891
1892 pub fn public_key(&self) -> Option<&PublicKey> {
1894 self.credentials.as_ref().map(|x| &x.public_key)
1895 }
1896
1897 pub fn relay_id(&self) -> Option<&RelayId> {
1899 self.credentials.as_ref().map(|x| &x.id)
1900 }
1901
1902 pub fn relay_mode(&self) -> RelayMode {
1904 self.values.relay.mode
1905 }
1906
1907 pub fn relay_instance(&self) -> RelayInstance {
1909 self.values.relay.instance
1910 }
1911
1912 pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1914 &self.values.relay.upstream
1915 }
1916
1917 pub fn http_host_header(&self) -> Option<&str> {
1919 self.values.http.host_header.as_deref()
1920 }
1921
1922 pub fn listen_addr(&self) -> SocketAddr {
1924 (self.values.relay.host, self.values.relay.port).into()
1925 }
1926
1927 pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1929 if self.values.relay.tls_identity_path.is_some() {
1930 let port = self.values.relay.tls_port.unwrap_or(3443);
1931 Some((self.values.relay.host, port).into())
1932 } else {
1933 None
1934 }
1935 }
1936
1937 pub fn tls_identity_path(&self) -> Option<&Path> {
1939 self.values.relay.tls_identity_path.as_deref()
1940 }
1941
1942 pub fn tls_identity_password(&self) -> Option<&str> {
1944 self.values.relay.tls_identity_password.as_deref()
1945 }
1946
1947 pub fn override_project_ids(&self) -> bool {
1951 self.values.relay.override_project_ids
1952 }
1953
1954 pub fn requires_auth(&self) -> bool {
1958 match self.values.auth.ready {
1959 ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
1960 ReadinessCondition::Always => false,
1961 }
1962 }
1963
1964 pub fn http_auth_interval(&self) -> Option<Duration> {
1968 if self.processing_enabled() {
1969 return None;
1970 }
1971
1972 match self.values.http.auth_interval {
1973 None | Some(0) => None,
1974 Some(secs) => Some(Duration::from_secs(secs)),
1975 }
1976 }
1977
1978 pub fn http_outage_grace_period(&self) -> Duration {
1981 Duration::from_secs(self.values.http.outage_grace_period)
1982 }
1983
1984 pub fn http_retry_delay(&self) -> Duration {
1989 Duration::from_secs(self.values.http.retry_delay)
1990 }
1991
1992 pub fn http_project_failure_interval(&self) -> Duration {
1994 Duration::from_secs(self.values.http.project_failure_interval)
1995 }
1996
1997 pub fn http_encoding(&self) -> HttpEncoding {
1999 self.values.http.encoding
2000 }
2001
2002 pub fn http_global_metrics(&self) -> bool {
2004 self.values.http.global_metrics
2005 }
2006
2007 pub fn emit_outcomes(&self) -> EmitOutcomes {
2012 if self.processing_enabled() {
2013 return EmitOutcomes::AsOutcomes;
2014 }
2015 self.values.outcomes.emit_outcomes
2016 }
2017
2018 pub fn emit_client_outcomes(&self) -> bool {
2028 self.values.outcomes.emit_client_outcomes
2029 }
2030
2031 pub fn outcome_batch_size(&self) -> usize {
2033 self.values.outcomes.batch_size
2034 }
2035
2036 pub fn outcome_batch_interval(&self) -> Duration {
2038 Duration::from_millis(self.values.outcomes.batch_interval)
2039 }
2040
2041 pub fn outcome_source(&self) -> Option<&str> {
2043 self.values.outcomes.source.as_deref()
2044 }
2045
2046 pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2048 &self.values.outcomes.aggregator
2049 }
2050
2051 pub fn logging(&self) -> &relay_log::LogConfig {
2053 &self.values.logging
2054 }
2055
2056 pub fn sentry(&self) -> &relay_log::SentryConfig {
2058 &self.values.sentry
2059 }
2060
2061 pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2065 if let Some(ref addr) = self.values.metrics.statsd {
2066 let addrs = addr
2067 .as_str()
2068 .to_socket_addrs()
2069 .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2070 .collect();
2071 Ok(addrs)
2072 } else {
2073 Ok(vec![])
2074 }
2075 }
2076
2077 pub fn metrics_prefix(&self) -> &str {
2079 &self.values.metrics.prefix
2080 }
2081
2082 pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2084 &self.values.metrics.default_tags
2085 }
2086
2087 pub fn metrics_hostname_tag(&self) -> Option<&str> {
2089 self.values.metrics.hostname_tag.as_deref()
2090 }
2091
2092 pub fn metrics_sample_rate(&self) -> f32 {
2094 self.values.metrics.sample_rate
2095 }
2096
2097 pub fn metrics_aggregate(&self) -> bool {
2099 self.values.metrics.aggregate
2100 }
2101
2102 pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
2104 self.values.metrics.allow_high_cardinality_tags
2105 }
2106
2107 pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2111 match self.values.metrics.periodic_secs {
2112 0 => None,
2113 secs => Some(Duration::from_secs(secs)),
2114 }
2115 }
2116
2117 pub fn http_timeout(&self) -> Duration {
2119 Duration::from_secs(self.values.http.timeout.into())
2120 }
2121
2122 pub fn http_connection_timeout(&self) -> Duration {
2124 Duration::from_secs(self.values.http.connection_timeout.into())
2125 }
2126
2127 pub fn http_max_retry_interval(&self) -> Duration {
2129 Duration::from_secs(self.values.http.max_retry_interval.into())
2130 }
2131
2132 pub fn project_cache_expiry(&self) -> Duration {
2134 Duration::from_secs(self.values.cache.project_expiry.into())
2135 }
2136
2137 pub fn request_full_project_config(&self) -> bool {
2139 self.values.cache.project_request_full_config
2140 }
2141
2142 pub fn relay_cache_expiry(&self) -> Duration {
2144 Duration::from_secs(self.values.cache.relay_expiry.into())
2145 }
2146
2147 pub fn envelope_buffer_size(&self) -> usize {
2149 self.values
2150 .cache
2151 .envelope_buffer_size
2152 .try_into()
2153 .unwrap_or(usize::MAX)
2154 }
2155
2156 pub fn cache_miss_expiry(&self) -> Duration {
2158 Duration::from_secs(self.values.cache.miss_expiry.into())
2159 }
2160
2161 pub fn project_grace_period(&self) -> Duration {
2163 Duration::from_secs(self.values.cache.project_grace_period.into())
2164 }
2165
2166 pub fn project_refresh_interval(&self) -> Option<Duration> {
2170 self.values
2171 .cache
2172 .project_refresh_interval
2173 .map(Into::into)
2174 .map(Duration::from_secs)
2175 }
2176
2177 pub fn query_batch_interval(&self) -> Duration {
2180 Duration::from_millis(self.values.cache.batch_interval.into())
2181 }
2182
2183 pub fn downstream_relays_batch_interval(&self) -> Duration {
2185 Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2186 }
2187
2188 pub fn local_cache_interval(&self) -> Duration {
2190 Duration::from_secs(self.values.cache.file_interval.into())
2191 }
2192
2193 pub fn global_config_fetch_interval(&self) -> Duration {
2196 Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2197 }
2198
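    /// Returns the path of the spool file for the given partition, if spooling to
    /// disk is configured.
    ///
    /// Partition `0` uses the configured path as-is; other partitions append the
    /// partition id as a suffix. A sketch, assuming `spool.envelopes.path` is set to
    /// `/var/lib/relay/spool/envelopes`:
    ///
    /// ```ignore
    /// // partition 0 -> /var/lib/relay/spool/envelopes
    /// // partition 2 -> /var/lib/relay/spool/envelopes.2
    /// ```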
2199 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2204 let mut path = self
2205 .values
2206 .spool
2207 .envelopes
2208 .path
2209 .as_ref()
2210 .map(|path| path.to_owned())?;
2211
2212 if partition_id == 0 {
2213 return Some(path);
2214 }
2215
2216 let file_name = path.file_name().and_then(|f| f.to_str())?;
2217 let new_file_name = format!("{file_name}.{partition_id}");
2218 path.set_file_name(new_file_name);
2219
2220 Some(path)
2221 }
2222
2223 pub fn spool_envelopes_max_disk_size(&self) -> usize {
2225 self.values.spool.envelopes.max_disk_size.as_bytes()
2226 }
2227
2228 pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2231 self.values.spool.envelopes.batch_size_bytes.as_bytes()
2232 }
2233
2234 pub fn spool_envelopes_max_age(&self) -> Duration {
2236 Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2237 }
2238
2239 pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2241 Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2242 }
2243
2244 pub fn spool_max_backpressure_envelopes(&self) -> usize {
2246 self.values.spool.envelopes.max_backpressure_envelopes
2247 }
2248
2249 pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2251 self.values.spool.envelopes.max_backpressure_memory_percent
2252 }
2253
2254 pub fn spool_partitions(&self) -> NonZeroU8 {
2256 self.values.spool.envelopes.partitions
2257 }
2258
2259 pub fn max_event_size(&self) -> usize {
2261 self.values.limits.max_event_size.as_bytes()
2262 }
2263
2264 pub fn max_attachment_size(&self) -> usize {
2266 self.values.limits.max_attachment_size.as_bytes()
2267 }
2268
2269 pub fn max_attachments_size(&self) -> usize {
2272 self.values.limits.max_attachments_size.as_bytes()
2273 }
2274
2275 pub fn max_client_reports_size(&self) -> usize {
2277 self.values.limits.max_client_reports_size.as_bytes()
2278 }
2279
2280 pub fn max_check_in_size(&self) -> usize {
2282 self.values.limits.max_check_in_size.as_bytes()
2283 }
2284
2285 pub fn max_log_size(&self) -> usize {
2287 self.values.limits.max_log_size.as_bytes()
2288 }
2289
2290 pub fn max_span_size(&self) -> usize {
2292 self.values.limits.max_span_size.as_bytes()
2293 }
2294
2295 pub fn max_container_size(&self) -> usize {
2297 self.values.limits.max_container_size.as_bytes()
2298 }
2299
2300 pub fn max_envelope_size(&self) -> usize {
2304 self.values.limits.max_envelope_size.as_bytes()
2305 }
2306
2307 pub fn max_session_count(&self) -> usize {
2309 self.values.limits.max_session_count
2310 }
2311
2312 pub fn max_span_count(&self) -> usize {
2314 self.values.limits.max_span_count
2315 }
2316
2317 pub fn max_log_count(&self) -> usize {
2319 self.values.limits.max_log_count
2320 }
2321
2322 pub fn max_trace_metric_count(&self) -> usize {
2324 self.values.limits.max_trace_metric_count
2325 }
2326
2327 pub fn max_statsd_size(&self) -> usize {
2329 self.values.limits.max_statsd_size.as_bytes()
2330 }
2331
2332 pub fn max_metric_buckets_size(&self) -> usize {
2334 self.values.limits.max_metric_buckets_size.as_bytes()
2335 }
2336
2337 pub fn max_api_payload_size(&self) -> usize {
2339 self.values.limits.max_api_payload_size.as_bytes()
2340 }
2341
2342 pub fn max_api_file_upload_size(&self) -> usize {
2344 self.values.limits.max_api_file_upload_size.as_bytes()
2345 }
2346
2347 pub fn max_api_chunk_upload_size(&self) -> usize {
2349 self.values.limits.max_api_chunk_upload_size.as_bytes()
2350 }
2351
2352 pub fn max_profile_size(&self) -> usize {
2354 self.values.limits.max_profile_size.as_bytes()
2355 }
2356
2357 pub fn max_trace_metric_size(&self) -> usize {
2359 self.values.limits.max_trace_metric_size.as_bytes()
2360 }
2361
2362 pub fn max_replay_compressed_size(&self) -> usize {
2364 self.values.limits.max_replay_compressed_size.as_bytes()
2365 }
2366
2367 pub fn max_replay_uncompressed_size(&self) -> usize {
2369 self.values.limits.max_replay_uncompressed_size.as_bytes()
2370 }
2371
2372 pub fn max_replay_message_size(&self) -> usize {
2378 self.values.limits.max_replay_message_size.as_bytes()
2379 }
2380
2381 pub fn max_concurrent_requests(&self) -> usize {
2383 self.values.limits.max_concurrent_requests
2384 }
2385
2386 pub fn max_concurrent_queries(&self) -> usize {
2388 self.values.limits.max_concurrent_queries
2389 }
2390
2391 pub fn query_timeout(&self) -> Duration {
2393 Duration::from_secs(self.values.limits.query_timeout)
2394 }
2395
2396 pub fn shutdown_timeout(&self) -> Duration {
2399 Duration::from_secs(self.values.limits.shutdown_timeout)
2400 }
2401
2402 pub fn keepalive_timeout(&self) -> Duration {
2406 Duration::from_secs(self.values.limits.keepalive_timeout)
2407 }
2408
2409 pub fn idle_timeout(&self) -> Option<Duration> {
2411 self.values.limits.idle_timeout.map(Duration::from_secs)
2412 }
2413
2414 pub fn max_connections(&self) -> Option<usize> {
2416 self.values.limits.max_connections
2417 }
2418
2419 pub fn tcp_listen_backlog(&self) -> u32 {
2421 self.values.limits.tcp_listen_backlog
2422 }
2423
2424 pub fn cpu_concurrency(&self) -> usize {
2426 self.values.limits.max_thread_count
2427 }
2428
2429 pub fn pool_concurrency(&self) -> usize {
2431 self.values.limits.max_pool_concurrency
2432 }
2433
2434 pub fn query_batch_size(&self) -> usize {
2436 self.values.cache.batch_size
2437 }
2438
2439 pub fn project_configs_path(&self) -> PathBuf {
2441 self.path.join("projects")
2442 }
2443
2444 pub fn processing_enabled(&self) -> bool {
2446 self.values.processing.enabled
2447 }
2448
2449 pub fn normalization_level(&self) -> NormalizationLevel {
2451 self.values.normalization.level
2452 }
2453
2454 pub fn geoip_path(&self) -> Option<&Path> {
2456 self.values
2457 .geoip
2458 .path
2459 .as_deref()
2460 .or(self.values.processing.geoip_path.as_deref())
2461 }
2462
2463 pub fn max_secs_in_future(&self) -> i64 {
2467 self.values.processing.max_secs_in_future.into()
2468 }
2469
2470 pub fn max_session_secs_in_past(&self) -> i64 {
2472 self.values.processing.max_session_secs_in_past.into()
2473 }
2474
2475 pub fn kafka_configs(
2477 &self,
2478 topic: KafkaTopic,
2479 ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2480 self.values.processing.topics.get(topic).kafka_configs(
2481 &self.values.processing.kafka_config,
2482 &self.values.processing.secondary_kafka_configs,
2483 )
2484 }
2485
2486 pub fn kafka_validate_topics(&self) -> bool {
2488 self.values.processing.kafka_validate_topics
2489 }
2490
2491 pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2493 &self.values.processing.topics.unused
2494 }
2495
2496 pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2499 let redis_configs = self.values.processing.redis.as_ref()?;
2500
2501 Some(build_redis_configs(
2502 redis_configs,
2503 self.cpu_concurrency() as u32,
2504 ))
2505 }
2506
2507 pub fn attachment_chunk_size(&self) -> usize {
2509 self.values.processing.attachment_chunk_size.as_bytes()
2510 }
2511
2512 pub fn metrics_max_batch_size_bytes(&self) -> usize {
2514 self.values.aggregator.max_flush_bytes
2515 }
2516
2517 pub fn projectconfig_cache_prefix(&self) -> &str {
2520 &self.values.processing.projectconfig_cache_prefix
2521 }
2522
2523 pub fn max_rate_limit(&self) -> Option<u64> {
2525 self.values.processing.max_rate_limit.map(u32::into)
2526 }
2527
2528 pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2532 Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2533 }
2534
2535 pub fn health_refresh_interval(&self) -> Duration {
2537 Duration::from_millis(self.values.health.refresh_interval_ms)
2538 }
2539
2540 pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2542 self.values
2543 .health
2544 .max_memory_bytes
2545 .as_ref()
2546 .map_or(u64::MAX, |b| b.as_bytes() as u64)
2547 }
2548
2549 pub fn health_max_memory_watermark_percent(&self) -> f32 {
2551 self.values.health.max_memory_percent
2552 }
2553
2554 pub fn health_probe_timeout(&self) -> Duration {
2556 Duration::from_millis(self.values.health.probe_timeout_ms)
2557 }
2558
2559 pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2561 self.values.health.memory_stat_refresh_frequency_ms
2562 }
2563
2564 pub fn cogs_max_queue_size(&self) -> u64 {
2566 self.values.cogs.max_queue_size
2567 }
2568
2569 pub fn cogs_relay_resource_id(&self) -> &str {
2571 &self.values.cogs.relay_resource_id
2572 }
2573
2574 pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2576 &self.values.aggregator
2577 }
2578
2579 pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2581 &self.values.secondary_aggregators
2582 }
2583
2584 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2586 for entry in &self.values.secondary_aggregators {
2587 if entry.condition.matches(Some(namespace)) {
2588 return &entry.config;
2589 }
2590 }
2591 &self.values.aggregator
2592 }
2593
2594 pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2596 &self.values.auth.static_relays
2597 }
2598
2599 pub fn signature_max_age(&self) -> Duration {
2601 Duration::from_secs(self.values.auth.signature_max_age)
2602 }
2603
2604 pub fn accept_unknown_items(&self) -> bool {
2606 let forward = self.values.routing.accept_unknown_items;
2607 forward.unwrap_or_else(|| !self.processing_enabled())
2608 }
2609}
2610
2611impl Default for Config {
2612 fn default() -> Self {
2613 Self {
2614 values: ConfigValues::default(),
2615 credentials: None,
2616 path: PathBuf::new(),
2617 }
2618 }
2619}
2620
2621#[cfg(test)]
2622mod tests {
2623
2624 use super::*;
2625
2626 #[test]
2628 fn test_event_buffer_size() {
2629 let yaml = r###"
2630cache:
2631 event_buffer_size: 1000000
2632 event_expiry: 1800
2633"###;
2634
2635 let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
2636 assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
2637 assert_eq!(values.cache.envelope_expiry, 1800);
2638 }
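
    // Added as a small sanity check for the custom `RelayMode` deserializer and the
    // `FromStr` implementation: the removed 'static' mode must be rejected.
    #[test]
    fn test_relay_mode_parsing() {
        assert_eq!("proxy".parse::<RelayMode>(), Ok(RelayMode::Proxy));
        assert_eq!("managed".parse::<RelayMode>(), Ok(RelayMode::Managed));
        assert_eq!("static".parse::<RelayMode>(), Err(ParseRelayModeError));

        assert_eq!(
            serde_json::from_str::<RelayMode>("\"managed\"").unwrap(),
            RelayMode::Managed
        );
        assert!(serde_json::from_str::<RelayMode>("\"static\"").is_err());
    }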
2639
2640 #[test]
2641 fn test_emit_outcomes() {
2642 for (serialized, deserialized) in &[
2643 ("true", EmitOutcomes::AsOutcomes),
2644 ("false", EmitOutcomes::None),
2645 ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
2646 ] {
2647 let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
2648 assert_eq!(value, *deserialized);
2649 assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
2650 }
2651 }
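
    // Sketch of `HttpEncoding::parse` behavior: values are trimmed, `x-gzip` maps to
    // gzip, and anything unknown falls back to `Identity` (which has no header name).
    #[test]
    fn test_http_encoding_parse() {
        assert_eq!(HttpEncoding::parse("gzip").name(), Some("gzip"));
        assert_eq!(HttpEncoding::parse("x-gzip").name(), Some("gzip"));
        assert_eq!(HttpEncoding::parse(" zstd ").name(), Some("zstd"));
        assert_eq!(HttpEncoding::parse("unknown").name(), None);
    }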
2652
2653 #[test]
2654 fn test_emit_outcomes_invalid() {
2655 assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
2656 }
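
    // End-to-end sketch: build a config from a JSON value, apply a port override, and
    // check the partitioned spool path naming. The path and port used here are arbitrary.
    #[test]
    fn test_from_json_value_and_overrides() {
        let mut config = Config::from_json_value(serde_json::json!({
            "spool": {"envelopes": {"path": "/tmp/relay-spool/envelopes"}}
        }))
        .unwrap();

        config
            .apply_override(OverridableConfig {
                port: Some("4000".to_owned()),
                ..Default::default()
            })
            .unwrap();

        assert_eq!(config.listen_addr().port(), 4000);
        assert_eq!(
            config.spool_envelopes_path(0),
            Some(PathBuf::from("/tmp/relay-spool/envelopes"))
        );
        assert_eq!(
            config.spool_envelopes_path(2),
            Some(PathBuf::from("/tmp/relay-spool/envelopes.2"))
        );
    }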
2657}