1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Default grace period (in seconds) before an upstream network outage is acted upon.
/// Used as the default for `Http::outage_grace_period`.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Comment header written to the top of generated YAML config files.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// The category of a configuration error.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// The config file could not be opened.
    CouldNotOpenFile,
    /// The config file could not be written.
    CouldNotWriteFile,
    /// The YAML config file failed to parse.
    BadYaml,
    /// The JSON config file failed to parse.
    BadJson,
    /// A config value was invalid.
    InvalidValue,
    /// Processing was enabled, but the binary was compiled without it.
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each kind maps to one fixed, human-readable message.
        let message = match self {
            Self::CouldNotOpenFile => "could not open config file",
            Self::CouldNotWriteFile => "could not write config file",
            Self::BadYaml => "could not parse yaml config file",
            Self::BadJson => "could not parse json config file",
            Self::InvalidValue => "invalid config value",
            Self::ProcessingNotAvailable => {
                "was not compiled with processing, cannot enable processing"
            }
        };
        f.write_str(message)
    }
}
69
/// Indicates where a [`ConfigError`] originated: a file, an overridden field, or unknown.
///
/// The manual `impl Default` was replaced with `#[derive(Default)]` plus
/// `#[default]` on the `None` variant (clippy::derivable_impls).
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// The source of the error is unknown or not applicable.
    #[default]
    None,
    /// The error occurred while handling the given config file.
    File(PathBuf),
    /// The error occurred while applying an override for the named field.
    FieldOverride(String),
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // No source: contribute nothing to the error message.
            ConfigErrorSource::None => Ok(()),
            ConfigErrorSource::File(file_name) => {
                write!(f, " (file {})", file_name.display())
            }
            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
        }
    }
}
98
99#[derive(Debug)]
101pub struct ConfigError {
102 source: ConfigErrorSource,
103 kind: ConfigErrorKind,
104}
105
106impl ConfigError {
107 #[inline]
108 fn new(kind: ConfigErrorKind) -> Self {
109 Self {
110 source: ConfigErrorSource::None,
111 kind,
112 }
113 }
114
115 #[inline]
116 fn field(field: &'static str) -> Self {
117 Self {
118 source: ConfigErrorSource::FieldOverride(field.to_owned()),
119 kind: ConfigErrorKind::InvalidValue,
120 }
121 }
122
123 #[inline]
124 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125 Self {
126 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127 kind,
128 }
129 }
130
131 pub fn kind(&self) -> ConfigErrorKind {
133 self.kind
134 }
135}
136
137impl fmt::Display for ConfigError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 write!(f, "{}{}", self.kind(), self.source)
140 }
141}
142
143impl Error for ConfigError {}
144
/// The on-disk file format of a config object.
enum ConfigFormat {
    Yaml,
    Json,
}

impl ConfigFormat {
    /// Returns the file extension used for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Yaml => "yml",
            Self::Json => "json",
        }
    }
}
158
/// A config object stored in a single file under a base directory.
///
/// Implementors declare their format and file name; loading and saving are
/// provided in terms of those.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The format in which this config object is stored on disk.
    fn format() -> ConfigFormat;

    /// The file name (without extension) of the config object.
    fn name() -> &'static str;

    /// The full path of the config file, e.g. `<base>/config.yml`.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config object from its file under `base`.
    ///
    /// Environment variables are expanded during deserialization via
    /// `serde_vars`.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        let mut source = serde_vars::EnvSource::default();
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Writes the config object back to its file under `base`, truncating any
    /// existing content.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Remove all non-user permissions: the file may contain sensitive data
        // (e.g. credentials).
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML files get a documentation header prepended.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is cosmetic; failures are deliberately ignored.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
224
/// String-typed configuration overrides supplied via the command line or
/// environment, applied on top of the file config in `Config::apply_override`.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The relay mode (parsed into `RelayMode`).
    pub mode: Option<String>,
    /// The relay instance type (parsed into `RelayInstance`).
    pub instance: Option<String>,
    /// The log level.
    pub log_level: Option<String>,
    /// The log format.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided as a Sentry DSN.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to.
    pub host: Option<String>,
    /// The port to bind for the HTTP server.
    pub port: Option<String>,
    /// Whether processing is enabled ("true"/"1" or "false"/"0"/"").
    pub processing: Option<String>,
    /// The kafka `bootstrap.servers` value.
    pub kafka_url: Option<String>,
    /// The redis server url.
    pub redis_url: Option<String>,
    /// The relay id (a UUID).
    pub id: Option<String>,
    /// The relay secret key.
    pub secret_key: Option<String>,
    /// The relay public key.
    pub public_key: Option<String>,
    /// Override for the outcome source — applied outside the visible portion of this file.
    pub outcome_source: Option<String>,
    /// Override for the shutdown timeout — applied outside the visible portion of this file.
    pub shutdown_timeout: Option<String>,
    /// Override for the server name — applied outside the visible portion of this file.
    pub server_name: Option<String>,
}
264
/// The relay credentials: key pair plus globally unique relay id.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
275
276impl Credentials {
277 pub fn generate() -> Self {
279 relay_log::info!("generating new relay credentials");
280 let (sk, pk) = generate_key_pair();
281 Self {
282 secret_key: sk,
283 public_key: pk,
284 id: generate_relay_id(),
285 }
286 }
287
288 pub fn to_json_string(&self) -> anyhow::Result<String> {
290 serde_json::to_string(self)
291 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
292 }
293}
294
// Credentials are stored as `credentials.json` next to the main config.
impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}
303
/// Information on a downstream relay, as exchanged over the wire (camelCase).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this relay uses to authenticate.
    pub public_key: PublicKey,

    /// Marks an internal relay; defaults to `false` when absent.
    #[serde(default)]
    pub internal: bool,
}
315
316impl RelayInfo {
317 pub fn new(public_key: PublicKey) -> Self {
319 Self {
320 public_key,
321 internal: false,
322 }
323 }
324}
325
/// The operation mode of a relay.
///
/// Deserialization is implemented manually below to reject the removed
/// `static` mode with a helpful message.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// The relay acts as a proxy.
    Proxy,

    /// The relay is managed by the upstream.
    Managed,
}
344
345impl<'de> Deserialize<'de> for RelayMode {
346 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
347 where
348 D: Deserializer<'de>,
349 {
350 let s = String::deserialize(deserializer)?;
351 match s.as_str() {
352 "proxy" => Ok(RelayMode::Proxy),
353 "managed" => Ok(RelayMode::Managed),
354 "static" => Err(serde::de::Error::custom(
355 "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
356 )),
357 other => Err(serde::de::Error::unknown_variant(
358 other,
359 &["proxy", "managed"],
360 )),
361 }
362 }
363}
364
365impl fmt::Display for RelayMode {
366 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
367 match self {
368 RelayMode::Proxy => write!(f, "proxy"),
369 RelayMode::Managed => write!(f, "managed"),
370 }
371 }
372}
373
/// The instance type of a relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// A regular relay instance.
    Default,

    /// A canary relay instance.
    Canary,
}
384
385impl RelayInstance {
386 pub fn is_canary(&self) -> bool {
388 matches!(self, RelayInstance::Canary)
389 }
390}
391
392impl fmt::Display for RelayInstance {
393 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394 match self {
395 RelayInstance::Default => write!(f, "default"),
396 RelayInstance::Canary => write!(f, "canary"),
397 }
398 }
399}
400
401impl FromStr for RelayInstance {
402 type Err = fmt::Error;
403
404 fn from_str(s: &str) -> Result<Self, Self::Err> {
405 match s {
406 "canary" => Ok(RelayInstance::Canary),
407 _ => Ok(RelayInstance::Default),
408 }
409 }
410}
411
/// Error returned when parsing a `RelayMode` from a string fails.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
423
424impl FromStr for RelayMode {
425 type Err = ParseRelayModeError;
426
427 fn from_str(s: &str) -> Result<Self, Self::Err> {
428 match s {
429 "proxy" => Ok(RelayMode::Proxy),
430 "managed" => Ok(RelayMode::Managed),
431 _ => Err(ParseRelayModeError),
432 }
433 }
434}
435
/// Returns `true` if `t` equals its type's `Default` value.
///
/// Used with `#[serde(skip_serializing_if = "is_default")]` to omit
/// default-valued fields from serialized config.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
440
/// Best-effort detection of a Docker container environment.
///
/// Checks for the `/.dockerenv` marker file first, then for a `/docker`
/// entry in this process's cgroup file.
fn is_docker() -> bool {
    fs::metadata("/.dockerenv").is_ok()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}

/// Default listen address: all interfaces inside Docker, loopback otherwise.
fn default_host() -> IpAddr {
    let addr = if is_docker() { "0.0.0.0" } else { "127.0.0.1" };
    // Both literals are valid IPv4 addresses, so parsing cannot fail.
    addr.parse().unwrap()
}
459
/// Condition under which Relay reports itself as ready.
///
/// The probe semantics are implemented elsewhere; `Authenticated` is the
/// default (see the `Default` impl below).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReadinessCondition {
    /// Readiness is tied to upstream authentication state.
    Authenticated,
    /// Readiness is reported unconditionally.
    Always,
}

impl Default for ReadinessCondition {
    fn default() -> Self {
        Self::Authenticated
    }
}
484
/// Relay-specific configuration values (the `relay` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    pub mode: RelayMode,
    /// The instance type of this relay.
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to.
    pub host: IpAddr,
    /// The port to bind for the HTTP server.
    pub port: u16,
    /// Optional port for a TLS server; never serialized back out.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// Path to the TLS identity file; never serialized back out.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the TLS identity; never serialized back out.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// When `true`, project IDs are overridden — exact semantics implemented
    /// elsewhere; omitted from serialization when default.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
515
516impl Default for Relay {
517 fn default() -> Self {
518 Relay {
519 mode: RelayMode::Managed,
520 instance: RelayInstance::Default,
521 upstream: "https://sentry.io/".parse().unwrap(),
522 host: default_host(),
523 port: 3000,
524 tls_port: None,
525 tls_identity_path: None,
526 tls_identity_password: None,
527 override_project_ids: false,
528 }
529 }
530}
531
/// Controls internal statsd metric reporting (the `metrics` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Statsd server address; `None` disables reporting.
    pub statsd: Option<String>,
    /// Prefix for every metric name (default: `sentry.relay`).
    pub prefix: String,
    /// Tags attached to every metric.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name under which the hostname is reported, if set — TODO confirm consumer.
    pub hostname_tag: Option<String>,
    /// Metric sample rate (default: 1.0).
    pub sample_rate: f32,
    /// Interval for periodic metrics, presumably in seconds (default: 5).
    pub periodic_secs: u64,
    /// Whether metrics are aggregated before submission (default: true).
    pub aggregate: bool,
    /// Whether high-cardinality tags are allowed (default: false).
    pub allow_high_cardinality_tags: bool,
}
571
572impl Default for Metrics {
573 fn default() -> Self {
574 Metrics {
575 statsd: None,
576 prefix: "sentry.relay".into(),
577 default_tags: BTreeMap::new(),
578 hostname_tag: None,
579 sample_rate: 1.0,
580 periodic_secs: 5,
581 aggregate: true,
582 allow_high_cardinality_tags: false,
583 }
584 }
585}
586
/// Controls various size and concurrency limits (the `limits` config section).
///
/// Defaults for all fields are in the `Default` impl below.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// Maximum number of concurrent upstream requests.
    pub max_concurrent_requests: usize,
    /// Maximum number of concurrent queries.
    pub max_concurrent_queries: usize,
    /// Maximum size of an event payload.
    pub max_event_size: ByteSize,
    /// Maximum size of a single attachment.
    pub max_attachment_size: ByteSize,
    /// Maximum combined size of all attachments in an envelope.
    pub max_attachments_size: ByteSize,
    /// Maximum size of a client report payload.
    pub max_client_reports_size: ByteSize,
    /// Maximum size of a check-in payload.
    pub max_check_in_size: ByteSize,
    /// Maximum size of a full envelope.
    pub max_envelope_size: ByteSize,
    /// Maximum number of sessions per envelope.
    pub max_session_count: usize,
    /// Maximum number of spans per envelope.
    pub max_span_count: usize,
    /// Maximum number of logs per envelope.
    pub max_log_count: usize,
    /// Maximum size of an API request payload.
    pub max_api_payload_size: ByteSize,
    /// Maximum size of an API file upload.
    pub max_api_file_upload_size: ByteSize,
    /// Maximum size of an API chunk upload.
    pub max_api_chunk_upload_size: ByteSize,
    /// Maximum size of a profile payload.
    pub max_profile_size: ByteSize,
    /// Maximum size of a log payload.
    pub max_log_size: ByteSize,
    /// Maximum size of a span payload.
    pub max_span_size: ByteSize,
    /// Maximum size of a container payload.
    pub max_container_size: ByteSize,
    /// Maximum size of a statsd payload.
    pub max_statsd_size: ByteSize,
    /// Maximum size of metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// Maximum compressed size of a replay payload.
    pub max_replay_compressed_size: ByteSize,
    /// Maximum uncompressed replay size; `max_replay_size` accepted as alias.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// Maximum size of a replay message.
    pub max_replay_message_size: ByteSize,
    /// Maximum number of threads (defaults to the CPU count).
    pub max_thread_count: usize,
    /// Maximum pool concurrency.
    pub max_pool_concurrency: usize,
    /// Query timeout, presumably in seconds (default: 30).
    pub query_timeout: u64,
    /// Shutdown timeout, presumably in seconds (default: 10).
    pub shutdown_timeout: u64,
    /// Keep-alive timeout, presumably in seconds (default: 5).
    pub keepalive_timeout: u64,
    /// Optional idle timeout; `None` disables it.
    pub idle_timeout: Option<u64>,
    /// Optional cap on open connections; `None` means unlimited.
    pub max_connections: Option<usize>,
    /// TCP listen backlog (default: 1024).
    pub tcp_listen_backlog: u32,
}
686
impl Default for Limits {
    fn default() -> Self {
        Limits {
            // Concurrency limits.
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            // Payload size caps.
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(100),
            max_attachments_size: ByteSize::mebibytes(100),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(100),
            // Per-envelope item counts.
            max_session_count: 100,
            max_span_count: 1000,
            max_log_count: 1000,
            // API payload caps.
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(1),
            max_container_size: ByteSize::mebibytes(3),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            // Replay caps.
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            // Threading: one thread per available CPU.
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            // Timeouts and connection limits.
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
        }
    }
}
724
/// Routing options (the `routing` config section).
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Whether items with an unknown type are accepted; `None` defers to
    /// behavior defined elsewhere.
    pub accept_unknown_items: Option<bool>,
}
740
/// HTTP content encodings supported for upstream requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// No encoding; the default.
    #[default]
    Identity,
    /// Deflate compression.
    Deflate,
    /// Gzip compression.
    Gzip,
    /// Brotli compression.
    Br,
    /// Zstandard compression.
    Zstd,
}
769
770impl HttpEncoding {
771 pub fn parse(str: &str) -> Self {
773 let str = str.trim();
774 if str.eq_ignore_ascii_case("zstd") {
775 Self::Zstd
776 } else if str.eq_ignore_ascii_case("br") {
777 Self::Br
778 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
779 Self::Gzip
780 } else if str.eq_ignore_ascii_case("deflate") {
781 Self::Deflate
782 } else {
783 Self::Identity
784 }
785 }
786
787 pub fn name(&self) -> Option<&'static str> {
791 match self {
792 Self::Identity => None,
793 Self::Deflate => Some("deflate"),
794 Self::Gzip => Some("gzip"),
795 Self::Br => Some("br"),
796 Self::Zstd => Some("zstd"),
797 }
798 }
799}
800
/// Controls the communication with the upstream (the `http` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Request timeout, presumably in seconds (default: 5).
    pub timeout: u32,
    /// Connection establishment timeout (default: 3).
    pub connection_timeout: u32,
    /// Maximum retry interval (default: 60).
    pub max_retry_interval: u32,
    /// Custom `Host` header to send upstream, if any.
    pub host_header: Option<String>,
    /// Re-authentication interval (default: `Some(600)`).
    pub auth_interval: Option<u64>,
    /// Grace period before a network outage is acted upon
    /// (default: `DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD`).
    pub outage_grace_period: u64,
    /// Delay before retries (default: `default_retry_delay`).
    pub retry_delay: u64,
    /// Interval related to project fetch failures
    /// (default: `default_project_failure_interval`) — TODO confirm semantics.
    pub project_failure_interval: u64,
    /// Content encoding for upstream requests (default: zstd).
    pub encoding: HttpEncoding,
    /// Whether global metrics are enabled (default: false) — TODO confirm consumer.
    pub global_metrics: bool,
}
867
868impl Default for Http {
869 fn default() -> Self {
870 Http {
871 timeout: 5,
872 connection_timeout: 3,
873 max_retry_interval: 60, host_header: None,
875 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
877 retry_delay: default_retry_delay(),
878 project_failure_interval: default_project_failure_interval(),
879 encoding: HttpEncoding::Zstd,
880 global_metrics: false,
881 }
882 }
883}
884
/// Default for `Http::retry_delay`.
fn default_retry_delay() -> u64 {
    1
}

/// Default for `Http::project_failure_interval`.
fn default_project_failure_interval() -> u64 {
    90
}

/// Default for `EnvelopeSpool::max_disk_size`: 500 MiB.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

/// Default for `EnvelopeSpool::batch_size_bytes`: 10 KiB.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}
904
/// Default for `EnvelopeSpool::max_envelope_delay_secs`: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    86_400
}

/// Default for `EnvelopeSpool::disk_usage_refresh_frequency_ms`.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

/// Default for `EnvelopeSpool::max_backpressure_envelopes`.
fn spool_max_backpressure_envelopes() -> usize {
    500
}

/// Default for `EnvelopeSpool::max_backpressure_memory_percent`: 90%.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}

/// Default for `EnvelopeSpool::partitions`: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}
928
/// Configuration for the envelope spool (the `spool.envelopes` section).
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// Path for the on-disk spool; behavior when unset is defined elsewhere —
    /// presumably disk spooling is disabled. TODO confirm.
    pub path: Option<PathBuf>,
    /// Maximum disk size the spool may occupy (default: 500 MiB).
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Batch size in bytes (default: 10 KiB).
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time an envelope may be delayed, in seconds (default: 24h).
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// Disk usage refresh frequency in milliseconds (default: 100).
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// Maximum number of envelopes before backpressure applies (default: 500).
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    /// Memory usage fraction at which backpressure applies (default: 0.9).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of spool partitions (default: 1).
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1012
1013impl Default for EnvelopeSpool {
1014 fn default() -> Self {
1015 Self {
1016 path: None,
1017 max_disk_size: spool_envelopes_max_disk_size(),
1018 batch_size_bytes: spool_envelopes_batch_size_bytes(),
1019 max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1020 disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1021 max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1022 max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1023 partitions: spool_envelopes_partitions(),
1024 }
1025 }
1026}
1027
/// Top-level spooling configuration (the `spool` config section).
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1035
/// Controls internal caching behavior (the `cache` config section).
///
/// Defaults for all fields are in the `Default` impl below.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// Whether to request the full project config (default: false).
    pub project_request_full_config: bool,
    /// Expiry for cached project configs (default: 300).
    pub project_expiry: u32,
    /// Grace period after project expiry (default: 120).
    pub project_grace_period: u32,
    /// Optional refresh interval for project configs (default: `None`).
    pub project_refresh_interval: Option<u32>,
    /// Expiry for cached relay infos (default: 3600).
    pub relay_expiry: u32,
    /// Expiry for buffered envelopes; `event_expiry` accepted as alias.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// Envelope buffer size; `event_buffer_size` accepted as alias.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// Expiry for cache misses (default: 60).
    pub miss_expiry: u32,
    /// Batching interval (default: 100).
    pub batch_interval: u32,
    /// Batching interval for downstream relays (default: 100).
    pub downstream_relays_batch_interval: u32,
    /// Maximum batch size (default: 500).
    pub batch_size: usize,
    /// File polling interval (default: 10).
    pub file_interval: u32,
    /// Global config fetch interval (default: 10).
    pub global_config_fetch_interval: u32,
}
1082
1083impl Default for Cache {
1084 fn default() -> Self {
1085 Cache {
1086 project_request_full_config: false,
1087 project_expiry: 300, project_grace_period: 120, project_refresh_interval: None,
1090 relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1093 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1097 file_interval: 10, global_config_fetch_interval: 10, }
1100 }
1101}
1102
/// Default for `Processing::max_secs_in_future`: one minute.
fn default_max_secs_in_future() -> u32 {
    60
}

/// Default for `Processing::max_session_secs_in_past`: five days.
fn default_max_session_secs_in_past() -> u32 {
    432_000
}
1110
/// Default for `Processing::attachment_chunk_size`: 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1114
/// Default for `Processing::projectconfig_cache_prefix`.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}

/// Default for `Processing::max_rate_limit`.
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1123
/// Controls processing-mode features (the `processing` config section).
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// Whether processing is enabled. Required; no serde default.
    pub enabled: bool,
    /// Path to a GeoIP database, if any.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum allowed timestamp drift into the future (default: 60).
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum allowed session age (default: five days).
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configuration parameters. Required; no serde default.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional named Kafka configurations.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic assignments.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether Kafka topics are validated (default: false).
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis configuration, if any.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Chunk size for attachment uploads (default: 1 MiB).
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Cache key prefix for project configs (default: `relayconfig`).
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients (default: `Some(300)`).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}
1180
1181impl Default for Processing {
1182 fn default() -> Self {
1184 Self {
1185 enabled: false,
1186 geoip_path: None,
1187 max_secs_in_future: default_max_secs_in_future(),
1188 max_session_secs_in_past: default_max_session_secs_in_past(),
1189 kafka_config: Vec::new(),
1190 secondary_kafka_configs: BTreeMap::new(),
1191 topics: TopicAssignments::default(),
1192 kafka_validate_topics: false,
1193 redis: None,
1194 attachment_chunk_size: default_chunk_size(),
1195 projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1196 max_rate_limit: default_max_rate_limit(),
1197 }
1198 }
1199}
1200
/// Event normalization options (the `normalization` config section).
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// The level of normalization to apply.
    #[serde(default)]
    pub level: NormalizationLevel,
}
1209
/// The level of event normalization.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// The default normalization level; exact behavior defined elsewhere.
    #[default]
    Default,
    /// Full normalization.
    Full,
}
1225
/// Outcome aggregation options (the `outcomes.aggregator` section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Bucket interval (default: 60).
    pub bucket_interval: u64,
    /// Flush interval (default: 120).
    pub flush_interval: u64,
}
1235
1236impl Default for OutcomeAggregatorConfig {
1237 fn default() -> Self {
1238 Self {
1239 bucket_interval: 60,
1240 flush_interval: 120,
1241 }
1242 }
1243}
1244
/// Controls whether and how outcomes are emitted.
///
/// Serialized as `true`/`false` or the string `"as_client_reports"`; see the
/// manual serde impls below.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit outcomes at all.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit full outcomes.
    AsOutcomes,
}

impl EmitOutcomes {
    /// Returns `true` unless outcome emission is fully disabled.
    pub fn any(&self) -> bool {
        *self != EmitOutcomes::None
    }
}
1264
1265impl Serialize for EmitOutcomes {
1266 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1267 where
1268 S: Serializer,
1269 {
1270 match self {
1272 Self::None => serializer.serialize_bool(false),
1273 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1274 Self::AsOutcomes => serializer.serialize_bool(true),
1275 }
1276 }
1277}
1278
1279struct EmitOutcomesVisitor;
1280
1281impl Visitor<'_> for EmitOutcomesVisitor {
1282 type Value = EmitOutcomes;
1283
1284 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1285 formatter.write_str("true, false, or 'as_client_reports'")
1286 }
1287
1288 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1289 where
1290 E: serde::de::Error,
1291 {
1292 Ok(if v {
1293 EmitOutcomes::AsOutcomes
1294 } else {
1295 EmitOutcomes::None
1296 })
1297 }
1298
1299 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1300 where
1301 E: serde::de::Error,
1302 {
1303 if v == "as_client_reports" {
1304 Ok(EmitOutcomes::AsClientReports)
1305 } else {
1306 Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1307 }
1308 }
1309}
1310
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor accept either a boolean or a
        // string from self-describing formats.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1319
/// Outcome emission options (the `outcomes` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Whether and how outcomes are emitted (default: as client reports).
    pub emit_outcomes: EmitOutcomes,
    /// Whether client-reported outcomes are emitted (default: true).
    pub emit_client_outcomes: bool,
    /// Maximum outcome batch size (default: 1000).
    pub batch_size: usize,
    /// Outcome batching interval (default: 500).
    pub batch_interval: u64,
    /// Optional override for the outcome source tag.
    pub source: Option<String>,
    /// Aggregator settings for outcomes.
    pub aggregator: OutcomeAggregatorConfig,
}
1342
1343impl Default for Outcomes {
1344 fn default() -> Self {
1345 Outcomes {
1346 emit_outcomes: EmitOutcomes::AsClientReports,
1347 emit_client_outcomes: true,
1348 batch_size: 1000,
1349 batch_interval: 500,
1350 source: None,
1351 aggregator: OutcomeAggregatorConfig::default(),
1352 }
1353 }
1354}
1355
/// Minimal version of the config, holding only the `relay` section.
///
/// Used for initializing a new config folder (see `save_in_folder`).
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1362
1363impl MinimalConfig {
1364 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1366 let path = p.as_ref();
1367 if fs::metadata(path).is_err() {
1368 fs::create_dir_all(path)
1369 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1370 }
1371 self.save(path)
1372 }
1373}
1374
// The minimal config reads and writes the same `config.yml` file as the full
// `ConfigValues`.
impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1384
/// Custom (de)serialization for the `auth.static_relays` map.
///
/// The wire format stores each relay as a `RelayInfoConfig`; this module
/// converts to and from the in-memory `RelayInfo` representation.
mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    /// Wire representation of a single static relay entry.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    /// Deserializes the map, converting each wire entry into a `RelayInfo`.
    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    /// Serializes the map entry by entry in wire format.
    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}
1438
/// Authentication options (the `auth` config section).
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls when this relay reports ready; omitted from serialization
    /// when default.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays, keyed by relay id.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// Maximum accepted signature age (default: 300; presumably seconds).
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}
1456
/// Default for `AuthConfig::signature_max_age`: five minutes.
fn default_max_age() -> u64 {
    5 * 60
}
1460
/// GeoIP lookup options (the `geoip` config section).
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// Path to a GeoIP database, if any.
    pub path: Option<PathBuf>,
}
1467
/// Cardinality limiter options (the `cardinality_limiter` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Interval at which the limiter cache is vacuumed (default: 180).
    pub cache_vacuum_interval: u64,
}
1479
1480impl Default for CardinalityLimiter {
1481 fn default() -> Self {
1482 Self {
1483 cache_vacuum_interval: 180,
1484 }
1485 }
1486}
1487
/// Health check options (the `health` config section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Health refresh interval in milliseconds (default: 3000).
    pub refresh_interval_ms: u64,
    /// Absolute memory limit; `None` means only the percentage applies.
    pub max_memory_bytes: Option<ByteSize>,
    /// Memory usage fraction considered unhealthy (default: 0.95).
    pub max_memory_percent: f32,
    /// Probe timeout in milliseconds (default: 900).
    pub probe_timeout_ms: u64,
    /// Memory stat refresh frequency in milliseconds (default: 100).
    pub memory_stat_refresh_frequency_ms: u64,
}
1525
1526impl Default for Health {
1527 fn default() -> Self {
1528 Self {
1529 refresh_interval_ms: 3000,
1530 max_memory_bytes: None,
1531 max_memory_percent: 0.95,
1532 probe_timeout_ms: 900,
1533 memory_stat_refresh_frequency_ms: 100,
1534 }
1535 }
1536}
1537
/// COGS (cost of goods sold) measurement options (the `cogs` section).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum size of the COGS measurement queue (default: 10_000).
    pub max_queue_size: u64,
    /// Resource id under which relay reports its COGS
    /// (default: `relay_service`).
    pub relay_resource_id: String,
}
1555
1556impl Default for Cogs {
1557 fn default() -> Self {
1558 Self {
1559 max_queue_size: 10_000,
1560 relay_resource_id: "relay_service".to_owned(),
1561 }
1562 }
1563}
1564
/// The complete set of configuration values, one field per top-level section
/// of the config file. Every section falls back to its `Default` when absent.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    // Additional aggregators scoped to specific namespaces.
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1606
// The full config lives in `config.yml`, like `MinimalConfig`.
impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1616
/// The fully loaded config object, combining file values with optional
/// credentials and the config folder path.
pub struct Config {
    /// Values loaded from `config.yml` (plus any applied overrides).
    values: ConfigValues,
    /// Credentials from `credentials.json`, if the file exists.
    credentials: Option<Credentials>,
    /// The base path the config was loaded from.
    path: PathBuf,
}
1623
1624impl fmt::Debug for Config {
1625 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1626 f.debug_struct("Config")
1627 .field("path", &self.path)
1628 .field("values", &self.values)
1629 .finish()
1630 }
1631}
1632
1633impl Config {
1634 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1636 let path = env::current_dir()
1637 .map(|x| x.join(path.as_ref()))
1638 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1639
1640 let config = Config {
1641 values: ConfigValues::load(&path)?,
1642 credentials: if Credentials::path(&path).exists() {
1643 Some(Credentials::load(&path)?)
1644 } else {
1645 None
1646 },
1647 path: path.clone(),
1648 };
1649
1650 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1651 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1652 }
1653
1654 Ok(config)
1655 }
1656
1657 pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1661 Ok(Config {
1662 values: serde_json::from_value(value)
1663 .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1664 credentials: None,
1665 path: PathBuf::new(),
1666 })
1667 }
1668
    /// Applies command-line / environment overrides on top of the loaded
    /// config values.
    ///
    /// Each override is parsed and validated individually; the first invalid
    /// value aborts with a `ConfigError` naming the offending field. Returns
    /// `&mut self` so calls can be chained.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            // NOTE(review): the error context names "deployment" while the
            // override field is `instance` — presumably legacy naming; confirm.
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // An explicit upstream URL takes precedence over an upstream DSN.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            // Accept common boolean spellings; an empty string disables.
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            // Update an existing `bootstrap.servers` entry in place, or append
            // a new one.
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        // Parse all credential overrides up front so they can be validated as
        // a complete set below.
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Credentials already exist: each provided override replaces its
            // field individually.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No stored credentials: either a complete set must be provided,
            // or none at all.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                    // Nothing provided; keep running without credentials.
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // Non-numeric shutdown timeouts are silently ignored.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
1822
1823 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1825 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1826 }
1827
    /// Returns the filesystem path from which this config was loaded.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Serializes the config values into a YAML string.
    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    /// Generates new random credentials and installs them on this config.
    ///
    /// If `save` is `true`, the credentials are persisted to the config
    /// directory before being applied.
    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    /// Returns the relay credentials, if set.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
1855
    /// Replaces the stored credentials and persists the change to disk.
    ///
    /// Saves new credentials if `Some`, or deletes the credentials file if
    /// `None`. Returns `Ok(false)` without touching the filesystem when the
    /// new credentials equal the current ones, `Ok(true)` otherwise.
    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                // Only remove the file if it exists; a missing file already
                // matches the desired state.
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }
1884
    /// Returns `true` if the config contains credentials.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key if credentials are set.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key if credentials are set.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID if credentials are set.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    /// Returns the relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the instance type of this relay.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the configured upstream descriptor.
    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    /// Returns the custom HTTP `Host` header to send upstream, if configured.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the socket address on which the server listens.
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }
1929
1930 pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1932 if self.values.relay.tls_identity_path.is_some() {
1933 let port = self.values.relay.tls_port.unwrap_or(3443);
1934 Some((self.values.relay.host, port).into())
1935 } else {
1936 None
1937 }
1938 }
1939
    /// Returns the path to the TLS identity file, if configured.
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the TLS identity, if configured.
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns whether the `override_project_ids` option is enabled.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    /// Returns `true` when readiness requires successful authentication.
    ///
    /// Only managed relays with the `Authenticated` readiness condition report
    /// unready until authenticated; all other combinations never require auth.
    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }
1966
1967 pub fn http_auth_interval(&self) -> Option<Duration> {
1971 if self.processing_enabled() {
1972 return None;
1973 }
1974
1975 match self.values.http.auth_interval {
1976 None | Some(0) => None,
1977 Some(secs) => Some(Duration::from_secs(secs)),
1978 }
1979 }
1980
    /// Returns the HTTP outage grace period (configured in seconds).
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Returns the HTTP retry delay (configured in seconds).
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Returns the project failure interval (configured in seconds).
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Returns the HTTP encoding used for upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns the `http.global_metrics` flag.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns the outcome emission mode.
    ///
    /// With processing enabled, outcomes are always emitted as full outcomes,
    /// regardless of the configured value.
    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    /// Returns the `outcomes.emit_client_outcomes` flag.
    pub fn emit_client_outcomes(&self) -> bool {
        self.values.outcomes.emit_client_outcomes
    }

    /// Returns the maximum number of outcomes sent in a single batch.
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the outcome batching interval (configured in milliseconds).
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// Returns the configured outcome source, if any.
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the outcome aggregator configuration.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    /// Returns the logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry (internal error reporting) configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }
2063
2064 pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2068 if let Some(ref addr) = self.values.metrics.statsd {
2069 let addrs = addr
2070 .as_str()
2071 .to_socket_addrs()
2072 .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2073 .collect();
2074 Ok(addrs)
2075 } else {
2076 Ok(vec![])
2077 }
2078 }
2079
    /// Returns the prefix applied to internal metric names.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns the default tags attached to all internal metrics.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the tag name used to report the hostname, if configured.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the sample rate for internal metrics.
    pub fn metrics_sample_rate(&self) -> f32 {
        self.values.metrics.sample_rate
    }

    /// Returns the `metrics.aggregate` flag.
    pub fn metrics_aggregate(&self) -> bool {
        self.values.metrics.aggregate
    }

    /// Returns the `metrics.allow_high_cardinality_tags` flag.
    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
        self.values.metrics.allow_high_cardinality_tags
    }

    /// Returns the interval for periodic metrics, or `None` when the
    /// configured value is `0` (disabled).
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the default timeout for HTTP requests (configured in seconds).
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the timeout for establishing HTTP connections (seconds).
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the maximum interval between HTTP request retries (seconds).
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns the expiry for cached project configs (seconds).
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns the `cache.project_request_full_config` flag.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns the expiry for cached relay info (seconds).
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }
2149
2150 pub fn envelope_buffer_size(&self) -> usize {
2152 self.values
2153 .cache
2154 .envelope_buffer_size
2155 .try_into()
2156 .unwrap_or(usize::MAX)
2157 }
2158
    /// Returns the expiry for negative (miss) cache entries (seconds).
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period for expired project configs (seconds).
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the project refresh interval, if configured (seconds).
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns the batching interval for upstream queries (milliseconds).
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns the batching interval for downstream relay info requests
    /// (milliseconds).
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the interval for polling local (file-based) project configs
    /// (seconds).
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the interval for fetching the global config (seconds).
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }
2201
2202 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2207 let mut path = self
2208 .values
2209 .spool
2210 .envelopes
2211 .path
2212 .as_ref()
2213 .map(|path| path.to_owned())?;
2214
2215 if partition_id == 0 {
2216 return Some(path);
2217 }
2218
2219 let file_name = path.file_name().and_then(|f| f.to_str())?;
2220 let new_file_name = format!("{file_name}.{partition_id}");
2221 path.set_file_name(new_file_name);
2222
2223 Some(path)
2224 }
2225
    /// Returns the maximum on-disk size of the envelope spool, in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Returns the spool write-batch size, in bytes.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the maximum age of spooled envelopes (configured in seconds).
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns how often spool disk usage is refreshed (milliseconds).
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the maximum number of envelopes held back under backpressure.
    pub fn spool_max_backpressure_envelopes(&self) -> usize {
        self.values.spool.envelopes.max_backpressure_envelopes
    }

    /// Returns the memory watermark (as a fraction) for spool backpressure.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of spool partitions (always at least 1).
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    /// Returns the maximum size of an event payload, in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size of a single attachment, in bytes.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum combined size of all attachments, in bytes.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum size of a client report payload, in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum size of a check-in payload, in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum size of a log payload, in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum size of a span payload, in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum size of a container payload, in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum size of an envelope, in bytes.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum number of spans per envelope.
    pub fn max_span_count(&self) -> usize {
        self.values.limits.max_span_count
    }

    /// Returns the maximum number of logs per envelope.
    pub fn max_log_count(&self) -> usize {
        self.values.limits.max_log_count
    }

    /// Returns the maximum size of a statsd payload, in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum size of a metric buckets payload, in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }
2334
    /// Returns the maximum API payload size, in bytes.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum API file upload size, in bytes.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum API chunk upload size, in bytes.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum size of a profile payload, in bytes.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum size of a compressed replay payload, in bytes.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum size of an uncompressed replay payload, in bytes.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum size of a replay message, in bytes.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    /// Returns the maximum number of concurrent upstream requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of concurrent upstream queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// Returns the timeout for upstream queries (configured in seconds).
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// Returns the shutdown timeout (configured in seconds).
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    /// Returns the server keep-alive timeout (configured in seconds).
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    /// Returns the idle connection timeout, if configured (seconds).
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// Returns the TCP listen backlog.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the configured CPU concurrency (`limits.max_thread_count`).
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the configured pool concurrency (`limits.max_pool_concurrency`).
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    /// Returns the maximum number of queries per upstream batch.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Returns the path of the directory holding local project config files.
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }
2436
    /// Returns `true` if processing is enabled.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Returns the configured normalization level.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    /// Returns the path to the GeoIP database, if configured.
    ///
    /// `geoip.path` takes precedence; `processing.geoip_path` is the fallback.
    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    /// Returns the maximum number of seconds a timestamp may lie in the future.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Returns the maximum age (seconds) of session timestamps in the past.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    /// Returns the Kafka configuration for the given topic, resolved against
    /// the primary and secondary Kafka client configs.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Returns the `processing.kafka_validate_topics` flag.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// Returns topic assignments that did not map to a known topic.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Returns the Redis configurations, or `None` when Redis is not
    /// configured. Pool sizing is derived from the CPU concurrency.
    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
        ))
    }
2499
    /// Returns the chunk size for attachments, in bytes.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Returns the maximum flush size for metric batches, in bytes.
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Returns the prefix for project config cache keys.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Returns the maximum rate limit, if configured.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Returns the vacuum interval for the cardinality limiter cache
    /// (configured in seconds).
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Returns the refresh interval for health checks (milliseconds).
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Returns the absolute memory watermark in bytes, or `u64::MAX` when no
    /// byte limit is configured.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    /// Returns the relative memory watermark (as a fraction).
    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    /// Returns the timeout for health probes (milliseconds).
    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    /// Returns the refresh frequency for memory statistics, in milliseconds.
    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    /// Returns the maximum COGS queue size.
    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    /// Returns the COGS resource ID attributed to this relay.
    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    /// Returns the default metrics aggregator configuration.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    /// Returns the secondary (scoped) aggregator configurations.
    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }
2576
2577 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2579 for entry in &self.values.secondary_aggregators {
2580 if entry.condition.matches(Some(namespace)) {
2581 return &entry.config;
2582 }
2583 }
2584 &self.values.aggregator
2585 }
2586
    /// Returns the statically configured relays, keyed by relay ID.
    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    /// Returns the maximum age of request signatures (configured in seconds).
    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }
2596
2597 pub fn accept_unknown_items(&self) -> bool {
2599 let forward = self.values.routing.accept_unknown_items;
2600 forward.unwrap_or_else(|| !self.processing_enabled())
2601 }
2602}
2603
2604impl Default for Config {
2605 fn default() -> Self {
2606 Self {
2607 values: ConfigValues::default(),
2608 credentials: None,
2609 path: PathBuf::new(),
2610 }
2611 }
2612}
2613
#[cfg(test)]
mod tests {

    use super::*;

    // The legacy `event_buffer_size` / `event_expiry` keys must still
    // deserialize into the envelope-based cache fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    // `EmitOutcomes` must round-trip through serde for all three variants,
    // including the bare-boolean representations.
    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }

    // Unquoted, non-boolean input is rejected.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}