1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Grace period in seconds used as the default for `Http::outage_grace_period`.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Comment header written at the top of generated YAML config files,
/// pointing users at the relevant documentation.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// The kinds of failures that can occur while loading or storing config files.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// The config file could not be opened for reading.
    CouldNotOpenFile,
    /// The config file could not be written.
    CouldNotWriteFile,
    /// The YAML config file could not be parsed.
    BadYaml,
    /// The JSON config file could not be parsed.
    BadJson,
    /// A configuration value failed validation.
    InvalidValue,
    /// Processing was enabled but this binary was built without it.
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Map each kind to its static message, then emit it in one call.
        let message = match self {
            Self::CouldNotOpenFile => "could not open config file",
            Self::CouldNotWriteFile => "could not write config file",
            Self::BadYaml => "could not parse yaml config file",
            Self::BadJson => "could not parse json config file",
            Self::InvalidValue => "invalid config value",
            Self::ProcessingNotAvailable => {
                "was not compiled with processing, cannot enable processing"
            }
        };
        f.write_str(message)
    }
}
69
/// Identifies where a configuration error originated, if known.
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// The origin is unknown or not applicable.
    #[default]
    None,
    /// The error was caused by the config file at this path.
    File(PathBuf),
    /// The error was caused by an override for the named field.
    FieldOverride(String),
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Rendered as a suffix appended to the error kind, hence the
        // leading space and the empty output for `None`.
        match self {
            Self::None => Ok(()),
            Self::File(path) => write!(f, " (file {})", path.display()),
            Self::FieldOverride(field) => write!(f, " (field {field})"),
        }
    }
}
93
94#[derive(Debug)]
96pub struct ConfigError {
97 source: ConfigErrorSource,
98 kind: ConfigErrorKind,
99}
100
101impl ConfigError {
102 #[inline]
103 fn new(kind: ConfigErrorKind) -> Self {
104 Self {
105 source: ConfigErrorSource::None,
106 kind,
107 }
108 }
109
110 #[inline]
111 fn field(field: &'static str) -> Self {
112 Self {
113 source: ConfigErrorSource::FieldOverride(field.to_owned()),
114 kind: ConfigErrorKind::InvalidValue,
115 }
116 }
117
118 #[inline]
119 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120 Self {
121 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122 kind,
123 }
124 }
125
126 pub fn kind(&self) -> ConfigErrorKind {
128 self.kind
129 }
130}
131
132impl fmt::Display for ConfigError {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}{}", self.kind(), self.source)
135 }
136}
137
138impl Error for ConfigError {}
139
/// The on-disk serialization formats supported for config objects.
enum ConfigFormat {
    /// YAML, used for the main config file.
    Yaml,
    /// JSON, used for credentials.
    Json,
}

impl ConfigFormat {
    /// Returns the file extension associated with this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Yaml => "yml",
            Self::Json => "json",
        }
    }
}
153
/// A configuration object that can be loaded from and saved to disk.
///
/// Implementors declare a serialization format and file name; `load` and
/// `save` then handle path construction, variable substitution, and I/O.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The serialization format used for this object on disk.
    fn format() -> ConfigFormat;

    /// The file name (without extension) under which this object is stored.
    fn name() -> &'static str;

    /// The full path of this object's file inside the `base` directory.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the object from its file in `base`.
    ///
    /// Before deserialization, `${file:...}` placeholders are substituted
    /// from files resolved relative to `base`, and `${...}` placeholders
    /// from environment variables.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        let mut source = {
            let file = serde_vars::FileSource::default()
                .with_variable_prefix("${file:")
                .with_variable_suffix("}")
                .with_base_path(base);
            let env = serde_vars::EnvSource::default()
                .with_variable_prefix("${")
                .with_variable_suffix("}");
            // Both sources are consulted; tuple order defines lookup order
            // as implemented by `serde_vars`.
            (file, env)
        };
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Saves the object to its file in `base`, truncating any existing file.
    ///
    /// On Unix the file is created with mode `0o600`, since config files may
    /// contain secrets. YAML output is prefixed with a documentation header.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Restrict permissions to the owner on Unix.
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is best-effort; a failure here is not fatal.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
228
/// String-valued overrides (e.g. from CLI or environment) that can be applied
/// on top of a loaded config; unset fields leave the config untouched.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// Override for the relay mode ("managed" or "proxy").
    pub mode: Option<String>,
    /// Override for the relay instance ("default" or "canary").
    pub instance: Option<String>,
    /// Override for the logging level.
    pub log_level: Option<String>,
    /// Override for the logging format.
    pub log_format: Option<String>,
    /// Override for the upstream URL; takes precedence over `upstream_dsn`.
    pub upstream: Option<String>,
    /// Upstream given as a DSN, used when `upstream` is unset.
    pub upstream_dsn: Option<String>,
    /// Override for the bind host.
    pub host: Option<String>,
    /// Override for the bind port.
    pub port: Option<String>,
    /// Override enabling/disabling processing mode.
    pub processing: Option<String>,
    /// Override for the Kafka broker URL.
    pub kafka_url: Option<String>,
    /// Override for the Redis URL.
    pub redis_url: Option<String>,
    /// Override for the relay ID credential.
    pub id: Option<String>,
    /// Override for the secret key credential.
    pub secret_key: Option<String>,
    /// Override for the public key credential.
    pub public_key: Option<String>,
    /// Override for the outcome source.
    pub outcome_source: Option<String>,
    /// Override for the shutdown timeout.
    pub shutdown_timeout: Option<String>,
    /// Override for the server name.
    pub server_name: Option<String>,
}
268
/// The relay's identity: its key pair and unique relay ID, stored in
/// `credentials.json` (see the `ConfigObject` impl below).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key used for signing.
    pub secret_key: SecretKey,
    /// The public key shared with the upstream.
    pub public_key: PublicKey,
    /// The unique identifier of this relay.
    pub id: RelayId,
}
279
impl Credentials {
    /// Generates a fresh key pair and relay ID, logging that new credentials
    /// were created.
    pub fn generate() -> Self {
        relay_log::info!("generating new relay credentials");
        let (sk, pk) = generate_key_pair();
        Self {
            secret_key: sk,
            public_key: pk,
            id: generate_relay_id(),
        }
    }

    /// Serializes the credentials to a JSON string.
    pub fn to_json_string(&self) -> anyhow::Result<String> {
        serde_json::to_string(self)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }
}
298
// Credentials are stored as `credentials.json` in the config directory.
impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}
307
/// Information about a known downstream relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The relay's public key.
    pub public_key: PublicKey,

    /// Whether this is an internal (trusted) relay; defaults to `false`.
    #[serde(default)]
    pub internal: bool,
}
319
impl RelayInfo {
    /// Creates relay info for the given public key, marked as non-internal.
    pub fn new(public_key: PublicKey) -> Self {
        Self {
            public_key,
            internal: false,
        }
    }
}
329
/// The operation mode of this relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// Forward traffic without full processing.
    Proxy,

    /// Operate as a managed relay registered with the upstream.
    Managed,
}
348
impl<'de> Deserialize<'de> for RelayMode {
    /// Deserializes the mode from a string. The removed `static` mode is
    /// rejected with a targeted migration message instead of a generic
    /// unknown-variant error.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        match s.as_str() {
            "proxy" => Ok(RelayMode::Proxy),
            "managed" => Ok(RelayMode::Managed),
            // Historical mode; give users a hint on how to migrate.
            "static" => Err(serde::de::Error::custom(
                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
            )),
            other => Err(serde::de::Error::unknown_variant(
                other,
                &["proxy", "managed"],
            )),
        }
    }
}
368
369impl fmt::Display for RelayMode {
370 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371 match self {
372 RelayMode::Proxy => write!(f, "proxy"),
373 RelayMode::Managed => write!(f, "managed"),
374 }
375 }
376}
377
/// The deployment flavor of this relay instance.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// A regular relay instance.
    Default,

    /// A canary instance used for staged rollouts.
    Canary,
}
388
389impl RelayInstance {
390 pub fn is_canary(&self) -> bool {
392 matches!(self, RelayInstance::Canary)
393 }
394}
395
396impl fmt::Display for RelayInstance {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 match self {
399 RelayInstance::Default => write!(f, "default"),
400 RelayInstance::Canary => write!(f, "canary"),
401 }
402 }
403}
404
405impl FromStr for RelayInstance {
406 type Err = fmt::Error;
407
408 fn from_str(s: &str) -> Result<Self, Self::Err> {
409 match s {
410 "canary" => Ok(RelayInstance::Canary),
411 _ => Ok(RelayInstance::Default),
412 }
413 }
414}
415
/// Error returned when a relay mode string is neither `managed` nor `proxy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429 type Err = ParseRelayModeError;
430
431 fn from_str(s: &str) -> Result<Self, Self::Err> {
432 match s {
433 "proxy" => Ok(RelayMode::Proxy),
434 "managed" => Ok(RelayMode::Managed),
435 _ => Err(ParseRelayModeError),
436 }
437 }
438}
439
/// Returns `true` if `t` equals its type's default value. Used as a
/// `skip_serializing_if` predicate.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
444
/// Heuristically detects a Docker environment by checking for the
/// `/.dockerenv` marker file or a docker entry in the process cgroup.
fn is_docker() -> bool {
    fs::metadata("/.dockerenv").is_ok()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
453
454fn default_host() -> IpAddr {
456 if is_docker() {
457 "0.0.0.0".parse().unwrap()
459 } else {
460 "127.0.0.1".parse().unwrap()
461 }
462}
463
/// Controls when the relay reports itself as ready in health checks.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ReadinessCondition {
    /// Ready only once authenticated with the upstream (the default).
    #[default]
    Authenticated,
    /// Always report ready.
    Always,
}
484
/// The `relay` section of the configuration: mode, upstream, and network
/// binding.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode (managed or proxy).
    pub mode: RelayMode,
    /// The instance type (default or canary).
    pub instance: RelayInstance,
    /// The upstream Relay or Sentry instance to connect to.
    pub upstream: UpstreamDescriptor<'static>,
    /// The IP address to bind the HTTP server to.
    pub host: IpAddr,
    /// The port to bind the HTTP server to.
    pub port: u16,
    // NOTE(review): purpose of internal_host/internal_port is not visible in
    // this file — presumably an alternate internal listener; confirm at call
    // sites.
    pub internal_host: Option<IpAddr>,
    pub internal_port: Option<u16>,
    /// Optional TLS port; never written back when serializing the config.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// Path to the TLS identity file; never written back when serializing.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the TLS identity; never written back when serializing.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Allows overriding project IDs; omitted from output while default.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
533
impl Default for Relay {
    fn default() -> Self {
        Relay {
            mode: RelayMode::Managed,
            instance: RelayInstance::Default,
            // Default upstream is the public Sentry endpoint; the literal is
            // a valid URL, so parsing cannot fail.
            upstream: "https://sentry.io/".parse().unwrap(),
            // 0.0.0.0 inside Docker, 127.0.0.1 otherwise.
            host: default_host(),
            port: 3000,
            internal_host: None,
            internal_port: None,
            tls_port: None,
            tls_identity_path: None,
            tls_identity_password: None,
            override_project_ids: false,
        }
    }
}
551
/// Settings for reporting Relay's own metrics via statsd.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Optional statsd server address.
    pub statsd: Option<String>,
    /// Optional buffer size for outgoing statsd data.
    pub statsd_buffer_size: Option<usize>,
    /// Prefix prepended to all metric names (default `sentry.relay`).
    pub prefix: String,
    /// Tags attached to every metric.
    pub default_tags: BTreeMap<String, String>,
    /// If set, the hostname is added as a tag under this key.
    pub hostname_tag: Option<String>,
    /// Interval in seconds for periodic metric emission (default 5).
    pub periodic_secs: u64,
}
578
579impl Default for Metrics {
580 fn default() -> Self {
581 Metrics {
582 statsd: None,
583 statsd_buffer_size: None,
584 prefix: "sentry.relay".into(),
585 default_tags: BTreeMap::new(),
586 hostname_tag: None,
587 periodic_secs: 5,
588 }
589 }
590}
591
/// Size and concurrency limits applied to incoming payloads and internal
/// resources.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// Maximum number of concurrent upstream requests.
    pub max_concurrent_requests: usize,
    /// Maximum number of concurrent upstream queries.
    pub max_concurrent_queries: usize,
    /// Maximum size of an event payload.
    pub max_event_size: ByteSize,
    /// Maximum size of a single attachment.
    pub max_attachment_size: ByteSize,
    /// Maximum size of an upload.
    pub max_upload_size: ByteSize,
    /// Maximum combined size of all attachments in an envelope.
    pub max_attachments_size: ByteSize,
    /// Maximum size of a client report payload.
    pub max_client_reports_size: ByteSize,
    /// Maximum size of a check-in payload.
    pub max_check_in_size: ByteSize,
    /// Maximum size of a full envelope.
    pub max_envelope_size: ByteSize,
    /// Maximum number of sessions per envelope item.
    pub max_session_count: usize,
    /// Maximum payload size for API requests.
    pub max_api_payload_size: ByteSize,
    /// Maximum size for API file uploads.
    pub max_api_file_upload_size: ByteSize,
    /// Maximum size for API chunk uploads.
    pub max_api_chunk_upload_size: ByteSize,
    /// Maximum size of a profile payload.
    pub max_profile_size: ByteSize,
    /// Maximum size of a trace metric payload.
    pub max_trace_metric_size: ByteSize,
    /// Maximum size of a log payload.
    pub max_log_size: ByteSize,
    /// Maximum size of a span payload.
    pub max_span_size: ByteSize,
    /// Maximum size of a container item.
    pub max_container_size: ByteSize,
    /// Maximum size of a statsd payload.
    pub max_statsd_size: ByteSize,
    /// Maximum size of a metric buckets payload.
    pub max_metric_buckets_size: ByteSize,
    /// Maximum size of a compressed replay payload.
    pub max_replay_compressed_size: ByteSize,
    /// Maximum size of an uncompressed replay (`max_replay_size` is a
    /// backward-compatible alias).
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// Maximum size of a replay message.
    pub max_replay_message_size: ByteSize,
    /// Maximum size of a removed attribute key.
    pub max_removed_attribute_key_size: ByteSize,
    /// Maximum number of worker threads.
    pub max_thread_count: usize,
    /// Maximum concurrency per worker pool.
    pub max_pool_concurrency: usize,
    /// Timeout for upstream queries, in seconds.
    pub query_timeout: u64,
    /// Time allowed for graceful shutdown, in seconds.
    pub shutdown_timeout: u64,
    /// HTTP keep-alive timeout, in seconds.
    pub keepalive_timeout: u64,
    /// Optional idle connection timeout, in seconds.
    pub idle_timeout: Option<u64>,
    /// Optional cap on concurrent connections.
    pub max_connections: Option<usize>,
    /// Backlog size for the TCP listen socket.
    pub tcp_listen_backlog: u32,
}
701
702impl Default for Limits {
703 fn default() -> Self {
704 Limits {
705 max_concurrent_requests: 100,
706 max_concurrent_queries: 5,
707 max_event_size: ByteSize::mebibytes(1),
708 max_attachment_size: ByteSize::mebibytes(200),
709 max_upload_size: ByteSize::mebibytes(1024),
710 max_attachments_size: ByteSize::mebibytes(200),
711 max_client_reports_size: ByteSize::kibibytes(4),
712 max_check_in_size: ByteSize::kibibytes(100),
713 max_envelope_size: ByteSize::mebibytes(200),
714 max_session_count: 100,
715 max_api_payload_size: ByteSize::mebibytes(20),
716 max_api_file_upload_size: ByteSize::mebibytes(40),
717 max_api_chunk_upload_size: ByteSize::mebibytes(100),
718 max_profile_size: ByteSize::mebibytes(50),
719 max_trace_metric_size: ByteSize::mebibytes(1),
720 max_log_size: ByteSize::mebibytes(1),
721 max_span_size: ByteSize::mebibytes(1),
722 max_container_size: ByteSize::mebibytes(12),
723 max_statsd_size: ByteSize::mebibytes(1),
724 max_metric_buckets_size: ByteSize::mebibytes(1),
725 max_replay_compressed_size: ByteSize::mebibytes(10),
726 max_replay_uncompressed_size: ByteSize::mebibytes(100),
727 max_replay_message_size: ByteSize::mebibytes(15),
728 max_thread_count: num_cpus::get(),
729 max_pool_concurrency: 1,
730 query_timeout: 30,
731 shutdown_timeout: 10,
732 keepalive_timeout: 5,
733 idle_timeout: None,
734 max_connections: None,
735 tcp_listen_backlog: 1024,
736 max_removed_attribute_key_size: ByteSize::kibibytes(10),
737 }
738 }
739}
740
/// Routing behavior for incoming envelope items.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Whether unknown envelope items are accepted; `None` uses the built-in
    /// default behavior.
    pub accept_unknown_items: Option<bool>,
}
756
/// HTTP content encodings supported for upstream requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// No encoding (the default).
    #[default]
    Identity,
    /// zlib/deflate compression.
    Deflate,
    /// gzip compression.
    Gzip,
    /// Brotli compression.
    Br,
    /// Zstandard compression.
    Zstd,
}
785
786impl HttpEncoding {
787 pub fn parse(str: &str) -> Self {
789 let str = str.trim();
790 if str.eq_ignore_ascii_case("zstd") {
791 Self::Zstd
792 } else if str.eq_ignore_ascii_case("br") {
793 Self::Br
794 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
795 Self::Gzip
796 } else if str.eq_ignore_ascii_case("deflate") {
797 Self::Deflate
798 } else {
799 Self::Identity
800 }
801 }
802
803 pub fn name(&self) -> Option<&'static str> {
807 match self {
808 Self::Identity => None,
809 Self::Deflate => Some("deflate"),
810 Self::Gzip => Some("gzip"),
811 Self::Br => Some("br"),
812 Self::Zstd => Some("zstd"),
813 }
814 }
815}
816
/// Settings for HTTP communication with the upstream.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Request timeout, in seconds.
    pub timeout: u32,
    /// Connection establishment timeout, in seconds.
    pub connection_timeout: u32,
    /// Maximum interval between retries, in seconds.
    pub max_retry_interval: u32,
    /// Optional override for the `Host` header.
    pub host_header: Option<String>,
    /// Optional re-authentication interval.
    pub auth_interval: Option<u64>,
    /// Grace period before a network outage is reported, in seconds.
    pub outage_grace_period: u64,
    /// Delay before retrying failed requests.
    pub retry_delay: u64,
    /// Interval after which a project fetch is considered failed.
    pub project_failure_interval: u64,
    /// Content encoding for upstream requests (default zstd).
    pub encoding: HttpEncoding,
    /// Whether to fetch global metrics config from the upstream.
    pub global_metrics: bool,
    /// Whether unknown requests are forwarded upstream.
    pub forward: bool,
    /// Whether DNS responses are cached.
    pub dns_cache: bool,
}
891
892impl Default for Http {
893 fn default() -> Self {
894 Http {
895 timeout: 5,
896 connection_timeout: 3,
897 max_retry_interval: 60, host_header: None,
899 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
901 retry_delay: default_retry_delay(),
902 project_failure_interval: default_project_failure_interval(),
903 encoding: HttpEncoding::Zstd,
904 global_metrics: false,
905 forward: true,
906 dns_cache: true,
907 }
908 }
909}
910
/// Default value for `Http::retry_delay`.
fn default_retry_delay() -> u64 {
    1
}

/// Default value for `Http::project_failure_interval`.
fn default_project_failure_interval() -> u64 {
    90
}

/// Default maximum on-disk size of the envelope spool.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

/// Default batch size for spooled envelopes.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

/// Default maximum time an envelope may spend in the spool: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

/// Default refresh frequency for the spool disk-usage measurement, in ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

/// Default memory-usage fraction at which spool backpressure applies.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.8
}

/// Default number of spool partitions.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}
949
/// Settings for the on-disk envelope spool.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// Path to the spool file; spooling to disk is driven by this setting.
    pub path: Option<PathBuf>,
    /// Maximum on-disk size of the spool (default 500 MiB).
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of envelope batches written to disk (default 10 KiB).
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time an envelope may be delayed in the spool (default 24h).
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// Refresh frequency of the disk-usage measurement, in ms (default 100).
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// Memory-usage fraction at which backpressure applies (default 0.8).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of spool partitions (default 1).
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
    /// Whether the spool is ephemeral (default `false`).
    #[serde(default)]
    pub ephemeral: bool,
}
1036
impl Default for EnvelopeSpool {
    fn default() -> Self {
        // Mirrors the per-field serde defaults so a missing section and a
        // present-but-empty section behave identically.
        Self {
            path: None,
            max_disk_size: spool_envelopes_max_disk_size(),
            batch_size_bytes: spool_envelopes_batch_size_bytes(),
            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
            partitions: spool_envelopes_partitions(),
            ephemeral: false,
        }
    }
}
1051
/// The `spool` section of the configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Settings for the envelope spool.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1059
/// Caching behavior for project configs, relay infos, and envelopes.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// Whether to request the full project config from the upstream.
    pub project_request_full_config: bool,
    /// Expiry of cached project configs.
    pub project_expiry: u32,
    /// Grace period during which an expired project config may still be used.
    pub project_grace_period: u32,
    /// Optional proactive refresh interval for project configs.
    pub project_refresh_interval: Option<u32>,
    /// Expiry of cached relay infos.
    pub relay_expiry: u32,
    /// Expiry of buffered envelopes (`event_expiry` is a legacy alias).
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// Capacity of the envelope buffer (`event_buffer_size` is a legacy
    /// alias).
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// Expiry of negative cache entries (misses).
    pub miss_expiry: u32,
    /// Batching interval for upstream queries.
    pub batch_interval: u32,
    /// Batching interval for downstream relay queries.
    pub downstream_relays_batch_interval: u32,
    /// Maximum number of entries per batch.
    pub batch_size: usize,
    /// Interval for file-backed cache refreshes.
    pub file_interval: u32,
    /// Interval for fetching the global config.
    pub global_config_fetch_interval: u32,
}
1106
1107impl Default for Cache {
1108 fn default() -> Self {
1109 Cache {
1110 project_request_full_config: false,
1111 project_expiry: 300, project_grace_period: 120, project_refresh_interval: None,
1114 relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1117 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1121 file_interval: 10, global_config_fetch_interval: 10, }
1124 }
1125}
1126
1127fn default_max_secs_in_future() -> u32 {
1128 60 }
1130
1131fn default_max_session_secs_in_past() -> u32 {
1132 5 * 24 * 3600 }
1134
1135fn default_chunk_size() -> ByteSize {
1136 ByteSize::mebibytes(1)
1137}
1138
1139fn default_projectconfig_cache_prefix() -> String {
1140 "relayconfig".to_owned()
1141}
1142
1143#[allow(clippy::unnecessary_wraps)]
1144fn default_max_rate_limit() -> Option<u32> {
1145 Some(300) }
1147
/// The `processing` section of the configuration, used when Relay runs with
/// full event processing enabled.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// Whether processing mode is enabled.
    pub enabled: bool,
    /// Optional path to a GeoIP database.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Leeway for timestamps in the future (default 60s).
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum accepted session age in the past (default 5 days).
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configuration parameters.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional named Kafka configurations.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Assignment of message types to Kafka topics.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether Kafka topics are validated on startup.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis configuration for processing mode.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Chunk size used when writing attachments (default 1 MiB).
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Key prefix for cached project configs (default `relayconfig`).
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit duration (default 300s).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    /// Optional cache ratio for quotas.
    pub quota_cache_ratio: Option<f32>,
    /// Optional cache maximum for quotas.
    pub quota_cache_max: Option<f32>,
    /// Objectstore service settings (`upload` is a legacy alias).
    #[serde(default, alias = "upload")]
    pub objectstore: ObjectstoreServiceConfig,
}
1225
impl Default for Processing {
    /// Processing is disabled by default; all other fields mirror the serde
    /// per-field defaults.
    fn default() -> Self {
        Self {
            enabled: false,
            geoip_path: None,
            max_secs_in_future: default_max_secs_in_future(),
            max_session_secs_in_past: default_max_session_secs_in_past(),
            kafka_config: Vec::new(),
            secondary_kafka_configs: BTreeMap::new(),
            topics: TopicAssignments::default(),
            kafka_validate_topics: false,
            redis: None,
            attachment_chunk_size: default_chunk_size(),
            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
            max_rate_limit: default_max_rate_limit(),
            quota_cache_ratio: None,
            quota_cache_max: None,
            objectstore: ObjectstoreServiceConfig::default(),
        }
    }
}
1248
/// The `normalization` section of the configuration.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// The normalization level to apply to events.
    #[serde(default)]
    pub level: NormalizationLevel,
}
1257
/// How much normalization Relay applies to incoming events.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// The standard amount of normalization (the default).
    #[default]
    Default,
    /// Full normalization.
    Full,
}
1273
/// Settings for aggregating outcomes before they are emitted.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Width of each aggregation bucket, in seconds.
    pub bucket_interval: u64,
    /// Interval at which aggregated outcomes are flushed, in seconds.
    pub flush_interval: u64,
}
1283
impl Default for OutcomeAggregatorConfig {
    /// Default: 60-second buckets, flushed every 120 seconds.
    fn default() -> Self {
        Self {
            bucket_interval: 60,
            flush_interval: 120,
        }
    }
}
1292
/// Authentication credentials for the objectstore service.
///
/// `Debug` is implemented manually below so the signing key is never logged.
#[derive(Serialize, Deserialize)]
pub struct ObjectstoreAuthConfig {
    /// Identifier of the signing key.
    pub key_id: String,

    /// The signing key itself; treated as a secret.
    pub signing_key: String,
}
1303
impl fmt::Debug for ObjectstoreAuthConfig {
    /// Redacts `signing_key` so debug output never leaks the secret.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ObjectstoreAuthConfig")
            .field("key_id", &self.key_id)
            .field("signing_key", &"[redacted]")
            .finish()
    }
}
1312
/// Settings for the objectstore service.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct ObjectstoreServiceConfig {
    /// Optional URL of the objectstore; the service is inactive when unset.
    pub objectstore_url: Option<String>,

    /// Maximum number of concurrent objectstore requests.
    pub max_concurrent_requests: usize,

    /// Maximum number of queued requests awaiting a slot.
    pub max_backlog: usize,

    /// Request timeout, in seconds.
    pub timeout: u64,

    /// Optional authentication credentials.
    pub auth: Option<ObjectstoreAuthConfig>,
}
1340
impl Default for ObjectstoreServiceConfig {
    /// Defaults: no URL (disabled), 10 concurrent requests, backlog of 20,
    /// 60-second timeout, no auth.
    fn default() -> Self {
        Self {
            objectstore_url: None,
            max_concurrent_requests: 10,
            max_backlog: 20,
            timeout: 60,
            auth: None,
        }
    }
}
1352
/// Controls whether, and in which form, outcomes are emitted.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit outcomes at all.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit full outcomes.
    AsOutcomes,
}
1365
1366impl EmitOutcomes {
1367 pub fn any(&self) -> bool {
1369 !matches!(self, EmitOutcomes::None)
1370 }
1371}
1372
impl Serialize for EmitOutcomes {
    /// Serializes to the config-file form: `false`/`true` booleans for
    /// `None`/`AsOutcomes`, the string `"as_client_reports"` otherwise.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Self::None => serializer.serialize_bool(false),
            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
            Self::AsOutcomes => serializer.serialize_bool(true),
        }
    }
}
1386
/// Serde visitor accepting either a boolean or the string
/// `"as_client_reports"` for [`EmitOutcomes`].
struct EmitOutcomesVisitor;

impl Visitor<'_> for EmitOutcomesVisitor {
    type Value = EmitOutcomes;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("true, false, or 'as_client_reports'")
    }

    /// `true` maps to full outcomes, `false` disables them.
    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        Ok(if v {
            EmitOutcomes::AsOutcomes
        } else {
            EmitOutcomes::None
        })
    }

    /// The only accepted string value is `"as_client_reports"`.
    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        if v == "as_client_reports" {
            Ok(EmitOutcomes::AsClientReports)
        } else {
            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
        }
    }
}
1418
impl<'de> Deserialize<'de> for EmitOutcomes {
    /// Accepts either a boolean or the string `"as_client_reports"`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor dispatch on bool vs. string.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1427
/// The `outcomes` section of the configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Whether, and in which form, outcomes are emitted.
    pub emit_outcomes: EmitOutcomes,
    /// Maximum number of outcomes per batch.
    pub batch_size: usize,
    /// Interval between outcome batches.
    pub batch_interval: u64,
    /// Optional override for the outcome source field.
    pub source: Option<String>,
    /// Settings for the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1448
impl Default for Outcomes {
    /// By default, outcomes are emitted as client reports.
    fn default() -> Self {
        Outcomes {
            emit_outcomes: EmitOutcomes::AsClientReports,
            batch_size: 1000,
            batch_interval: 500,
            source: None,
            aggregator: OutcomeAggregatorConfig::default(),
        }
    }
}
1460
/// The minimal config written during `relay config init`: only the `relay`
/// section.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay section of the config.
    pub relay: Relay,
}
1467
1468impl MinimalConfig {
1469 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1471 let path = p.as_ref();
1472 if fs::metadata(path).is_err() {
1473 fs::create_dir_all(path)
1474 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1475 }
1476 self.save(path)
1477 }
1478}
1479
// The minimal config is stored as `config.yml`, the same file as the full
// `ConfigValues`.
impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1489
/// Custom (de)serialization for the `static_relays` map, converting between
/// [`RelayInfo`] and its on-disk representation.
mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    /// On-disk representation of [`RelayInfo`].
    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    /// Deserializes a map of relay IDs to relay infos.
    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    /// Serializes a map of relay IDs to relay infos.
    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}
1543
/// The `auth` section of the configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// When the relay reports ready; omitted from output while default.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically configured downstream relays, keyed by relay ID.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// Maximum accepted signature age (default 300).
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}
1561
/// Default value for `AuthConfig::signature_max_age`.
fn default_max_age() -> u64 {
    300
}
1565
/// The `geoip` section of the configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// Optional path to a GeoIP database.
    pub path: Option<PathBuf>,
}
1572
/// Settings for the cardinality limiter.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Interval at which the internal cache is vacuumed, in seconds.
    pub cache_vacuum_interval: u64,
}
1584
impl Default for CardinalityLimiter {
    /// Default: vacuum the cache every 180 seconds.
    fn default() -> Self {
        Self {
            cache_vacuum_interval: 180,
        }
    }
}
1592
/// The `health` section of the configuration: health checks and memory
/// monitoring.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval at which health status is refreshed, in milliseconds.
    pub refresh_interval_ms: u64,
    /// Optional absolute memory limit.
    pub max_memory_bytes: Option<ByteSize>,
    /// Memory usage fraction used as a limit (default 0.95).
    pub max_memory_percent: f32,
    /// Timeout for health probes, in milliseconds.
    pub probe_timeout_ms: u64,
    /// Refresh frequency of memory statistics, in milliseconds.
    pub memory_stat_refresh_frequency_ms: u64,
}
1630
impl Default for Health {
    fn default() -> Self {
        Self {
            refresh_interval_ms: 3000,
            max_memory_bytes: None,
            max_memory_percent: 0.95,
            probe_timeout_ms: 900,
            memory_stat_refresh_frequency_ms: 100,
        }
    }
}
1642
/// The `cogs` section of the configuration (cost-of-goods measurements).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum size of the COGS measurement queue.
    pub max_queue_size: u64,
    /// Resource ID under which Relay reports its COGS data.
    pub relay_resource_id: String,
}
1660
impl Default for Cogs {
    fn default() -> Self {
        Self {
            max_queue_size: 10_000,
            relay_resource_id: "relay_service".to_owned(),
        }
    }
}
1669
/// The `upload` section of the configuration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Upload {
    /// Maximum number of concurrent upload requests.
    pub max_concurrent_requests: usize,
    /// Upload timeout (default 5 minutes).
    pub timeout: u64,
    /// Maximum age of uploads (default 1 hour).
    pub max_age: i64,
}
1685
1686impl Default for Upload {
1687 fn default() -> Self {
1688 Self {
1689 max_concurrent_requests: 10,
1690 timeout: 5 * 60, max_age: 60 * 60, }
1693 }
1694}
1695
/// All sections of the Relay configuration as stored in `config.yml`.
#[derive(Serialize, Deserialize, Debug, Default)]
#[allow(missing_docs)]
pub struct ConfigValues {
    /// Relay mode, upstream, and network binding.
    #[serde(default)]
    pub relay: Relay,
    /// Upstream HTTP settings.
    #[serde(default)]
    pub http: Http,
    /// Caching behavior.
    #[serde(default)]
    pub cache: Cache,
    /// Envelope spooling.
    #[serde(default)]
    pub spool: Spool,
    /// Size and concurrency limits.
    #[serde(default)]
    pub limits: Limits,
    /// Logging settings.
    #[serde(default)]
    pub logging: relay_log::LogConfig,
    /// Routing behavior.
    #[serde(default)]
    pub routing: Routing,
    /// Statsd metrics for Relay itself.
    #[serde(default)]
    pub metrics: Metrics,
    /// Internal Sentry error reporting.
    #[serde(default)]
    pub sentry: relay_log::SentryConfig,
    /// Processing-mode settings.
    #[serde(default)]
    pub processing: Processing,
    /// Outcome emission.
    #[serde(default)]
    pub outcomes: Outcomes,
    /// Metrics aggregator settings.
    #[serde(default)]
    pub aggregator: AggregatorServiceConfig,
    /// Additional scoped aggregators.
    #[serde(default)]
    pub secondary_aggregators: Vec<ScopedAggregatorConfig>,
    /// Authentication and readiness.
    #[serde(default)]
    pub auth: AuthConfig,
    /// GeoIP database.
    #[serde(default)]
    pub geoip: GeoIpConfig,
    /// Event normalization.
    #[serde(default)]
    pub normalization: Normalization,
    /// Cardinality limiter.
    #[serde(default)]
    pub cardinality_limiter: CardinalityLimiter,
    /// Health checks and memory monitoring.
    #[serde(default)]
    pub health: Health,
    /// COGS measurements.
    #[serde(default)]
    pub cogs: Cogs,
    /// Upload settings.
    #[serde(default)]
    pub upload: Upload,
}
1741
// The full config is stored as `config.yml` in the config directory.
impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1751
/// The fully loaded Relay configuration: values from `config.yml`, optional
/// credentials from `credentials.json`, and the directory they came from.
pub struct Config {
    /// All configuration values.
    values: ConfigValues,
    /// Credentials, if a credentials file was present.
    credentials: Option<Credentials>,
    /// The base directory the config was loaded from.
    path: PathBuf,
}
1758
impl fmt::Debug for Config {
    /// Formats the config for diagnostics; credentials are deliberately
    /// excluded from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
1767
impl Config {
    /// Loads a config from the given directory, resolved against the current
    /// working directory. Also loads credentials if a credentials file exists
    /// next to the config.
    ///
    /// Fails if the config cannot be loaded, or if processing is enabled in a
    /// build that was not compiled with the `processing` feature.
    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
        // Make the path absolute; fall back to the path as given if the
        // current directory cannot be determined.
        let path = env::current_dir()
            .map(|x| x.join(path.as_ref()))
            .unwrap_or_else(|_| path.as_ref().to_path_buf());

        let config = Config {
            values: ConfigValues::load(&path)?,
            credentials: if Credentials::path(&path).exists() {
                Some(Credentials::load(&path)?)
            } else {
                None
            },
            path: path.clone(),
        };

        // Refuse to start with processing enabled when the binary was built
        // without processing support.
        if cfg!(not(feature = "processing")) && config.processing_enabled() {
            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
        }

        Ok(config)
    }

    /// Builds a config from a JSON value, with no credentials and an empty
    /// path.
    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }

    /// Applies string-typed overrides (e.g. from CLI flags or environment
    /// variables) on top of the loaded config, parsing each value into its
    /// typed counterpart. Returns `self` for chaining.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                // NOTE(review): error reports field "deployment" while the
                // override is called `instance` — confirm which name users see.
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // A full upstream URL takes precedence over a DSN-derived upstream.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            // Accept common boolean spellings; an empty string disables.
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            // Update the existing `bootstrap.servers` entry in place, or add
            // one if the config did not have it.
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Existing credentials: patch individual parts.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No credentials yet: either all three parts are provided, or
            // none of them (a partial set is an error).
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // NOTE(review): an unparsable shutdown_timeout is silently ignored
        // here, unlike the other overrides which fail — confirm intended.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }

    /// Returns `true` if a config file exists in the given directory.
    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
    }

    /// Returns the base path this config was loaded from.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Serializes the config values (not the credentials) to a YAML string.
    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    /// Generates fresh credentials and installs them on this config,
    /// persisting them to disk if `save` is set.
    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    /// Returns the credentials, if set.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }

    /// Replaces the credentials and persists the change: saves the new
    /// credentials, or removes the credentials file when `None` is given.
    ///
    /// Returns `false` (and does nothing) if the credentials are unchanged.
    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }

    /// Returns `true` if credentials are set.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key, if credentials are set.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key, if credentials are set.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID, if credentials are set.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    /// Returns the relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the relay instance type.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the upstream this relay forwards to.
    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    /// Returns the custom `Host` header for upstream requests, if configured.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the socket address this relay listens on.
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }

    /// Returns the internal listen address, if an internal host and/or port is
    /// configured. Missing parts fall back to the public host/port; returns
    /// `None` when neither is set.
    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
        match (
            self.values.relay.internal_host,
            self.values.relay.internal_port,
        ) {
            (Some(host), None) => Some((host, self.values.relay.port).into()),
            (None, Some(port)) => Some((self.values.relay.host, port).into()),
            (Some(host), Some(port)) => Some((host, port).into()),
            (None, None) => None,
        }
    }

    /// Returns the TLS listen address if a TLS identity is configured; the
    /// port defaults to 3443.
    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
        if self.values.relay.tls_identity_path.is_some() {
            let port = self.values.relay.tls_port.unwrap_or(3443);
            Some((self.values.relay.host, port).into())
        } else {
            None
        }
    }

    /// Returns the path to the TLS identity file, if configured.
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the TLS identity, if configured.
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns `true` if project ID overriding is enabled.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    /// Returns `true` if this relay must authenticate with the upstream before
    /// reporting ready (only in managed mode with the `Authenticated`
    /// readiness condition).
    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }

    /// Returns the re-authentication interval. `None` (disabled) when
    /// processing is enabled or the configured interval is zero/absent.
    pub fn http_auth_interval(&self) -> Option<Duration> {
        if self.processing_enabled() {
            return None;
        }

        match self.values.http.auth_interval {
            None | Some(0) => None,
            Some(secs) => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the grace period before a network outage is signaled.
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Returns the delay between upstream retries.
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Returns the project-failure reporting interval.
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Returns the content encoding used for upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns `true` if metrics should be sent globally over HTTP.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns `true` if HTTP forwarding is enabled.
    pub fn http_forward(&self) -> bool {
        self.values.http.forward
    }

    /// Returns how outcomes are emitted. Always `AsOutcomes` when processing
    /// is enabled, regardless of the configured value.
    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    /// Returns the maximum number of outcomes per batch.
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the maximum wait time before an outcome batch is flushed.
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// Returns the source tag attached to emitted outcomes, if configured.
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the outcome aggregator configuration.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    /// Returns the logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry (internal error reporting) configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    /// Returns the statsd address, if metrics reporting is configured.
    pub fn statsd_addr(&self) -> Option<&str> {
        self.values.metrics.statsd.as_deref()
    }

    /// Returns the statsd buffer size, if configured.
    pub fn statsd_buffer_size(&self) -> Option<usize> {
        self.values.metrics.statsd_buffer_size
    }

    /// Returns the prefix prepended to all emitted metric names.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns the tags attached to every emitted metric.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the tag name carrying the hostname, if configured.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the interval for periodic metrics; `None` (disabled) when the
    /// configured value is zero.
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the default timeout for upstream HTTP requests.
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the connection timeout for upstream HTTP requests.
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the maximum backoff interval between upstream retries.
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns `true` if DNS caching is enabled for the HTTP client.
    pub fn http_dns_cache(&self) -> bool {
        self.values.http.dns_cache
    }

    /// Returns the expiry for cached project configs.
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns `true` if full project configs should be requested.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns the expiry for cached relay infos (public keys).
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    /// Returns the envelope buffer size, saturating to `usize::MAX` if the
    /// configured value does not fit.
    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    /// Returns the expiry for negative (miss) cache entries.
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period during which an expired project config may
    /// still be used.
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the interval for proactively refreshing project configs, if
    /// configured.
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns the wait time before an upstream query batch is sent.
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns the wait time before a downstream-relay query batch is sent.
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the polling interval for file-based (local) project configs.
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the fetch interval for the global config.
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }

    /// Returns the spool file path for the given partition, or `None` if
    /// spooling to disk is not configured.
    ///
    /// Partition 0 uses the configured path as-is; other partitions append
    /// `.{partition_id}` to the file name.
    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
        let mut path = self
            .values
            .spool
            .envelopes
            .path
            .as_ref()
            .map(|path| path.to_owned())?;

        if partition_id == 0 {
            return Some(path);
        }

        let file_name = path.file_name().and_then(|f| f.to_str())?;
        let new_file_name = format!("{file_name}.{partition_id}");
        path.set_file_name(new_file_name);

        Some(path)
    }

    /// Returns the maximum on-disk spool size in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Returns the spool write-batch size in bytes.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the maximum age of a spooled envelope.
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns how often spool disk usage is refreshed.
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the memory watermark (in percent) above which backpressure is
    /// applied to the spool.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of spool partitions (always at least one).
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    /// Returns `true` if the spool is ephemeral (not persisted across runs).
    pub fn spool_ephemeral(&self) -> bool {
        self.values.spool.envelopes.ephemeral
    }

    /// Returns the maximum event payload size in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size in bytes of a single attachment.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum upload size in bytes.
    pub fn max_upload_size(&self) -> usize {
        self.values.limits.max_upload_size.as_bytes()
    }

    /// Returns the maximum combined size in bytes of all attachments in an
    /// envelope.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum client-report payload size in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum monitor check-in payload size in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum log payload size in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum span payload size in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum item-container payload size in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum size for logs integration payloads; currently the
    /// container limit.
    pub fn max_logs_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum size for spans integration payloads; currently the
    /// container limit.
    pub fn max_spans_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum envelope payload size in bytes.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum statsd payload size in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum metric-buckets payload size in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    /// Returns the maximum API payload size in bytes.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum API file-upload size in bytes.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum API chunk-upload size in bytes.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum profile payload size in bytes.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum trace-metric payload size in bytes.
    pub fn max_trace_metric_size(&self) -> usize {
        self.values.limits.max_trace_metric_size.as_bytes()
    }

    /// Returns the maximum compressed replay payload size in bytes.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum uncompressed replay payload size in bytes.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum replay message size in bytes.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    /// Returns the maximum number of concurrent upstream requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of concurrent upstream queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// Returns the maximum size in bytes of a removed attribute key.
    pub fn max_removed_attribute_key_size(&self) -> usize {
        self.values.limits.max_removed_attribute_key_size.as_bytes()
    }

    /// Returns the timeout for upstream queries.
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// Returns how long graceful shutdown may take before it is forced.
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    /// Returns the keep-alive timeout for inbound connections.
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    /// Returns the idle timeout for inbound connections, if configured.
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of inbound connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// Returns the TCP listen backlog.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the configured CPU concurrency (thread count).
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the maximum pool concurrency.
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    /// Returns the maximum number of queries per upstream batch.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Returns the directory holding file-based project configs.
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    /// Returns `true` if processing mode is enabled.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Returns the event normalization level.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    /// Returns the GeoIP database path; the `geoip` section takes precedence
    /// over the legacy `processing.geoip_path` setting.
    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    /// Returns the maximum number of seconds a timestamp may lie in the
    /// future.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Returns the maximum number of seconds a session may lie in the past.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    /// Resolves the Kafka configuration for the given topic, combining the
    /// primary and secondary Kafka configs.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Returns `true` if Kafka topics should be validated.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// Returns topic assignments that were configured but are not used.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Returns the object-store service configuration.
    pub fn objectstore(&self) -> &ObjectstoreServiceConfig {
        &self.values.processing.objectstore
    }

    /// Returns the upload configuration.
    pub fn upload(&self) -> &Upload {
        &self.values.upload
    }

    /// Builds the Redis configuration refs, sized by the CPU and pool
    /// concurrency. `None` if Redis is not configured.
    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
            self.pool_concurrency() as u32,
        ))
    }

    /// Returns the chunk size in bytes for attachment uploads.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Returns the maximum batch size in bytes for flushed metrics.
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Returns the prefix for project-config cache keys.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Returns the maximum rate limit, if configured.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Returns the quota cache ratio, if configured.
    pub fn quota_cache_ratio(&self) -> Option<f32> {
        self.values.processing.quota_cache_ratio
    }

    /// Returns the maximum quota cache value, if configured.
    pub fn quota_cache_max(&self) -> Option<f32> {
        self.values.processing.quota_cache_max
    }

    /// Returns the vacuum interval for the cardinality limiter cache.
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Returns the refresh interval for health checks.
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Returns the absolute memory watermark in bytes; `u64::MAX` (no limit)
    /// when not configured.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    /// Returns the relative memory watermark in percent.
    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    /// Returns the timeout for health probes.
    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    /// Returns how often memory stats are refreshed, in milliseconds.
    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    /// Returns the maximum COGS queue size.
    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    /// Returns the COGS resource ID for this relay.
    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    /// Returns the default metrics aggregator configuration.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    /// Returns the secondary (scoped) aggregator configurations.
    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }

    /// Returns the aggregator config for the given namespace: the first
    /// matching secondary aggregator, or the default otherwise.
    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
        for entry in &self.values.secondary_aggregators {
            if entry.condition.matches(Some(namespace)) {
                return &entry.config;
            }
        }
        &self.values.aggregator
    }

    /// Returns the statically configured downstream relays.
    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    /// Returns the maximum age of an accepted request signature.
    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }

    /// Returns whether unknown envelope items are accepted. Defaults to
    /// accepting them only when processing is disabled (i.e. when there is an
    /// upstream that may understand them).
    pub fn accept_unknown_items(&self) -> bool {
        let forward = self.values.routing.accept_unknown_items;
        forward.unwrap_or_else(|| !self.processing_enabled())
    }
}
2771
2772impl Default for Config {
2773 fn default() -> Self {
2774 Self {
2775 values: ConfigValues::default(),
2776 credentials: None,
2777 path: PathBuf::new(),
2778 }
2779 }
2780}
2781
#[cfg(test)]
mod tests {

    use super::*;

    // The legacy `event_buffer_size` / `event_expiry` keys must still populate
    // the envelope-based cache fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let parsed: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(parsed.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(parsed.cache.envelope_expiry, 1800);
    }

    // `EmitOutcomes` round-trips through its JSON representation.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];

        for (serialized, expected) in cases {
            let parsed: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(parsed, expected);
            assert_eq!(serde_json::to_string(&parsed).unwrap(), serialized);
        }
    }

    // Anything other than the recognized representations is rejected.
    #[test]
    fn test_emit_outcomes_invalid() {
        let result = serde_json::from_str::<EmitOutcomes>("asdf");
        assert!(result.is_err());
    }
}