use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::io::Write;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::num::NonZeroU8;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use std::{env, fmt, fs, io};

use anyhow::Context;
use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
use relay_common::Dsn;
use relay_kafka::{
    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
    TopicAssignments,
};
use relay_metrics::MetricNamespace;
use serde::de::{DeserializeOwned, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use uuid::Uuid;

use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
use crate::byte_size::ByteSize;
use crate::upstream::UpstreamDescriptor;
use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};

const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    CouldNotOpenFile,
    CouldNotWriteFile,
    BadYaml,
    BadJson,
    InvalidValue,
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::CouldNotOpenFile => write!(f, "could not open config file"),
            Self::CouldNotWriteFile => write!(f, "could not write config file"),
            Self::BadYaml => write!(f, "could not parse yaml config file"),
            Self::BadJson => write!(f, "could not parse json config file"),
            Self::InvalidValue => write!(f, "invalid config value"),
            Self::ProcessingNotAvailable => write!(
                f,
                "was not compiled with processing, cannot enable processing"
            ),
        }
    }
}

#[derive(Debug, Default)]
enum ConfigErrorSource {
    #[default]
    None,
    File(PathBuf),
    FieldOverride(String),
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConfigErrorSource::None => Ok(()),
            ConfigErrorSource::File(file_name) => {
                write!(f, " (file {})", file_name.display())
            }
            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
        }
    }
}

#[derive(Debug)]
pub struct ConfigError {
    source: ConfigErrorSource,
    kind: ConfigErrorKind,
}

impl ConfigError {
    #[inline]
    fn new(kind: ConfigErrorKind) -> Self {
        Self {
            source: ConfigErrorSource::None,
            kind,
        }
    }

    #[inline]
    fn field(field: &'static str) -> Self {
        Self {
            source: ConfigErrorSource::FieldOverride(field.to_owned()),
            kind: ConfigErrorKind::InvalidValue,
        }
    }

    #[inline]
    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
        Self {
            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
            kind,
        }
    }

    pub fn kind(&self) -> ConfigErrorKind {
        self.kind
    }
}

impl fmt::Display for ConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}{}", self.kind(), self.source)
    }
}

impl Error for ConfigError {}

enum ConfigFormat {
    Yaml,
    Json,
}

impl ConfigFormat {
    pub fn extension(&self) -> &'static str {
        match self {
            ConfigFormat::Yaml => "yml",
            ConfigFormat::Json => "json",
        }
    }
}

trait ConfigObject: DeserializeOwned + Serialize {
    fn format() -> ConfigFormat;

    fn name() -> &'static str;

    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        let mut source = serde_vars::EnvSource::default();
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        f.write_all(b"\n").ok();

        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct OverridableConfig {
    pub mode: Option<String>,
    pub instance: Option<String>,
    pub log_level: Option<String>,
    pub log_format: Option<String>,
    pub upstream: Option<String>,
    pub upstream_dsn: Option<String>,
    pub host: Option<String>,
    pub port: Option<String>,
    pub processing: Option<String>,
    pub kafka_url: Option<String>,
    pub redis_url: Option<String>,
    pub id: Option<String>,
    pub secret_key: Option<String>,
    pub public_key: Option<String>,
    pub outcome_source: Option<String>,
    pub shutdown_timeout: Option<String>,
    pub server_name: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    pub secret_key: SecretKey,
    pub public_key: PublicKey,
    pub id: RelayId,
}

impl Credentials {
    pub fn generate() -> Self {
        relay_log::info!("generating new relay credentials");
        let (sk, pk) = generate_key_pair();
        Self {
            secret_key: sk,
            public_key: pk,
            id: generate_relay_id(),
        }
    }

    pub fn to_json_string(&self) -> anyhow::Result<String> {
        serde_json::to_string(self)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }
}

impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    pub public_key: PublicKey,

    #[serde(default)]
    pub internal: bool,
}

impl RelayInfo {
    pub fn new(public_key: PublicKey) -> Self {
        Self {
            public_key,
            internal: false,
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    Proxy,

    Managed,
}

impl<'de> Deserialize<'de> for RelayMode {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        match s.as_str() {
            "proxy" => Ok(RelayMode::Proxy),
            "managed" => Ok(RelayMode::Managed),
            "static" => Err(serde::de::Error::custom(
                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
            )),
            other => Err(serde::de::Error::unknown_variant(
                other,
                &["proxy", "managed"],
            )),
        }
    }
}

impl fmt::Display for RelayMode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            RelayMode::Proxy => write!(f, "proxy"),
            RelayMode::Managed => write!(f, "managed"),
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    Default,

    Canary,
}

impl RelayInstance {
    pub fn is_canary(&self) -> bool {
        matches!(self, RelayInstance::Canary)
    }
}

impl fmt::Display for RelayInstance {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            RelayInstance::Default => write!(f, "default"),
            RelayInstance::Canary => write!(f, "canary"),
        }
    }
}

impl FromStr for RelayInstance {
    type Err = fmt::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "canary" => Ok(RelayInstance::Canary),
            _ => Ok(RelayInstance::Default),
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}

impl FromStr for RelayMode {
    type Err = ParseRelayModeError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "proxy" => Ok(RelayMode::Proxy),
            "managed" => Ok(RelayMode::Managed),
            _ => Err(ParseRelayModeError),
        }
    }
}

fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    *t == T::default()
}

// Detects whether Relay runs inside a Docker container, either via the
// `/.dockerenv` marker file or a docker entry in the cgroup file.
fn is_docker() -> bool {
    if fs::metadata("/.dockerenv").is_ok() {
        return true;
    }

    fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}

// Binds to all interfaces inside Docker, and to localhost otherwise.
fn default_host() -> IpAddr {
    if is_docker() {
        "0.0.0.0".parse().unwrap()
    } else {
        "127.0.0.1".parse().unwrap()
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ReadinessCondition {
    #[default]
    Authenticated,
    Always,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    pub mode: RelayMode,
    pub instance: RelayInstance,
    pub upstream: UpstreamDescriptor<'static>,
    pub host: IpAddr,
    pub port: u16,
    pub internal_host: Option<IpAddr>,
    pub internal_port: Option<u16>,
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}

impl Default for Relay {
    fn default() -> Self {
        Relay {
            mode: RelayMode::Managed,
            instance: RelayInstance::Default,
            upstream: "https://sentry.io/".parse().unwrap(),
            host: default_host(),
            port: 3000,
            internal_host: None,
            internal_port: None,
            tls_port: None,
            tls_identity_path: None,
            tls_identity_password: None,
            override_project_ids: false,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    pub statsd: Option<String>,
    pub prefix: String,
    pub default_tags: BTreeMap<String, String>,
    pub hostname_tag: Option<String>,
    pub sample_rate: f32,
    pub periodic_secs: u64,
    pub aggregate: bool,
    pub allow_high_cardinality_tags: bool,
}

impl Default for Metrics {
    fn default() -> Self {
        Metrics {
            statsd: None,
            prefix: "sentry.relay".into(),
            default_tags: BTreeMap::new(),
            hostname_tag: None,
            sample_rate: 1.0,
            periodic_secs: 5,
            aggregate: true,
            allow_high_cardinality_tags: false,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    pub max_concurrent_requests: usize,
    pub max_concurrent_queries: usize,
    pub max_event_size: ByteSize,
    pub max_attachment_size: ByteSize,
    pub max_attachments_size: ByteSize,
    pub max_client_reports_size: ByteSize,
    pub max_check_in_size: ByteSize,
    pub max_envelope_size: ByteSize,
    pub max_session_count: usize,
    pub max_api_payload_size: ByteSize,
    pub max_api_file_upload_size: ByteSize,
    pub max_api_chunk_upload_size: ByteSize,
    pub max_profile_size: ByteSize,
    pub max_trace_metric_size: ByteSize,
    pub max_log_size: ByteSize,
    pub max_span_size: ByteSize,
    pub max_container_size: ByteSize,
    pub max_statsd_size: ByteSize,
    pub max_metric_buckets_size: ByteSize,
    pub max_replay_compressed_size: ByteSize,
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    pub max_replay_message_size: ByteSize,
    pub max_thread_count: usize,
    pub max_pool_concurrency: usize,
    pub query_timeout: u64,
    pub shutdown_timeout: u64,
    pub keepalive_timeout: u64,
    pub idle_timeout: Option<u64>,
    pub max_connections: Option<usize>,
    pub tcp_listen_backlog: u32,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(200),
            max_attachments_size: ByteSize::mebibytes(200),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(200),
            max_session_count: 100,
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_trace_metric_size: ByteSize::kibibytes(2),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(1),
            max_container_size: ByteSize::mebibytes(12),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
        }
    }
}

#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    pub accept_unknown_items: Option<bool>,
}

#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    #[default]
    Identity,
    Deflate,
    Gzip,
    Br,
    Zstd,
}

impl HttpEncoding {
    pub fn parse(str: &str) -> Self {
        let str = str.trim();
        if str.eq_ignore_ascii_case("zstd") {
            Self::Zstd
        } else if str.eq_ignore_ascii_case("br") {
            Self::Br
        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
            Self::Gzip
        } else if str.eq_ignore_ascii_case("deflate") {
            Self::Deflate
        } else {
            Self::Identity
        }
    }

    pub fn name(&self) -> Option<&'static str> {
        match self {
            Self::Identity => None,
            Self::Deflate => Some("deflate"),
            Self::Gzip => Some("gzip"),
            Self::Br => Some("br"),
            Self::Zstd => Some("zstd"),
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    pub timeout: u32,
    pub connection_timeout: u32,
    pub max_retry_interval: u32,
    pub host_header: Option<String>,
    pub auth_interval: Option<u64>,
    pub outage_grace_period: u64,
    pub retry_delay: u64,
    pub project_failure_interval: u64,
    pub encoding: HttpEncoding,
    pub global_metrics: bool,
}

impl Default for Http {
    fn default() -> Self {
        Http {
            timeout: 5,
            connection_timeout: 3,
            max_retry_interval: 60,
            host_header: None,
            auth_interval: Some(600),
            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
            retry_delay: default_retry_delay(),
            project_failure_interval: default_project_failure_interval(),
            encoding: HttpEncoding::Zstd,
            global_metrics: false,
        }
    }
}

fn default_retry_delay() -> u64 {
    1
}

fn default_project_failure_interval() -> u64 {
    90
}

fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

fn spool_max_backpressure_envelopes() -> usize {
    500
}

fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}

fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}

#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    pub path: Option<PathBuf>,
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}

impl Default for EnvelopeSpool {
    fn default() -> Self {
        Self {
            path: None,
            max_disk_size: spool_envelopes_max_disk_size(),
            batch_size_bytes: spool_envelopes_batch_size_bytes(),
            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
            max_backpressure_envelopes: spool_max_backpressure_envelopes(),
            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
            partitions: spool_envelopes_partitions(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    pub project_request_full_config: bool,
    pub project_expiry: u32,
    pub project_grace_period: u32,
    pub project_refresh_interval: Option<u32>,
    pub relay_expiry: u32,
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    pub miss_expiry: u32,
    pub batch_interval: u32,
    pub downstream_relays_batch_interval: u32,
    pub batch_size: usize,
    pub file_interval: u32,
    pub global_config_fetch_interval: u32,
}

impl Default for Cache {
    fn default() -> Self {
        Cache {
            project_request_full_config: false,
            project_expiry: 300,
            project_grace_period: 120,
            project_refresh_interval: None,
            relay_expiry: 3600,
            envelope_expiry: 600,
            envelope_buffer_size: 1000,
            miss_expiry: 60,
            batch_interval: 100,
            downstream_relays_batch_interval: 100,
            batch_size: 500,
            file_interval: 10,
            global_config_fetch_interval: 10,
        }
    }
}

fn default_max_secs_in_future() -> u32 {
    60
}

fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600
}

fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}

fn default_projectconfig_cache_prefix() -> String {
    "relayconfig".to_owned()
}

#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    pub enabled: bool,
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    pub kafka_config: Vec<KafkaConfigParam>,
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    #[serde(default)]
    pub topics: TopicAssignments,
    #[serde(default)]
    pub kafka_validate_topics: bool,
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    pub quota_cache_ratio: Option<f32>,
    pub quota_cache_max: Option<f32>,
    #[serde(default)]
    pub upload: UploadServiceConfig,
}

impl Default for Processing {
    fn default() -> Self {
        Self {
            enabled: false,
            geoip_path: None,
            max_secs_in_future: default_max_secs_in_future(),
            max_session_secs_in_past: default_max_session_secs_in_past(),
            kafka_config: Vec::new(),
            secondary_kafka_configs: BTreeMap::new(),
            topics: TopicAssignments::default(),
            kafka_validate_topics: false,
            redis: None,
            attachment_chunk_size: default_chunk_size(),
            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
            max_rate_limit: default_max_rate_limit(),
            quota_cache_ratio: None,
            quota_cache_max: None,
            upload: UploadServiceConfig::default(),
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    #[serde(default)]
    pub level: NormalizationLevel,
}

#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    #[default]
    Default,
    Full,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    pub bucket_interval: u64,
    pub flush_interval: u64,
}

impl Default for OutcomeAggregatorConfig {
    fn default() -> Self {
        Self {
            bucket_interval: 60,
            flush_interval: 120,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct UploadServiceConfig {
    pub objectstore_url: Option<String>,

    pub max_concurrent_requests: usize,

    pub timeout: u64,
}

impl Default for UploadServiceConfig {
    fn default() -> Self {
        Self {
            objectstore_url: None,
            max_concurrent_requests: 10,
            timeout: 60,
        }
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    None,
    AsClientReports,
    AsOutcomes,
}

impl EmitOutcomes {
    pub fn any(&self) -> bool {
        !matches!(self, EmitOutcomes::None)
    }
}

impl Serialize for EmitOutcomes {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Self::None => serializer.serialize_bool(false),
            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
            Self::AsOutcomes => serializer.serialize_bool(true),
        }
    }
}

struct EmitOutcomesVisitor;

impl Visitor<'_> for EmitOutcomesVisitor {
    type Value = EmitOutcomes;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("true, false, or 'as_client_reports'")
    }

    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        Ok(if v {
            EmitOutcomes::AsOutcomes
        } else {
            EmitOutcomes::None
        })
    }

    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        if v == "as_client_reports" {
            Ok(EmitOutcomes::AsClientReports)
        } else {
            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
        }
    }
}

impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    pub emit_outcomes: EmitOutcomes,
    pub emit_client_outcomes: bool,
    pub batch_size: usize,
    pub batch_interval: u64,
    pub source: Option<String>,
    pub aggregator: OutcomeAggregatorConfig,
}

impl Default for Outcomes {
    fn default() -> Self {
        Outcomes {
            emit_outcomes: EmitOutcomes::AsClientReports,
            emit_client_outcomes: true,
            batch_size: 1000,
            batch_interval: 500,
            source: None,
            aggregator: OutcomeAggregatorConfig::default(),
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    pub relay: Relay,
}

impl MinimalConfig {
    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
        let path = p.as_ref();
        if fs::metadata(path).is_err() {
            fs::create_dir_all(path)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
        }
        self.save(path)
    }
}

impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}

mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}

fn default_max_age() -> u64 {
    300
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    pub path: Option<PathBuf>,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    pub cache_vacuum_interval: u64,
}

impl Default for CardinalityLimiter {
    fn default() -> Self {
        Self {
            cache_vacuum_interval: 180,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    pub refresh_interval_ms: u64,
    pub max_memory_bytes: Option<ByteSize>,
    pub max_memory_percent: f32,
    pub probe_timeout_ms: u64,
    pub memory_stat_refresh_frequency_ms: u64,
}

impl Default for Health {
    fn default() -> Self {
        Self {
            refresh_interval_ms: 3000,
            max_memory_bytes: None,
            max_memory_percent: 0.95,
            probe_timeout_ms: 900,
            memory_stat_refresh_frequency_ms: 100,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    pub max_queue_size: u64,
    pub relay_resource_id: String,
}

impl Default for Cogs {
    fn default() -> Self {
        Self {
            max_queue_size: 10_000,
            relay_resource_id: "relay_service".to_owned(),
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}

impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}

pub struct Config {
    values: ConfigValues,
    credentials: Option<Credentials>,
    path: PathBuf,
}

impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}

impl Config {
    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
        let path = env::current_dir()
            .map(|x| x.join(path.as_ref()))
            .unwrap_or_else(|_| path.as_ref().to_path_buf());

        let config = Config {
            values: ConfigValues::load(&path)?,
            credentials: if Credentials::path(&path).exists() {
                Some(Credentials::load(&path)?)
            } else {
                None
            },
            path: path.clone(),
        };

        if cfg!(not(feature = "processing")) && config.processing_enabled() {
            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
        }

        Ok(config)
    }

    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }

    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }

        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {}
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }

    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }

    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }

    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }

    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
        match (
            self.values.relay.internal_host,
            self.values.relay.internal_port,
        ) {
            (Some(host), None) => Some((host, self.values.relay.port).into()),
            (None, Some(port)) => Some((self.values.relay.host, port).into()),
            (Some(host), Some(port)) => Some((host, port).into()),
            (None, None) => None,
        }
    }

    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
        if self.values.relay.tls_identity_path.is_some() {
            let port = self.values.relay.tls_port.unwrap_or(3443);
            Some((self.values.relay.host, port).into())
        } else {
            None
        }
    }

    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }

    pub fn http_auth_interval(&self) -> Option<Duration> {
        if self.processing_enabled() {
            return None;
        }

        match self.values.http.auth_interval {
            None | Some(0) => None,
            Some(secs) => Some(Duration::from_secs(secs)),
        }
    }

    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    pub fn emit_client_outcomes(&self) -> bool {
        self.values.outcomes.emit_client_outcomes
    }

    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
        if let Some(ref addr) = self.values.metrics.statsd {
            let addrs = addr
                .as_str()
                .to_socket_addrs()
                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
                .collect();
            Ok(addrs)
        } else {
            Ok(vec![])
        }
    }

    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    pub fn metrics_sample_rate(&self) -> f32 {
        self.values.metrics.sample_rate
    }

    pub fn metrics_aggregate(&self) -> bool {
        self.values.metrics.aggregate
    }

    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
        self.values.metrics.allow_high_cardinality_tags
    }

    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }

    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
        let mut path = self
            .values
            .spool
            .envelopes
            .path
            .as_ref()
            .map(|path| path.to_owned())?;

        if partition_id == 0 {
            return Some(path);
        }

        let file_name = path.file_name().and_then(|f| f.to_str())?;
        let new_file_name = format!("{file_name}.{partition_id}");
        path.set_file_name(new_file_name);

        Some(path)
    }

    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    pub fn spool_max_backpressure_envelopes(&self) -> usize {
        self.values.spool.envelopes.max_backpressure_envelopes
    }

    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    pub fn max_logs_integration_size(&self) -> usize {
        self.max_container_size()
    }

    pub fn max_spans_integration_size(&self) -> usize {
        self.max_container_size()
    }

    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    pub fn max_trace_metric_size(&self) -> usize {
        self.values.limits.max_trace_metric_size.as_bytes()
    }

    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    pub fn upload(&self) -> &UploadServiceConfig {
        &self.values.processing.upload
    }

    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
            self.pool_concurrency() as u32,
        ))
    }

    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    pub fn quota_cache_ratio(&self) -> Option<f32> {
        self.values.processing.quota_cache_ratio
    }

    pub fn quota_cache_max(&self) -> Option<f32> {
        self.values.processing.quota_cache_max
    }

    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }

    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
        for entry in &self.values.secondary_aggregators {
            if entry.condition.matches(Some(namespace)) {
                return &entry.config;
            }
        }
        &self.values.aggregator
    }

    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }

    pub fn accept_unknown_items(&self) -> bool {
        let forward = self.values.routing.accept_unknown_items;
        forward.unwrap_or_else(|| !self.processing_enabled())
    }
}

impl Default for Config {
    fn default() -> Self {
        Self {
            values: ConfigValues::default(),
            credentials: None,
            path: PathBuf::new(),
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;

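    // A hedged sketch, not part of the original test suite: every section of
    // `ConfigValues` is marked `#[serde(default)]`, so an empty JSON object is
    // expected to yield the documented defaults.
    #[test]
    fn test_from_json_value_defaults() {
        let config = Config::from_json_value(serde_json::json!({})).unwrap();
        assert_eq!(config.relay_mode(), RelayMode::Managed);
        assert_eq!(config.values.relay.port, 3000);
        assert!(!config.processing_enabled());
        assert!(config.credentials().is_none());
    }
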
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }
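
    // A small additional sketch of `RelayMode` parsing, based on the `FromStr` and
    // `Deserialize` impls above: the removed 'static' mode and unknown variants are
    // rejected, while 'proxy' and 'managed' parse as expected.
    #[test]
    fn test_relay_mode_parsing() {
        assert_eq!("proxy".parse::<RelayMode>().unwrap(), RelayMode::Proxy);
        assert_eq!("managed".parse::<RelayMode>().unwrap(), RelayMode::Managed);
        assert!("static".parse::<RelayMode>().is_err());

        assert_eq!(
            serde_json::from_str::<RelayMode>("\"proxy\"").unwrap(),
            RelayMode::Proxy
        );
        assert!(serde_json::from_str::<RelayMode>("\"static\"").is_err());
        assert!(serde_json::from_str::<RelayMode>("\"unknown\"").is_err());
    }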

    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }
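
    // An extra sketch of `HttpEncoding::parse`, which trims its input, matches
    // case-insensitively, and falls back to `Identity` for unknown encodings.
    #[test]
    fn test_http_encoding_parse() {
        assert!(matches!(HttpEncoding::parse("zstd"), HttpEncoding::Zstd));
        assert!(matches!(HttpEncoding::parse(" GZIP "), HttpEncoding::Gzip));
        assert!(matches!(HttpEncoding::parse("x-gzip"), HttpEncoding::Gzip));
        assert!(matches!(HttpEncoding::parse("br"), HttpEncoding::Br));
        assert!(matches!(HttpEncoding::parse("deflate"), HttpEncoding::Deflate));
        assert!(matches!(HttpEncoding::parse("unknown"), HttpEncoding::Identity));
        assert_eq!(HttpEncoding::Zstd.name(), Some("zstd"));
        assert_eq!(HttpEncoding::Identity.name(), None);
    }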

    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
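
    // A hedged sketch of `apply_override`: only the provided fields change, the
    // rest keeps its defaults. The override values below are purely illustrative.
    #[test]
    fn test_apply_override() {
        let mut config = Config::default();
        config
            .apply_override(OverridableConfig {
                mode: Some("proxy".to_owned()),
                port: Some("3001".to_owned()),
                shutdown_timeout: Some("15".to_owned()),
                ..Default::default()
            })
            .unwrap();

        assert_eq!(config.relay_mode(), RelayMode::Proxy);
        assert_eq!(config.values.relay.port, 3001);
        assert_eq!(config.shutdown_timeout(), Duration::from_secs(15));
        assert!(!config.has_credentials());
    }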
}