1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr};
5use std::num::{NonZeroU8, NonZeroU16};
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
28const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
30static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
31# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
32# All config options: https://docs.sentry.io/product/relay/options/
33"###;
34
/// Indicates config related errors.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value.
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but the binary
    /// was compiled without the processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
/// Describes where a configuration error originated from.
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// The error is not specific to a file or field.
    #[default]
    None,
    /// The error occurred while handling the given config file.
    File(PathBuf),
    /// The error occurred while applying an override for the given field.
    FieldOverride(String),
}
81
82impl fmt::Display for ConfigErrorSource {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 ConfigErrorSource::None => Ok(()),
86 ConfigErrorSource::File(file_name) => {
87 write!(f, " (file {})", file_name.display())
88 }
89 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90 }
91 }
92}
93
/// Indicates config related errors.
#[derive(Debug)]
pub struct ConfigError {
    // Where the error originated (file, field override, or unspecified).
    source: ConfigErrorSource,
    // The category of the error.
    kind: ConfigErrorKind,
}
100
101impl ConfigError {
102 #[inline]
103 fn new(kind: ConfigErrorKind) -> Self {
104 Self {
105 source: ConfigErrorSource::None,
106 kind,
107 }
108 }
109
110 #[inline]
111 fn field(field: &'static str) -> Self {
112 Self {
113 source: ConfigErrorSource::FieldOverride(field.to_owned()),
114 kind: ConfigErrorKind::InvalidValue,
115 }
116 }
117
118 #[inline]
119 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120 Self {
121 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122 kind,
123 }
124 }
125
126 pub fn kind(&self) -> ConfigErrorKind {
128 self.kind
129 }
130}
131
132impl fmt::Display for ConfigError {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}{}", self.kind(), self.source)
135 }
136}
137
138impl Error for ConfigError {}
139
/// The on-disk file format of a config object.
enum ConfigFormat {
    Yaml,
    Json,
}

impl ConfigFormat {
    /// Returns the file extension used for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            ConfigFormat::Yaml => "yml",
            ConfigFormat::Json => "json",
        }
    }
}
153
/// Helper to load and save config objects from/to a base directory.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The file format of the config file (YAML or JSON).
    fn format() -> ConfigFormat;

    /// The basename of the config file, without extension.
    fn name() -> &'static str;

    /// The full path of the config file inside the given base directory.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config file from the given base directory.
    ///
    /// Values are substituted via `serde_vars`: `${file:...}` placeholders are
    /// resolved against files relative to `base`, and `${...}` placeholders
    /// against the environment source.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        let mut source = {
            let file = serde_vars::FileSource::default()
                .with_variable_prefix("${file:")
                .with_variable_suffix("}")
                .with_base_path(base);
            let env = serde_vars::EnvSource::default()
                .with_variable_prefix("${")
                .with_variable_suffix("}");
            (file, env)
        };
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Writes the config object to the given base directory.
    ///
    /// On Unix the file is created with mode `0600` so that secrets (e.g.
    /// credentials) are only readable by the owner.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Remove all access to the file for group and world; it may contain
        // sensitive data such as the secret key.
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML configs carry a documentation header pointing at the docs.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is cosmetic; ignore failures deliberately.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
228
/// Structure used to hold information about configuration overrides via
/// CLI parameters or environment variables.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to.
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// "true" if processing is enabled, "false" otherwise.
    pub processing: Option<String>,
    /// The kafka bootstrap server(s).
    pub kafka_url: Option<String>,
    /// The redis server url.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// Outcome source override.
    pub outcome_source: Option<String>,
    /// Shutdown timeout in seconds.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
268
/// The relay credentials.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
279
280impl Credentials {
281 pub fn generate() -> Self {
283 relay_log::info!("generating new relay credentials");
284 let (sk, pk) = generate_key_pair();
285 Self {
286 secret_key: sk,
287 public_key: pk,
288 id: generate_relay_id(),
289 }
290 }
291
292 pub fn to_json_string(&self) -> anyhow::Result<String> {
294 serde_json::to_string(self)
295 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
296 }
297}
298
// Credentials are stored as `credentials.json` next to `config.yml`.
impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}
307
/// Information on a downstream relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project configuration.
    #[serde(default)]
    pub internal: bool,
}
319
320impl RelayInfo {
321 pub fn new(public_key: PublicKey) -> Self {
323 Self {
324 public_key,
325 internal: false,
326 }
327 }
328}
329
/// The operation mode of a relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    Proxy,

    /// Events are held in memory for inspection only. This mode is fully
    /// managed by the upstream.
    Managed,
}
348
349impl<'de> Deserialize<'de> for RelayMode {
350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351 where
352 D: Deserializer<'de>,
353 {
354 let s = String::deserialize(deserializer)?;
355 match s.as_str() {
356 "proxy" => Ok(RelayMode::Proxy),
357 "managed" => Ok(RelayMode::Managed),
358 "static" => Err(serde::de::Error::custom(
359 "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
360 )),
361 other => Err(serde::de::Error::unknown_variant(
362 other,
363 &["proxy", "managed"],
364 )),
365 }
366 }
367}
368
369impl fmt::Display for RelayMode {
370 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371 match self {
372 RelayMode::Proxy => write!(f, "proxy"),
373 RelayMode::Managed => write!(f, "managed"),
374 }
375 }
376}
377
/// The instance type of Relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// A regular relay instance.
    Default,

    /// A canary relay instance.
    Canary,
}
388
389impl RelayInstance {
390 pub fn is_canary(&self) -> bool {
392 matches!(self, RelayInstance::Canary)
393 }
394}
395
396impl fmt::Display for RelayInstance {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 match self {
399 RelayInstance::Default => write!(f, "default"),
400 RelayInstance::Canary => write!(f, "canary"),
401 }
402 }
403}
404
405impl FromStr for RelayInstance {
406 type Err = fmt::Error;
407
408 fn from_str(s: &str) -> Result<Self, Self::Err> {
409 match s {
410 "canary" => Ok(RelayInstance::Canary),
411 _ => Ok(RelayInstance::Default),
412 }
413 }
414}
415
/// Error returned when parsing an invalid [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429 type Err = ParseRelayModeError;
430
431 fn from_str(s: &str) -> Result<Self, Self::Err> {
432 match s {
433 "proxy" => Ok(RelayMode::Proxy),
434 "managed" => Ok(RelayMode::Managed),
435 _ => Err(ParseRelayModeError),
436 }
437 }
438}
439
/// Returns `true` if the given value equals its type's default.
///
/// Used by `skip_serializing_if` attributes to omit default-valued fields.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
444
/// Checks if we are likely running in a Docker container.
fn is_docker() -> bool {
    // The container runtime creates this marker file in the root directory.
    let has_dockerenv = fs::metadata("/.dockerenv").is_ok();

    // Otherwise look for a docker entry in this process' cgroup file.
    has_dockerenv || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
453
454fn default_host() -> IpAddr {
456 if is_docker() {
457 "0.0.0.0".parse().unwrap()
459 } else {
460 "127.0.0.1".parse().unwrap()
461 }
462}
463
/// Controls when the readiness probe reports Relay as ready.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ReadinessCondition {
    /// Relay reports readiness only after authenticating with the upstream.
    #[default]
    Authenticated,
    /// Relay reports readiness regardless of the authentication state.
    Always,
}
484
/// Relay specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    pub mode: RelayMode,
    /// The instance type of this relay.
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    pub upstream: UpstreamDescriptor,
    /// An alternative upstream to advertise instead of `upstream`, if set.
    // NOTE(review): exact consumer of this value is outside this file — confirm.
    pub advertised_upstream: Option<UpstreamDescriptor>,
    /// The host the relay should bind to.
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: u16,
    /// An optional internal host to bind to.
    // NOTE(review): semantics inferred from the name — confirm against callers.
    pub internal_host: Option<IpAddr>,
    /// An optional internal port to bind to.
    pub internal_port: Option<u16>,
    /// Not serialized: TLS support was removed, kept for deserialization only.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// Not serialized: TLS identity path, kept for deserialization only.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Not serialized: TLS identity password, kept for deserialization only.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Allows the project ID in the URL to override the one in the DSN.
    // NOTE(review): behavior inferred from the name — confirm at the endpoint.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}

impl Default for Relay {
    fn default() -> Self {
        Relay {
            mode: RelayMode::Managed,
            instance: RelayInstance::Default,
            upstream: "https://sentry.io/".parse().unwrap(),
            advertised_upstream: None,
            host: default_host(),
            port: 3000,
            internal_host: None,
            internal_port: None,
            tls_port: None,
            tls_identity_path: None,
            tls_identity_password: None,
            override_project_ids: false,
        }
    }
}
561
/// Controls internal reporting to statsd.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Hostname and port of the statsd server. Metrics are disabled if unset.
    pub statsd: Option<String>,
    /// Buffer size for sending out statsd datagrams.
    pub statsd_buffer_size: Option<usize>,
    /// Common prefix that is added to all metric names.
    pub prefix: String,
    /// Default tags to apply to all emitted metrics.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name to report the hostname under, if set.
    pub hostname_tag: Option<String>,
    /// Interval for periodic metrics, in seconds.
    pub periodic_secs: u64,
}

impl Default for Metrics {
    fn default() -> Self {
        Metrics {
            statsd: None,
            statsd_buffer_size: None,
            prefix: "sentry.relay".into(),
            default_tags: BTreeMap::new(),
            hostname_tag: None,
            periodic_secs: 5,
        }
    }
}
601
/// Controls various limits.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// How many requests can be sent concurrently from Relay to the upstream.
    pub max_concurrent_requests: usize,
    /// How many queries can be sent concurrently from Relay to the upstream.
    pub max_concurrent_queries: usize,
    /// The maximum payload size for events.
    pub max_event_size: ByteSize,
    /// The maximum size for each attachment.
    pub max_attachment_size: ByteSize,
    /// The maximum size for uploads.
    pub max_upload_size: ByteSize,
    /// The maximum combined size for all attachments in an envelope or request.
    pub max_attachments_size: ByteSize,
    /// The maximum payload size for client reports.
    pub max_client_reports_size: ByteSize,
    /// The maximum payload size for monitor check-ins.
    pub max_check_in_size: ByteSize,
    /// The maximum payload size for an entire envelope.
    pub max_envelope_size: ByteSize,
    /// The maximum number of session items per envelope.
    pub max_session_count: usize,
    /// The maximum payload size for general API requests.
    pub max_api_payload_size: ByteSize,
    /// The maximum payload size for file uploads and chunks.
    pub max_api_file_upload_size: ByteSize,
    /// The maximum payload size for chunks.
    pub max_api_chunk_upload_size: ByteSize,
    /// The maximum payload size for a profile.
    pub max_profile_size: ByteSize,
    /// The maximum payload size for a trace metric.
    pub max_trace_metric_size: ByteSize,
    /// The maximum payload size for a log.
    pub max_log_size: ByteSize,
    /// The maximum payload size for a span.
    pub max_span_size: ByteSize,
    /// The maximum payload size for an item container.
    pub max_container_size: ByteSize,
    /// The maximum payload size for a statsd metric.
    pub max_statsd_size: ByteSize,
    /// The maximum payload size for metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// The maximum payload size for a compressed replay.
    pub max_replay_compressed_size: ByteSize,
    /// The maximum payload size for an uncompressed replay.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// The maximum size for a replay recording chunk.
    pub max_replay_message_size: ByteSize,
    /// The maximum size of an attribute key that was removed.
    // NOTE(review): exact usage lives outside this file — confirm.
    pub max_removed_attribute_key_size: ByteSize,
    /// The maximum number of threads to spawn for CPU-bound work.
    pub max_thread_count: usize,
    /// Maximum number of tasks each worker thread processes concurrently.
    pub max_pool_concurrency: usize,
    /// The maximum number of seconds a query is allowed to take across retries.
    pub query_timeout: u64,
    /// The maximum number of seconds to wait for a clean shutdown.
    pub shutdown_timeout: u64,
    /// Server keep-alive timeout in seconds.
    pub keepalive_timeout: u64,
    /// Server idle timeout in seconds, disabled if unset.
    pub idle_timeout: Option<u64>,
    /// Maximum number of simultaneous connections, unlimited if unset.
    pub max_connections: Option<usize>,
    /// The TCP listen backlog.
    pub tcp_listen_backlog: u32,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(200),
            max_upload_size: ByteSize::mebibytes(1024),
            max_attachments_size: ByteSize::mebibytes(200),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(200),
            max_session_count: 100,
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_trace_metric_size: ByteSize::mebibytes(1),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(10),
            max_container_size: ByteSize::mebibytes(12),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
            max_removed_attribute_key_size: ByteSize::kibibytes(10),
        }
    }
}
750
/// Controls traffic steering.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Accept and forward unknown envelope items to the upstream, if set.
    pub accept_unknown_items: Option<bool>,
}

/// HTTP content encodings for upstream requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// No content encoding.
    #[default]
    Identity,
    /// zlib ("deflate") encoding.
    Deflate,
    /// gzip encoding.
    Gzip,
    /// Brotli encoding.
    Br,
    /// Zstandard encoding.
    Zstd,
}
795
796impl HttpEncoding {
797 pub fn parse(str: &str) -> Self {
799 let str = str.trim();
800 if str.eq_ignore_ascii_case("zstd") {
801 Self::Zstd
802 } else if str.eq_ignore_ascii_case("br") {
803 Self::Br
804 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
805 Self::Gzip
806 } else if str.eq_ignore_ascii_case("deflate") {
807 Self::Deflate
808 } else {
809 Self::Identity
810 }
811 }
812
813 pub fn name(&self) -> Option<&'static str> {
817 match self {
818 Self::Identity => None,
819 Self::Deflate => Some("deflate"),
820 Self::Gzip => Some("gzip"),
821 Self::Br => Some("br"),
822 Self::Zstd => Some("zstd"),
823 }
824 }
825}
826
/// Controls authentication and network communication with the upstream.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    pub timeout: u32,
    /// Timeout for establishing connections to the upstream in seconds.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate.
    pub auth_interval: Option<u64>,
    /// Seconds of upstream failure before Relay considers the network outaged.
    pub outage_grace_period: u64,
    /// Delay before retrying an upstream connection, in seconds.
    pub retry_delay: u64,
    /// Seconds of failed project fetches before emitting an error.
    // NOTE(review): semantics inferred from the name — confirm at call site.
    pub project_failure_interval: u64,
    /// Content encoding for upstream requests.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint, if enabled.
    // NOTE(review): consumer is outside this file — confirm.
    pub global_metrics: bool,
    /// Whether to forward unknown API requests to the upstream.
    pub forward: bool,
    /// Whether to cache DNS resolutions internally.
    pub dns_cache: bool,
}
904
905impl Default for Http {
906 fn default() -> Self {
907 Http {
908 timeout: 5,
909 connection_timeout: 3,
910 max_retry_interval: 60, host_header: None,
912 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
914 retry_delay: default_retry_delay(),
915 project_failure_interval: default_project_failure_interval(),
916 encoding: HttpEncoding::Zstd,
917 global_metrics: false,
918 forward: true,
919 dns_cache: true,
920 }
921 }
922}
923
/// Default for [`Http::retry_delay`], in seconds.
fn default_retry_delay() -> u64 {
    1
}

/// Default for [`Http::project_failure_interval`], in seconds.
fn default_project_failure_interval() -> u64 {
    90
}

/// Default for [`EnvelopeSpool::max_disk_size`].
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

/// Default for [`EnvelopeSpool::batch_size_bytes`].
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

/// Default for [`EnvelopeSpool::max_envelope_delay_secs`]: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

/// Default for [`EnvelopeSpool::disk_usage_refresh_frequency_ms`].
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

/// Default for [`EnvelopeSpool::max_backpressure_memory_percent`].
fn spool_max_backpressure_memory_percent() -> f32 {
    0.8
}

/// Default for [`EnvelopeSpool::partitions`]: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}
962
/// Strategy for partitioning envelopes across spool partitions.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum EnvelopeSpoolPartitioning {
    /// Partition by the envelope's project key pair.
    ProjectKeyPair,
    /// Distribute envelopes across partitions in round-robin fashion.
    #[default]
    RoundRobin,
}
980
/// Configuration for the envelope disk spool.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path to the spool file(s); spooling to disk is disabled if unset.
    pub path: Option<PathBuf>,
    /// The maximum size of the spool on disk.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// The size threshold at which batched envelopes are flushed to disk.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time in seconds an envelope may spend in the spool.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// How often disk usage is refreshed, in milliseconds.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// Memory usage fraction (0.0-1.0) above which backpressure kicks in.
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of spool partitions.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
    /// How envelopes are assigned to partitions.
    #[serde(default)]
    pub partitioning: EnvelopeSpoolPartitioning,
    /// If enabled, the spool is not persisted across restarts.
    // NOTE(review): semantics inferred from the name — confirm at consumer.
    #[serde(default)]
    pub ephemeral: bool,
}

impl Default for EnvelopeSpool {
    fn default() -> Self {
        Self {
            path: None,
            max_disk_size: spool_envelopes_max_disk_size(),
            batch_size_bytes: spool_envelopes_batch_size_bytes(),
            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
            partitions: spool_envelopes_partitions(),
            partitioning: EnvelopeSpoolPartitioning::default(),
            ephemeral: false,
        }
    }
}
1090
/// Top-level spooling configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration of the envelope spool.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1098
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// Whether to request the full project config from the upstream.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    pub project_expiry: u32,
    /// Grace period in seconds during which an expired project config is still used.
    pub project_grace_period: u32,
    /// Interval in seconds for proactively refreshing project configs, if set.
    pub project_refresh_interval: Option<u32>,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    pub relay_expiry: u32,
    /// Timeout in seconds for buffered envelopes (former `event_expiry`).
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// Maximum number of buffered envelopes (former `event_buffer_size`).
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries in seconds.
    pub miss_expiry: u32,
    /// Interval in milliseconds for batching upstream queries.
    pub batch_interval: u32,
    /// Interval in milliseconds for batching downstream relay queries.
    pub downstream_relays_batch_interval: u32,
    /// Maximum number of project configs fetched in a single batch.
    pub batch_size: usize,
    /// Interval in seconds to watch local cache files for changes.
    pub file_interval: u32,
    /// Interval in seconds to fetch the global config from the upstream.
    pub global_config_fetch_interval: u32,
}
1145
1146impl Default for Cache {
1147 fn default() -> Self {
1148 Cache {
1149 project_request_full_config: false,
1150 project_expiry: 300, project_grace_period: 120, project_refresh_interval: None,
1153 relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1156 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1160 file_interval: 10, global_config_fetch_interval: 10, }
1163 }
1164}
1165
/// Default for [`Processing::max_secs_in_future`]: one minute.
fn default_max_secs_in_future() -> u32 {
    60 // 1 minute
}

/// Default for [`Processing::max_session_secs_in_past`]: five days.
fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600 // 5 days
}

/// Default for [`Processing::attachment_chunk_size`].
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}

/// Default for [`Processing::projectconfig_cache_prefix`].
fn default_projectconfig_cache_prefix() -> String {
    "relayconfig".to_owned()
}

/// Default for [`Processing::max_rate_limit`]: five minutes.
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300) // 5 minutes
}
1186
/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Requires the processing feature.
    pub enabled: bool,
    /// GeoIp DB file source.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events, in seconds.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions, in seconds.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configuration parameters.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional, named kafka configs referenced by topic assignments.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic assignments.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics during startup.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis.
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients, in seconds.
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    /// Fraction of quotas to cache locally.
    // NOTE(review): exact semantics live in the rate limiter — confirm.
    pub quota_cache_ratio: Option<f32>,
    /// Upper bound for the locally cached quota window.
    // NOTE(review): exact semantics live in the rate limiter — confirm.
    pub quota_cache_max: Option<f32>,
    /// Configuration of the objectstore service (formerly `upload`).
    #[serde(default, alias = "upload")]
    pub objectstore: ObjectstoreServiceConfig,
}

impl Default for Processing {
    /// Constructs a disabled processing configuration.
    fn default() -> Self {
        Self {
            enabled: false,
            geoip_path: None,
            max_secs_in_future: default_max_secs_in_future(),
            max_session_secs_in_past: default_max_session_secs_in_past(),
            kafka_config: Vec::new(),
            secondary_kafka_configs: BTreeMap::new(),
            topics: TopicAssignments::default(),
            kafka_validate_topics: false,
            redis: None,
            attachment_chunk_size: default_chunk_size(),
            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
            max_rate_limit: default_max_rate_limit(),
            quota_cache_ratio: None,
            quota_cache_max: None,
            objectstore: ObjectstoreServiceConfig::default(),
        }
    }
}
1287
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    #[serde(default)]
    pub level: NormalizationLevel,
}

/// Configuration for the level of normalization this Relay should apply.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs the default normalization for this Relay's position in the chain.
    #[default]
    Default,
    /// Runs full normalization.
    Full,
}
1312
/// Configuration for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Time window for aggregating outcomes, in seconds.
    pub bucket_interval: u64,
    /// Interval at which aggregated outcomes are flushed, in seconds.
    pub flush_interval: u64,
}

impl Default for OutcomeAggregatorConfig {
    fn default() -> Self {
        Self {
            bucket_interval: 60,
            flush_interval: 120,
        }
    }
}
1331
/// Authentication options for the objectstore service.
#[derive(Serialize, Deserialize)]
pub struct ObjectstoreAuthConfig {
    /// Identifier of the signing key.
    pub key_id: String,

    /// The key used for signing requests. Never logged (see the manual `Debug` impl).
    pub signing_key: String,
}
1342
1343impl fmt::Debug for ObjectstoreAuthConfig {
1344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1345 f.debug_struct("ObjectstoreAuthConfig")
1346 .field("key_id", &self.key_id)
1347 .field("signing_key", &"[redacted]")
1348 .finish()
1349 }
1350}
1351
/// Configuration of the objectstore service.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct ObjectstoreServiceConfig {
    /// URL of the objectstore; the service is disabled if unset.
    pub objectstore_url: Option<String>,

    /// Maximum number of concurrent requests to the objectstore.
    pub max_concurrent_requests: usize,

    /// Maximum number of queued requests awaiting a free slot.
    pub max_backlog: usize,

    /// Request timeout in seconds.
    pub timeout: u64,

    /// Timeout for streaming request bodies, in seconds.
    pub stream_timeout: u64,

    /// Base delay between retries, in seconds.
    pub retry_delay: f64,

    /// Maximum number of attempts per request.
    pub max_attempts: NonZeroU16,

    /// Optional request-signing credentials.
    pub auth: Option<ObjectstoreAuthConfig>,
}
1391
1392impl Default for ObjectstoreServiceConfig {
1393 fn default() -> Self {
1394 Self {
1395 objectstore_url: None,
1396 max_concurrent_requests: 10,
1397 max_backlog: 20,
1398 timeout: 60,
1399 stream_timeout: 5 * 60, retry_delay: 1.0,
1401 max_attempts: NonZeroU16::new(5).unwrap(),
1402 auth: None,
1403 }
1404 }
1405}
1406
/// Controls whether and how outcomes are emitted.
///
/// Serialized as `false`, `true`, or the string `"as_client_reports"`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit outcomes as raw outcomes.
    AsOutcomes,
}
1419
1420impl EmitOutcomes {
1421 pub fn any(&self) -> bool {
1423 !matches!(self, EmitOutcomes::None)
1424 }
1425}
1426
1427impl Serialize for EmitOutcomes {
1428 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1429 where
1430 S: Serializer,
1431 {
1432 match self {
1434 Self::None => serializer.serialize_bool(false),
1435 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1436 Self::AsOutcomes => serializer.serialize_bool(true),
1437 }
1438 }
1439}
1440
1441struct EmitOutcomesVisitor;
1442
1443impl Visitor<'_> for EmitOutcomesVisitor {
1444 type Value = EmitOutcomes;
1445
1446 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1447 formatter.write_str("true, false, or 'as_client_reports'")
1448 }
1449
1450 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1451 where
1452 E: serde::de::Error,
1453 {
1454 Ok(if v {
1455 EmitOutcomes::AsOutcomes
1456 } else {
1457 EmitOutcomes::None
1458 })
1459 }
1460
1461 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1462 where
1463 E: serde::de::Error,
1464 {
1465 if v == "as_client_reports" {
1466 Ok(EmitOutcomes::AsClientReports)
1467 } else {
1468 Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1469 }
1470 }
1471}
1472
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor accept both booleans and strings.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1481
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    pub emit_outcomes: EmitOutcomes,
    /// The maximum number of outcomes that are batched before being sent.
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched.
    pub batch_interval: u64,
    /// Opaque identifier of the outcome source, if set.
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}

impl Default for Outcomes {
    fn default() -> Self {
        Outcomes {
            emit_outcomes: EmitOutcomes::AsClientReports,
            batch_size: 1000,
            batch_interval: 500,
            source: None,
            aggregator: OutcomeAggregatorConfig::default(),
        }
    }
}
1514
/// Minimal version of a config for dumping out.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1521
1522impl MinimalConfig {
1523 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1525 let path = p.as_ref();
1526 if fs::metadata(path).is_err() {
1527 fs::create_dir_all(path)
1528 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1529 }
1530 self.save(path)
1531 }
1532}
1533
// The minimal config is written to the same `config.yml` as the full config.
impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1543
/// Alternative serialization of [`RelayInfo`] for the static relays map in
/// the config file.
mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    // Mirror of `RelayInfo` used purely as the (de)serialization schema.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    /// Deserializes a map of relay IDs to relay info.
    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    /// Serializes a map of relay IDs to relay info.
    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}
1597
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls when the readiness probe reports this relay as ready.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays, keyed by relay ID.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// Maximum age of request signatures accepted by this relay, in seconds.
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}

/// Default for [`AuthConfig::signature_max_age`]: 5 minutes.
fn default_max_age() -> u64 {
    300
}
1619
/// GeoIp database configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to the GeoIp database file.
    pub path: Option<PathBuf>,
}
1626
/// Cardinality limiter configuration options.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Interval in seconds at which the internal cache is vacuumed.
    pub cache_vacuum_interval: u64,
}

impl Default for CardinalityLimiter {
    fn default() -> Self {
        Self {
            cache_vacuum_interval: 180,
        }
    }
}
1646
/// Settings to control Relay's health checks.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval in milliseconds at which internal health checks refresh.
    pub refresh_interval_ms: u64,
    /// Absolute memory limit before Relay reports unhealthy, if set.
    pub max_memory_bytes: Option<ByteSize>,
    /// Relative memory usage limit (0.0-1.0) before reporting unhealthy.
    pub max_memory_percent: f32,
    /// Timeout in milliseconds for a health probe.
    pub probe_timeout_ms: u64,
    /// Interval in milliseconds at which memory stats are refreshed.
    pub memory_stat_refresh_frequency_ms: u64,
}

impl Default for Health {
    fn default() -> Self {
        Self {
            refresh_interval_ms: 3000,
            max_memory_bytes: None,
            max_memory_percent: 0.95,
            probe_timeout_ms: 900,
            memory_stat_refresh_frequency_ms: 100,
        }
    }
}
1696
/// COGS (cost of goods sold) reporting configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum number of COGS measurements buffered in memory.
    pub max_queue_size: u64,
    /// Resource ID under which Relay reports its own COGS.
    pub relay_resource_id: String,
}

impl Default for Cogs {
    fn default() -> Self {
        Self {
            max_queue_size: 10_000,
            relay_resource_id: "relay_service".to_owned(),
        }
    }
}
1723
/// Upload service configuration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Upload {
    /// Maximum number of concurrent upload requests.
    pub max_concurrent_requests: usize,
    /// Timeout for upload requests, in seconds.
    pub timeout: u64,
    /// Maximum age of uploads, in seconds.
    // NOTE(review): consumer of this value is outside this file — confirm.
    pub max_age: i64,
}
1739
1740impl Default for Upload {
1741 fn default() -> Self {
1742 Self {
1743 max_concurrent_requests: 100,
1744 timeout: 5 * 60, max_age: 60 * 60, }
1747 }
1748}
1749
/// The root of the YAML configuration file.
///
/// Every section carries a field-level `#[serde(default)]` so that a partial
/// config file deserializes with defaults for all omitted sections.
#[derive(Serialize, Deserialize, Debug, Default)]
#[allow(missing_docs)]
pub struct ConfigValues {
    /// General relay settings (mode, host, port, upstream, TLS).
    #[serde(default)]
    pub relay: Relay,
    /// Upstream HTTP client settings.
    #[serde(default)]
    pub http: Http,
    /// Project/relay cache settings.
    #[serde(default)]
    pub cache: Cache,
    /// Envelope spooling settings.
    #[serde(default)]
    pub spool: Spool,
    /// Size and concurrency limits.
    #[serde(default)]
    pub limits: Limits,
    /// Logging configuration.
    #[serde(default)]
    pub logging: relay_log::LogConfig,
    /// Request routing settings.
    #[serde(default)]
    pub routing: Routing,
    /// Statsd metrics emission settings.
    #[serde(default)]
    pub metrics: Metrics,
    /// Internal error reporting to Sentry.
    #[serde(default)]
    pub sentry: relay_log::SentryConfig,
    /// Processing-mode settings (Kafka, Redis, GeoIP, ...).
    #[serde(default)]
    pub processing: Processing,
    /// Outcome emission settings.
    #[serde(default)]
    pub outcomes: Outcomes,
    /// Default metrics aggregator configuration.
    #[serde(default)]
    pub aggregator: AggregatorServiceConfig,
    /// Additional aggregators scoped by condition (e.g. metric namespace).
    #[serde(default)]
    pub secondary_aggregators: Vec<ScopedAggregatorConfig>,
    /// Authentication settings.
    #[serde(default)]
    pub auth: AuthConfig,
    /// GeoIP database settings.
    #[serde(default)]
    pub geoip: GeoIpConfig,
    /// Event normalization settings.
    #[serde(default)]
    pub normalization: Normalization,
    /// Cardinality limiter settings.
    #[serde(default)]
    pub cardinality_limiter: CardinalityLimiter,
    /// Health-check settings.
    #[serde(default)]
    pub health: Health,
    /// COGS measurement settings.
    #[serde(default)]
    pub cogs: Cogs,
    /// Upload settings.
    #[serde(default)]
    pub upload: Upload,
}
1795
impl ConfigObject for ConfigValues {
    /// The main configuration file is stored as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    /// The configuration is stored under the name `"config"`.
    fn name() -> &'static str {
        "config"
    }
}
1805
/// Config struct wrapping the parsed configuration values together with
/// optional credentials and the base path they were loaded from.
pub struct Config {
    // Deserialized contents of the YAML config file.
    values: ConfigValues,
    // Credentials are only present when a credentials file existed next to
    // the config, or after they were generated/set at runtime.
    credentials: Option<Credentials>,
    // Base directory the config was loaded from; empty for in-memory configs.
    path: PathBuf,
}
1812
impl fmt::Debug for Config {
    // Manual Debug impl: only `path` and `values` are printed. `credentials`
    // is deliberately omitted — presumably to keep the secret key out of
    // logs; confirm before adding it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
1821
impl Config {
    /// Loads a config from the given directory path.
    ///
    /// The path is resolved against the current working directory when
    /// available. Credentials are loaded only if a credentials file exists
    /// next to the config. Fails if processing is enabled in the config but
    /// this build lacks the `processing` feature.
    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
        let path = env::current_dir()
            .map(|x| x.join(path.as_ref()))
            .unwrap_or_else(|_| path.as_ref().to_path_buf());

        let config = Config {
            values: ConfigValues::load(&path)?,
            // Credentials are optional: only attempt to load them when the
            // file is present.
            credentials: if Credentials::path(&path).exists() {
                Some(Credentials::load(&path)?)
            } else {
                None
            },
            path: path.clone(),
        };

        if cfg!(not(feature = "processing")) && config.processing_enabled() {
            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
        }

        Ok(config)
    }

    /// Creates a config from a JSON value.
    ///
    /// The resulting config has no credentials and an empty base path.
    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }

    /// Applies string-valued overrides (e.g. from CLI flags or environment
    /// variables) on top of the loaded configuration.
    ///
    /// Each override is parsed into its typed counterpart; parse failures
    /// report the offending field. Credential overrides either patch existing
    /// credentials field-by-field or, when no credentials exist, must be
    /// supplied completely (id, public key, and secret key).
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // An explicit upstream URL takes precedence over an upstream DSN.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn))
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            // Accepts "true"/"1" and "false"/"0"; the empty string disables.
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            // Update the existing `bootstrap.servers` entry in place, or
            // append one if it is not configured yet.
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Existing credentials can be patched field-by-field.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // Without existing credentials, overrides must be all-or-nothing.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                    // No credential overrides supplied; nothing to do.
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // NOTE(review): a non-numeric shutdown_timeout override is silently
        // ignored here rather than reported — confirm this is intentional.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }

    /// Returns `true` if a config file exists in the given directory.
    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
    }

    /// Returns the base path this config was loaded from.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Serializes the configuration values into a YAML string.
    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    /// Generates fresh credentials and installs them on this config.
    ///
    /// When `save` is `true`, the new credentials are also written to disk.
    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    /// Returns the currently loaded credentials, if any.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }

    /// Replaces the credentials and persists the change to disk.
    ///
    /// Passing `None` removes the credentials file if it exists. Returns
    /// `Ok(false)` when the new credentials equal the current ones, in which
    /// case nothing is written.
    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }

    /// Returns `true` if credentials are available.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key, if credentials are available.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key, if credentials are available.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID, if credentials are available.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    /// Returns the relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the relay instance type.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the upstream this relay forwards to.
    pub fn upstream(&self) -> &UpstreamDescriptor {
        &self.values.relay.upstream
    }

    /// Returns the advertised upstream override, if configured.
    pub fn advertised_upstream(&self) -> Option<&UpstreamDescriptor> {
        self.values.relay.advertised_upstream.as_ref()
    }

    /// Returns the `Host` header override for upstream requests, if any.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the socket address this relay listens on.
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }

    /// Returns the internal listen address, if configured.
    ///
    /// Either the internal host or port may be set independently; the missing
    /// part falls back to the public host/port. Returns `None` when neither
    /// is configured.
    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
        match (
            self.values.relay.internal_host,
            self.values.relay.internal_port,
        ) {
            (Some(host), None) => Some((host, self.values.relay.port).into()),
            (None, Some(port)) => Some((self.values.relay.host, port).into()),
            (Some(host), Some(port)) => Some((host, port).into()),
            (None, None) => None,
        }
    }

    /// Returns the TLS listen address when a TLS identity is configured.
    ///
    /// The port defaults to 3443 unless `tls_port` is set.
    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
        if self.values.relay.tls_identity_path.is_some() {
            let port = self.values.relay.tls_port.unwrap_or(3443);
            Some((self.values.relay.host, port).into())
        } else {
            None
        }
    }

    /// Returns the path to the TLS identity file, if configured.
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the TLS identity, if configured.
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns whether project IDs in URLs may override those in envelopes.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    /// Returns whether readiness requires successful upstream authentication.
    ///
    /// Only applies in managed mode with the `Authenticated` readiness
    /// condition; otherwise readiness does not depend on auth.
    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }

    /// Returns the interval for periodic re-authentication with the upstream.
    ///
    /// Disabled (`None`) in processing mode and when the configured interval
    /// is zero or absent.
    pub fn http_auth_interval(&self) -> Option<Duration> {
        if self.processing_enabled() {
            return None;
        }

        match self.values.http.auth_interval {
            None | Some(0) => None,
            Some(secs) => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the grace period before a network outage is acted upon.
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Returns the delay before retrying failed upstream requests.
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Returns the interval for reporting project fetch failures.
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Returns the HTTP encoding used for upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns whether metrics are sent globally over HTTP.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns whether unparsed requests are forwarded upstream.
    ///
    /// Forwarding is always disabled in processing mode.
    pub fn http_forward(&self) -> bool {
        self.values.http.forward && !self.processing_enabled()
    }

    /// Returns how outcomes are emitted.
    ///
    /// Processing relays always emit real outcomes regardless of config.
    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    /// Returns the maximum number of outcomes per batch.
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the maximum wait before an outcome batch is flushed.
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// Returns the source tag attached to emitted outcomes, if any.
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the outcome aggregator configuration.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    /// Returns the logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the configuration for internal Sentry error reporting.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    /// Returns the statsd address, if metrics emission is configured.
    pub fn statsd_addr(&self) -> Option<&str> {
        self.values.metrics.statsd.as_deref()
    }

    /// Returns the statsd client buffer size, if configured.
    pub fn statsd_buffer_size(&self) -> Option<usize> {
        self.values.metrics.statsd_buffer_size
    }

    /// Returns the prefix applied to all emitted metric names.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns tags attached to every emitted metric.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the tag name used to report the hostname, if configured.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the interval for periodic metrics, or `None` when disabled
    /// (a configured value of zero disables them).
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the timeout for upstream HTTP requests.
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the timeout for establishing upstream connections.
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the maximum backoff interval between upstream retries.
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns whether DNS results are cached for upstream requests.
    pub fn http_dns_cache(&self) -> bool {
        self.values.http.dns_cache
    }

    /// Returns the expiry for cached project configs.
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns whether full project configs are requested from the upstream.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns the expiry for cached relay public keys.
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    /// Returns the envelope buffer size, clamped into `usize` range.
    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    /// Returns the expiry for negative (miss) cache entries.
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period during which stale project configs may
    /// still be used.
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the interval for proactively refreshing project configs,
    /// or `None` when not configured.
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns the maximum wait before a project config query batch is sent.
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns the batch interval for downstream relay public-key queries.
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the polling interval for file-based project configs.
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the interval for fetching the global config.
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }

    /// Returns the spool file path for the given partition.
    ///
    /// Partition `0` uses the configured path verbatim; other partitions
    /// append `.{partition_id}` to the file name. Returns `None` when
    /// spooling to disk is not configured.
    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
        let mut path = self
            .values
            .spool
            .envelopes
            .path
            .as_ref()
            .map(|path| path.to_owned())?;

        if partition_id == 0 {
            return Some(path);
        }

        let file_name = path.file_name().and_then(|f| f.to_str())?;
        let new_file_name = format!("{file_name}.{partition_id}");
        path.set_file_name(new_file_name);

        Some(path)
    }

    /// Returns the maximum on-disk size of the envelope spool, in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Returns the spool write-batch size, in bytes.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the maximum age spooled envelopes may reach.
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns how often spool disk usage is refreshed.
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the memory usage percentage above which backpressure kicks in.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of spool partitions (always at least one).
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    /// Returns the partitioning strategy for the envelope spool.
    pub fn spool_partitioning(&self) -> EnvelopeSpoolPartitioning {
        self.values.spool.envelopes.partitioning
    }

    /// Returns whether the spool is ephemeral (not persisted across restarts).
    pub fn spool_ephemeral(&self) -> bool {
        self.values.spool.envelopes.ephemeral
    }

    /// Returns the maximum size of an event payload, in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size of a single attachment, in bytes.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum size of an upload, in bytes.
    pub fn max_upload_size(&self) -> usize {
        self.values.limits.max_upload_size.as_bytes()
    }

    /// Returns the maximum combined size of all attachments in an envelope,
    /// in bytes.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum size of a client report payload, in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum size of a monitor check-in payload, in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum size of a log payload, in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum size of a span payload, in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum size of an item container payload, in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum size of a logs integration payload; shares the
    /// container limit.
    pub fn max_logs_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum size of a spans integration payload; shares the
    /// container limit.
    pub fn max_spans_integration_size(&self) -> usize {
        self.max_container_size()
    }

    /// Returns the maximum size of a full envelope, in bytes.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum size of a statsd payload, in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum size of a metric buckets payload, in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    /// Returns the maximum payload size for API requests, in bytes.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum file size for API file uploads, in bytes.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum chunk size for API chunk uploads, in bytes.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum size of a profile payload, in bytes.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum size of a trace metric payload, in bytes.
    pub fn max_trace_metric_size(&self) -> usize {
        self.values.limits.max_trace_metric_size.as_bytes()
    }

    /// Returns the maximum size of a compressed replay payload, in bytes.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum size of an uncompressed replay payload, in bytes.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum size of a replay message, in bytes.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    /// Returns the maximum number of concurrent upstream requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of concurrent upstream queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// Returns the maximum key size retained for removed attributes, in bytes.
    pub fn max_removed_attribute_key_size(&self) -> usize {
        self.values.limits.max_removed_attribute_key_size.as_bytes()
    }

    /// Returns the timeout for upstream queries.
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// Returns the maximum time allowed for graceful shutdown.
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    /// Returns the HTTP keep-alive timeout for the server.
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    /// Returns the idle connection timeout, if configured.
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of server connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// Returns the TCP listen backlog.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the number of CPU-bound worker threads.
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the maximum concurrency within a worker pool.
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    /// Returns the maximum number of project configs per upstream query.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Returns the directory containing static project config files.
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    /// Returns `true` if processing mode is enabled.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Returns the configured normalization level.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    /// Returns the path to the GeoIP database, if configured.
    ///
    /// The dedicated `geoip` section takes precedence over the legacy
    /// `processing.geoip_path` setting.
    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    /// Returns how far event timestamps may lie in the future, in seconds.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Returns how far session timestamps may lie in the past, in seconds.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    /// Returns the Kafka producer configuration for the given topic,
    /// resolving against the primary and secondary Kafka configs.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Returns whether Kafka topics are validated on startup.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// Returns topic assignments that were configured but are not used.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Returns the objectstore service configuration.
    pub fn objectstore(&self) -> &ObjectstoreServiceConfig {
        &self.values.processing.objectstore
    }

    /// Returns the upload configuration.
    pub fn upload(&self) -> &Upload {
        &self.values.upload
    }

    /// Returns the effective Redis configuration, if Redis is configured.
    ///
    /// Pool sizes are derived from the CPU and pool concurrency settings.
    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
            self.pool_concurrency() as u32,
        ))
    }

    /// Returns the chunk size for attachments sent to Kafka, in bytes.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Returns the maximum batch size for flushed metrics, in bytes.
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Returns the prefix for project config cache keys in Redis.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Returns the maximum rate limit duration, in seconds, if capped.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Returns the quota cache ratio, if configured.
    pub fn quota_cache_ratio(&self) -> Option<f32> {
        self.values.processing.quota_cache_ratio
    }

    /// Returns the maximum quota cache duration, if configured.
    pub fn quota_cache_max(&self) -> Option<f32> {
        self.values.processing.quota_cache_max
    }

    /// Returns the vacuum interval for the cardinality limiter cache.
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Returns how often the health status is refreshed.
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Returns the absolute memory watermark in bytes; `u64::MAX` when no
    /// byte limit is configured.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    /// Returns the relative memory watermark as a fraction of total memory.
    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    /// Returns the timeout for internal health probes.
    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    /// Returns how often memory statistics are refreshed, in milliseconds.
    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    /// Returns the maximum COGS measurement queue size.
    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    /// Returns the resource ID used for Relay's own COGS measurements.
    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    /// Returns the default metrics aggregator configuration.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    /// Returns the condition-scoped secondary aggregator configurations.
    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }

    /// Returns the aggregator config for the given metric namespace.
    ///
    /// The first secondary aggregator whose condition matches wins; otherwise
    /// the default aggregator config is used.
    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
        for entry in &self.values.secondary_aggregators {
            if entry.condition.matches(Some(namespace)) {
                return &entry.config;
            }
        }
        &self.values.aggregator
    }

    /// Returns statically configured downstream relays keyed by relay ID.
    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    /// Returns the maximum accepted age of request signatures.
    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }

    /// Returns whether envelope items of unknown type are accepted.
    ///
    /// Unless explicitly configured, unknown items are accepted only when
    /// processing is disabled (non-processing relays forward them upstream).
    pub fn accept_unknown_items(&self) -> bool {
        let forward = self.values.routing.accept_unknown_items;
        forward.unwrap_or_else(|| !self.processing_enabled())
    }
}
2838
2839impl Default for Config {
2840 fn default() -> Self {
2841 Self {
2842 values: ConfigValues::default(),
2843 credentials: None,
2844 path: PathBuf::new(),
2845 }
2846 }
2847}
2848
#[cfg(test)]
mod tests {

    use super::*;

    // The legacy `event_buffer_size`/`event_expiry` keys must deserialize
    // into the envelope-based cache fields (serde aliases).
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    // EmitOutcomes round-trips through its custom JSON representation:
    // booleans for on/off and a string for the client-report variant.
    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }

    // Anything other than the accepted representations must be rejected.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}