1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{build_redis_configs, RedisConfig, RedisConfigs, RedisConfigsRef};
27
// Grace period before an upstream network outage is acted upon; used as the
// default for `Http::outage_grace_period`. Presumably seconds — TODO confirm
// at the consumer of `outage_grace_period`.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

// Comment header written to the top of every generated YAML config file
// (see `ConfigObject::save`).
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// The kind of a [`ConfigError`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the config file.
    CouldNotOpenFile,
    /// Failed to write the config file.
    CouldNotWriteFile,
    /// Failed to parse a YAML config file.
    BadYaml,
    /// Failed to parse a JSON config file.
    BadJson,
    /// A config value is invalid.
    InvalidValue,
    /// Processing was enabled, but this binary was not compiled with the
    /// processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
/// Describes where a [`ConfigError`] originated from.
#[derive(Debug)]
enum ConfigErrorSource {
    /// The error is not tied to a specific file or field.
    None,
    /// The error originated while handling the given config file.
    File(PathBuf),
    /// The error originated from an override of the named config field.
    FieldOverride(String),
}
80
81impl Default for ConfigErrorSource {
82 fn default() -> Self {
83 Self::None
84 }
85}
86
87impl fmt::Display for ConfigErrorSource {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 match self {
90 ConfigErrorSource::None => Ok(()),
91 ConfigErrorSource::File(file_name) => {
92 write!(f, " (file {})", file_name.display())
93 }
94 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
95 }
96 }
97}
98
/// Indicates config related errors.
#[derive(Debug)]
pub struct ConfigError {
    // Where the error came from; rendered as a suffix by `Display`.
    source: ConfigErrorSource,
    // What went wrong.
    kind: ConfigErrorKind,
}
105
106impl ConfigError {
107 #[inline]
108 fn new(kind: ConfigErrorKind) -> Self {
109 Self {
110 source: ConfigErrorSource::None,
111 kind,
112 }
113 }
114
115 #[inline]
116 fn field(field: &'static str) -> Self {
117 Self {
118 source: ConfigErrorSource::FieldOverride(field.to_owned()),
119 kind: ConfigErrorKind::InvalidValue,
120 }
121 }
122
123 #[inline]
124 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125 Self {
126 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127 kind,
128 }
129 }
130
131 pub fn kind(&self) -> ConfigErrorKind {
133 self.kind
134 }
135}
136
137impl fmt::Display for ConfigError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 write!(f, "{}{}", self.kind(), self.source)
140 }
141}
142
// Marker impl: `Debug` + `Display` above make this a full `std::error::Error`.
impl Error for ConfigError {}
144
/// On-disk file format of a config object.
enum ConfigFormat {
    /// YAML, stored with a `.yml` extension.
    Yaml,
    /// JSON, stored with a `.json` extension.
    Json,
}
149
150impl ConfigFormat {
151 pub fn extension(&self) -> &'static str {
152 match self {
153 ConfigFormat::Yaml => "yml",
154 ConfigFormat::Json => "json",
155 }
156 }
157}
158
/// A config object that can be loaded from and saved to a file inside a base
/// directory.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The file format (YAML or JSON) this object is stored in.
    fn format() -> ConfigFormat;

    /// The file name stem (without extension) for this object.
    fn name() -> &'static str;

    /// The full path of this object's file below `base`.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config object from its file below `base`.
    ///
    /// Errors carry a [`ConfigError`] context identifying the offending file.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => serde_yaml::from_reader(io::BufReader::new(f))
                .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path)),
            ConfigFormat::Json => serde_json::from_reader(io::BufReader::new(f))
                .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path)),
        }
    }

    /// Writes the config object to its file below `base`, truncating any
    /// existing content.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Restrict the file to the owner (0600) on Unix: config files may
        // contain credentials.
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML files get a documentation header before the payload.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is cosmetic; failures are deliberately ignored.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
218
/// Alternative values for selected config fields, typically sourced from the
/// environment or CLI. `None` means "keep the value from the config file"
/// (applied in `Config::apply_override`).
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// Relay mode override, parsed as [`RelayMode`].
    pub mode: Option<String>,
    /// Relay instance override, parsed as [`RelayInstance`].
    pub instance: Option<String>,
    /// Log level override.
    pub log_level: Option<String>,
    /// Log format override.
    pub log_format: Option<String>,
    /// Upstream URL override, parsed as an `UpstreamDescriptor`.
    pub upstream: Option<String>,
    /// Upstream DSN override; only consulted when `upstream` is unset.
    pub upstream_dsn: Option<String>,
    /// Listen host (IP address) override.
    pub host: Option<String>,
    /// Listen port override.
    pub port: Option<String>,
    /// Processing toggle: "true"/"1" enables, "false"/"0"/"" disables.
    pub processing: Option<String>,
    /// Kafka `bootstrap.servers` override.
    pub kafka_url: Option<String>,
    /// Redis server URL override.
    pub redis_url: Option<String>,
    /// Relay id credential override (UUID).
    pub id: Option<String>,
    /// Secret key credential override.
    pub secret_key: Option<String>,
    /// Public key credential override.
    pub public_key: Option<String>,
    /// Outcome source override.
    pub outcome_source: Option<String>,
    /// Shutdown timeout override — NOTE(review): not consumed in the portion
    /// of `apply_override` visible here; confirm where it is applied.
    pub shutdown_timeout: Option<String>,
    /// Server name override — NOTE(review): consumption not visible here.
    pub server_name: Option<String>,
}
258
/// The relay credentials.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique id of the relay.
    pub id: RelayId,
}
269
270impl Credentials {
271 pub fn generate() -> Self {
273 relay_log::info!("generating new relay credentials");
274 let (sk, pk) = generate_key_pair();
275 Self {
276 secret_key: sk,
277 public_key: pk,
278 id: generate_relay_id(),
279 }
280 }
281
282 pub fn to_json_string(&self) -> anyhow::Result<String> {
284 serde_json::to_string(self)
285 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
286 }
287}
288
289impl ConfigObject for Credentials {
290 fn format() -> ConfigFormat {
291 ConfigFormat::Json
292 }
293 fn name() -> &'static str {
294 "credentials"
295 }
296}
297
/// Information about a registered relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key of the relay.
    pub public_key: PublicKey,

    /// Whether this is an internal relay; defaults to `false` when absent.
    #[serde(default)]
    pub internal: bool,
}
309
310impl RelayInfo {
311 pub fn new(public_key: PublicKey) -> Self {
313 Self {
314 public_key,
315 internal: false,
316 }
317 }
318}
319
/// The operation mode of a relay. Serialized in camelCase (`"proxy"`,
/// `"static"`, `"managed"`, `"capture"`), matching `FromStr`/`Display`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// Proxy mode — NOTE(review): behavioral semantics not visible in this
    /// chunk; confirm against the relay mode documentation.
    Proxy,

    /// Static mode.
    Static,

    /// Managed mode (the default, see `Relay::default`).
    Managed,

    /// Capture mode.
    Capture,
}
349
350impl fmt::Display for RelayMode {
351 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352 match self {
353 RelayMode::Proxy => write!(f, "proxy"),
354 RelayMode::Static => write!(f, "static"),
355 RelayMode::Managed => write!(f, "managed"),
356 RelayMode::Capture => write!(f, "capture"),
357 }
358 }
359}
360
/// The instance type of a relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// A regular relay instance.
    Default,

    /// A canary relay instance.
    Canary,
}
371
372impl RelayInstance {
373 pub fn is_canary(&self) -> bool {
375 matches!(self, RelayInstance::Canary)
376 }
377}
378
379impl fmt::Display for RelayInstance {
380 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381 match self {
382 RelayInstance::Default => write!(f, "default"),
383 RelayInstance::Canary => write!(f, "canary"),
384 }
385 }
386}
387
388impl FromStr for RelayInstance {
389 type Err = fmt::Error;
390
391 fn from_str(s: &str) -> Result<Self, Self::Err> {
392 match s {
393 "canary" => Ok(RelayInstance::Canary),
394 _ => Ok(RelayInstance::Default),
395 }
396 }
397}
398
/// Error returned when parsing an invalid [`RelayMode`] string.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;
402
403impl fmt::Display for ParseRelayModeError {
404 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405 write!(
406 f,
407 "Relay mode must be one of: managed, static, proxy, capture"
408 )
409 }
410}
411
// Marker impl: `Debug` + `Display` above make this a full `std::error::Error`.
impl Error for ParseRelayModeError {}
413
414impl FromStr for RelayMode {
415 type Err = ParseRelayModeError;
416
417 fn from_str(s: &str) -> Result<Self, Self::Err> {
418 match s {
419 "proxy" => Ok(RelayMode::Proxy),
420 "static" => Ok(RelayMode::Static),
421 "managed" => Ok(RelayMode::Managed),
422 "capture" => Ok(RelayMode::Capture),
423 _ => Err(ParseRelayModeError),
424 }
425 }
426}
427
/// Returns `true` if `t` equals its type's `Default` value. Used by serde
/// `skip_serializing_if` attributes to omit default-valued fields.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    T::default() == *t
}
432
/// Best-effort detection of whether the process runs inside a Docker
/// container: either `/.dockerenv` exists or the cgroup file mentions docker.
fn is_docker() -> bool {
    // `Path::exists` is equivalent to `fs::metadata(..).is_ok()`.
    Path::new("/.dockerenv").exists()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
441
442fn default_host() -> IpAddr {
444 if is_docker() {
445 "0.0.0.0".parse().unwrap()
447 } else {
448 "127.0.0.1".parse().unwrap()
449 }
450}
451
/// Condition for reporting readiness — presumably gates the health/readiness
/// probe; confirm at the consumer of `AuthConfig::ready`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReadinessCondition {
    /// Ready only once authenticated (the default, see `Default` below).
    Authenticated,
    /// Always report ready.
    Always,
}
470
471impl Default for ReadinessCondition {
472 fn default() -> Self {
473 Self::Authenticated
474 }
475}
476
/// Relay specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay (default: managed).
    pub mode: RelayMode,
    /// The instance type of this relay (default: default).
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance (default: `https://sentry.io/`).
    pub upstream: UpstreamDescriptor<'static>,
    /// The IP address to listen on (default: loopback, or 0.0.0.0 in Docker).
    pub host: IpAddr,
    /// The port to listen on (default: 3000).
    pub port: u16,
    /// Optional port for TLS — never written back when serializing.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// Path to the TLS identity file — never written back when serializing.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the TLS identity — never written back when serializing.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Allows overriding project ids (default: false) — NOTE(review): exact
    /// semantics are not visible in this chunk; confirm at the usage site.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
507
508impl Default for Relay {
509 fn default() -> Self {
510 Relay {
511 mode: RelayMode::Managed,
512 instance: RelayInstance::Default,
513 upstream: "https://sentry.io/".parse().unwrap(),
514 host: default_host(),
515 port: 3000,
516 tls_port: None,
517 tls_identity_path: None,
518 tls_identity_password: None,
519 override_project_ids: false,
520 }
521 }
522}
523
/// Controls interal reporting of statsd metrics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// The statsd server address — presumably `host:port`; disabled when `None`.
    pub statsd: Option<String>,
    /// Prefix applied to metric names (default: `"sentry.relay"`).
    pub prefix: String,
    /// Tags added to every emitted metric.
    pub default_tags: BTreeMap<String, String>,
    /// If set, the tag name under which the hostname is reported.
    pub hostname_tag: Option<String>,
    /// Metric sample rate (default: 1.0).
    pub sample_rate: f32,
    /// Interval for periodic metrics, presumably in seconds (default: 5).
    pub periodic_secs: u64,
    /// Whether to aggregate metrics before emitting (default: true).
    pub aggregate: bool,
}
555
556impl Default for Metrics {
557 fn default() -> Self {
558 Metrics {
559 statsd: None,
560 prefix: "sentry.relay".into(),
561 default_tags: BTreeMap::new(),
562 hostname_tag: None,
563 sample_rate: 1.0,
564 periodic_secs: 5,
565 aggregate: true,
566 }
567 }
568}
569
/// Sentry-internal metrics options.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default)]
pub struct SentryMetrics {
    /// Whether metric stats are enabled (default: false).
    pub metric_stats_enabled: bool,
}
586
/// Controls various size and concurrency limits.
///
/// Defaults noted per field come from the `Default` impl below; time fields
/// are presumably seconds — confirm at their consumers.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// Maximum number of concurrent requests (default: 100).
    pub max_concurrent_requests: usize,
    /// Maximum number of concurrent queries (default: 5).
    pub max_concurrent_queries: usize,
    /// Maximum size of an event payload (default: 1 MiB).
    pub max_event_size: ByteSize,
    /// Maximum size of a single attachment (default: 100 MiB).
    pub max_attachment_size: ByteSize,
    /// Maximum size for attachments, presumably combined (default: 100 MiB).
    pub max_attachments_size: ByteSize,
    /// Maximum size of a client report payload (default: 4 KiB).
    pub max_client_reports_size: ByteSize,
    /// Maximum size of a check-in payload (default: 100 KiB).
    pub max_check_in_size: ByteSize,
    /// Maximum size of an envelope (default: 100 MiB).
    pub max_envelope_size: ByteSize,
    /// Maximum session count (default: 100).
    pub max_session_count: usize,
    /// Maximum size of an API payload (default: 20 MiB).
    pub max_api_payload_size: ByteSize,
    /// Maximum size of an API file upload (default: 40 MiB).
    pub max_api_file_upload_size: ByteSize,
    /// Maximum size of an API chunk upload (default: 100 MiB).
    pub max_api_chunk_upload_size: ByteSize,
    /// Maximum size of a profile payload (default: 50 MiB).
    pub max_profile_size: ByteSize,
    /// Maximum size of a log payload (default: 1 MiB).
    pub max_log_size: ByteSize,
    /// Maximum size of a span payload (default: 1 MiB).
    pub max_span_size: ByteSize,
    /// Maximum size of a statsd payload (default: 1 MiB).
    pub max_statsd_size: ByteSize,
    /// Maximum size of metric buckets (default: 1 MiB).
    pub max_metric_buckets_size: ByteSize,
    /// Maximum compressed replay size (default: 10 MiB).
    pub max_replay_compressed_size: ByteSize,
    /// Maximum uncompressed replay size (default: 100 MiB); also accepted
    /// under the legacy key `max_replay_size`.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// Maximum replay message size (default: 15 MiB).
    pub max_replay_message_size: ByteSize,
    /// Maximum number of threads (default: number of logical CPUs).
    pub max_thread_count: usize,
    /// Maximum pool concurrency (default: 1).
    pub max_pool_concurrency: usize,
    /// Query timeout (default: 30).
    pub query_timeout: u64,
    /// Shutdown timeout (default: 10).
    pub shutdown_timeout: u64,
    /// HTTP keep-alive timeout (default: 5).
    pub keepalive_timeout: u64,
    /// Optional idle timeout; unlimited when `None`.
    pub idle_timeout: Option<u64>,
    /// Optional cap on connections; unlimited when `None`.
    pub max_connections: Option<usize>,
    /// TCP listen backlog (default: 1024).
    pub tcp_listen_backlog: u32,
}
680
681impl Default for Limits {
682 fn default() -> Self {
683 Limits {
684 max_concurrent_requests: 100,
685 max_concurrent_queries: 5,
686 max_event_size: ByteSize::mebibytes(1),
687 max_attachment_size: ByteSize::mebibytes(100),
688 max_attachments_size: ByteSize::mebibytes(100),
689 max_client_reports_size: ByteSize::kibibytes(4),
690 max_check_in_size: ByteSize::kibibytes(100),
691 max_envelope_size: ByteSize::mebibytes(100),
692 max_session_count: 100,
693 max_api_payload_size: ByteSize::mebibytes(20),
694 max_api_file_upload_size: ByteSize::mebibytes(40),
695 max_api_chunk_upload_size: ByteSize::mebibytes(100),
696 max_profile_size: ByteSize::mebibytes(50),
697 max_log_size: ByteSize::mebibytes(1),
698 max_span_size: ByteSize::mebibytes(1),
699 max_statsd_size: ByteSize::mebibytes(1),
700 max_metric_buckets_size: ByteSize::mebibytes(1),
701 max_replay_compressed_size: ByteSize::mebibytes(10),
702 max_replay_uncompressed_size: ByteSize::mebibytes(100),
703 max_replay_message_size: ByteSize::mebibytes(15),
704 max_thread_count: num_cpus::get(),
705 max_pool_concurrency: 1,
706 query_timeout: 30,
707 shutdown_timeout: 10,
708 keepalive_timeout: 5,
709 idle_timeout: None,
710 max_connections: None,
711 tcp_listen_backlog: 1024,
712 }
713 }
714}
715
/// Routing options.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Whether to accept envelope items of unknown type; `None` presumably
    /// selects a mode-dependent default — confirm at the usage site.
    pub accept_unknown_items: Option<bool>,
}
731
/// HTTP content encoding, serialized in lowercase.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// No encoding (the default).
    #[default]
    Identity,
    /// DEFLATE compression.
    Deflate,
    /// GZIP compression (also parsed from `x-gzip`).
    Gzip,
    /// Brotli compression.
    Br,
    /// Zstandard compression.
    Zstd,
}
760
761impl HttpEncoding {
762 pub fn parse(str: &str) -> Self {
764 let str = str.trim();
765 if str.eq_ignore_ascii_case("zstd") {
766 Self::Zstd
767 } else if str.eq_ignore_ascii_case("br") {
768 Self::Br
769 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
770 Self::Gzip
771 } else if str.eq_ignore_ascii_case("deflate") {
772 Self::Deflate
773 } else {
774 Self::Identity
775 }
776 }
777
778 pub fn name(&self) -> Option<&'static str> {
782 match self {
783 Self::Identity => None,
784 Self::Deflate => Some("deflate"),
785 Self::Gzip => Some("gzip"),
786 Self::Br => Some("br"),
787 Self::Zstd => Some("zstd"),
788 }
789 }
790}
791
/// Controls the HTTP client and upstream communication.
///
/// Defaults per the `Default` impl below; time fields are presumably seconds
/// — confirm at their consumers.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Request timeout (default: 5).
    pub timeout: u32,
    /// Connection establishment timeout (default: 3).
    pub connection_timeout: u32,
    /// Maximum interval between retries (default: 60).
    pub max_retry_interval: u32,
    /// Optional `Host` header override.
    pub host_header: Option<String>,
    /// Re-authentication interval (default: `Some(600)`).
    pub auth_interval: Option<u64>,
    /// Grace period before a network outage is acted upon (default: 10).
    pub outage_grace_period: u64,
    /// Delay before retrying (default: 1).
    pub retry_delay: u64,
    /// Interval after which a project is considered failed (default: 90).
    pub project_failure_interval: u64,
    /// Content encoding for upstream requests (default: zstd).
    pub encoding: HttpEncoding,
    /// Whether global metrics are enabled (default: false).
    pub global_metrics: bool,
}
858
859impl Default for Http {
860 fn default() -> Self {
861 Http {
862 timeout: 5,
863 connection_timeout: 3,
864 max_retry_interval: 60, host_header: None,
866 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
868 retry_delay: default_retry_delay(),
869 project_failure_interval: default_project_failure_interval(),
870 encoding: HttpEncoding::Zstd,
871 global_metrics: false,
872 }
873 }
874}
875
/// Default for `Http::retry_delay`.
fn default_retry_delay() -> u64 {
    1
}

/// Default for `Http::project_failure_interval`.
fn default_project_failure_interval() -> u64 {
    90
}
885
/// Default for `EnvelopeSpool::max_disk_size`: 500 MiB.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

/// Default for `EnvelopeSpool::batch_size_bytes`: 10 KiB.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

/// Default for `EnvelopeSpool::max_envelope_delay_secs`: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

/// Default for `EnvelopeSpool::disk_usage_refresh_frequency_ms`: 100 ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

/// Default for `EnvelopeSpool::max_backpressure_envelopes`: 500.
fn spool_max_backpressure_envelopes() -> usize {
    500
}

/// Default for `EnvelopeSpool::max_backpressure_memory_percent`: 90%.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}

/// Default for `EnvelopeSpool::partitions`: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}
919
/// Configuration for the envelope spool (on-disk buffering of envelopes).
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// Path to the spool file; spooling to disk is presumably disabled when
    /// `None` — confirm at the consumer.
    pub path: Option<PathBuf>,
    /// Maximum on-disk spool size (default: 500 MiB).
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of a spool read/write batch (default: 10 KiB).
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time an envelope may spend in the spool (default: 24 h).
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// How often disk usage is refreshed, in milliseconds (default: 100).
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// Envelope count threshold for backpressure (default: 500).
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    /// Memory usage fraction threshold for backpressure (default: 0.9).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of spool partitions (default: 1).
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1003
1004impl Default for EnvelopeSpool {
1005 fn default() -> Self {
1006 Self {
1007 path: None,
1008 max_disk_size: spool_envelopes_max_disk_size(),
1009 batch_size_bytes: spool_envelopes_batch_size_bytes(),
1010 max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1011 disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1012 max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1013 max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1014 partitions: spool_envelopes_partitions(),
1015 }
1016 }
1017}
1018
/// Persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1026
/// Controls internal caching behavior.
///
/// Defaults per the `Default` impl below; time fields appear to mix seconds
/// and milliseconds — confirm units at each consumer.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// Whether to request the full project config (default: false).
    pub project_request_full_config: bool,
    /// Project cache expiry (default: 300).
    pub project_expiry: u32,
    /// Grace period after project expiry (default: 120).
    pub project_grace_period: u32,
    /// Relay info cache expiry (default: 3600).
    pub relay_expiry: u32,
    /// Envelope cache expiry (default: 600); legacy key `event_expiry`.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// Envelope buffer size (default: 1000); legacy key `event_buffer_size`.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// Cache-miss expiry (default: 60).
    pub miss_expiry: u32,
    /// Batching interval (default: 100).
    pub batch_interval: u32,
    /// Batching interval for downstream relays (default: 100).
    pub downstream_relays_batch_interval: u32,
    /// Maximum batch size (default: 500).
    pub batch_size: usize,
    /// File watch/poll interval (default: 10).
    pub file_interval: u32,
    /// Global config fetch interval (default: 10).
    pub global_config_fetch_interval: u32,
}
1067
1068impl Default for Cache {
1069 fn default() -> Self {
1070 Cache {
1071 project_request_full_config: false,
1072 project_expiry: 300, project_grace_period: 120, relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1077 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1081 file_interval: 10, global_config_fetch_interval: 10, }
1084 }
1085}
1086
/// Default for `Processing::max_secs_in_future`.
fn default_max_secs_in_future() -> u32 {
    60 // 1 minute
}

/// Default for `Processing::max_session_secs_in_past`.
fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600 // 5 days
}

/// Default for `Processing::attachment_chunk_size`: 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}

/// Default for `Processing::projectconfig_cache_prefix`.
fn default_projectconfig_cache_prefix() -> String {
    "relayconfig".to_owned()
}

/// Default for `Processing::max_rate_limit`.
// Wrapping in `Some` is intentional: the serde default is a present limit.
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300) // 5 minutes
}
1107
/// Controls processing-mode features (Kafka, Redis, GeoIP).
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// Whether processing mode is enabled. Required field (no serde default).
    pub enabled: bool,
    /// Optional path to the GeoIP database.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum allowed timestamp drift into the future (default: 60).
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum allowed session age (default: 5 days).
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configuration parameters.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional named Kafka configurations.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic assignments.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate topics against the broker (default: false).
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis configuration; required for some processing features —
    /// NOTE(review): requirement not visible in this chunk, confirm.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Chunk size for attachment uploads (default: 1 MiB).
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix for project config cache keys (default: `"relayconfig"`).
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients (default: `Some(300)`).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}
1164
1165impl Default for Processing {
1166 fn default() -> Self {
1168 Self {
1169 enabled: false,
1170 geoip_path: None,
1171 max_secs_in_future: default_max_secs_in_future(),
1172 max_session_secs_in_past: default_max_session_secs_in_past(),
1173 kafka_config: Vec::new(),
1174 secondary_kafka_configs: BTreeMap::new(),
1175 topics: TopicAssignments::default(),
1176 kafka_validate_topics: false,
1177 redis: None,
1178 attachment_chunk_size: default_chunk_size(),
1179 projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1180 max_rate_limit: default_max_rate_limit(),
1181 }
1182 }
1183}
1184
/// Configuration for event normalization in this relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization applied (default: `default`).
    #[serde(default)]
    pub level: NormalizationLevel,
}
1193
/// Which normalization steps this relay performs, serialized in lowercase.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// The default normalization level.
    #[default]
    Default,
    /// Full normalization.
    Full,
}
1209
/// Configuration for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Bucket interval, presumably seconds (default: 60).
    pub bucket_interval: u64,
    /// Flush interval, presumably seconds (default: 120).
    pub flush_interval: u64,
}
1219
1220impl Default for OutcomeAggregatorConfig {
1221 fn default() -> Self {
1222 Self {
1223 bucket_interval: 60,
1224 flush_interval: 120,
1225 }
1226 }
1227}
1228
/// Controls whether outcomes are emitted. Serialized as `false`, the string
/// `"as_client_reports"`, or `true` respectively (see the manual serde impls
/// below).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit outcomes as outcomes.
    AsOutcomes,
}
1241
1242impl EmitOutcomes {
1243 pub fn any(&self) -> bool {
1245 !matches!(self, EmitOutcomes::None)
1246 }
1247}
1248
1249impl Serialize for EmitOutcomes {
1250 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1251 where
1252 S: Serializer,
1253 {
1254 match self {
1256 Self::None => serializer.serialize_bool(false),
1257 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1258 Self::AsOutcomes => serializer.serialize_bool(true),
1259 }
1260 }
1261}
1262
/// Serde visitor accepting `true`, `false`, or the string `"as_client_reports"`.
struct EmitOutcomesVisitor;
1264
1265impl Visitor<'_> for EmitOutcomesVisitor {
1266 type Value = EmitOutcomes;
1267
1268 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1269 formatter.write_str("true, false, or 'as_client_reports'")
1270 }
1271
1272 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1273 where
1274 E: serde::de::Error,
1275 {
1276 Ok(if v {
1277 EmitOutcomes::AsOutcomes
1278 } else {
1279 EmitOutcomes::None
1280 })
1281 }
1282
1283 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1284 where
1285 E: serde::de::Error,
1286 {
1287 if v == "as_client_reports" {
1288 Ok(EmitOutcomes::AsClientReports)
1289 } else {
1290 Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1291 }
1292 }
1293}
1294
1295impl<'de> Deserialize<'de> for EmitOutcomes {
1296 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1297 where
1298 D: Deserializer<'de>,
1299 {
1300 deserializer.deserialize_any(EmitOutcomesVisitor)
1301 }
1302}
1303
/// Outcome emission configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// How outcomes are emitted (default: as client reports).
    pub emit_outcomes: EmitOutcomes,
    /// Whether client outcomes are emitted (default: true).
    pub emit_client_outcomes: bool,
    /// Maximum outcome batch size (default: 1000).
    pub batch_size: usize,
    /// Batch interval, presumably milliseconds (default: 500) — confirm.
    pub batch_interval: u64,
    /// Optional source tag attached to emitted outcomes.
    pub source: Option<String>,
    /// Aggregator settings for outcomes.
    pub aggregator: OutcomeAggregatorConfig,
}
1326
1327impl Default for Outcomes {
1328 fn default() -> Self {
1329 Outcomes {
1330 emit_outcomes: EmitOutcomes::AsClientReports,
1331 emit_client_outcomes: true,
1332 batch_size: 1000,
1333 batch_interval: 500,
1334 source: None,
1335 aggregator: OutcomeAggregatorConfig::default(),
1336 }
1337 }
1338}
1339
/// The minimal config needed to bootstrap a relay: just the `relay` section.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1346
1347impl MinimalConfig {
1348 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1350 let path = p.as_ref();
1351 if fs::metadata(path).is_err() {
1352 fs::create_dir_all(path)
1353 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1354 }
1355 self.save(path)
1356 }
1357}
1358
1359impl ConfigObject for MinimalConfig {
1360 fn format() -> ConfigFormat {
1361 ConfigFormat::Yaml
1362 }
1363
1364 fn name() -> &'static str {
1365 "config"
1366 }
1367}
1368
1369mod config_relay_info {
1371 use serde::ser::SerializeMap;
1372
1373 use super::*;
1374
1375 #[derive(Debug, Serialize, Deserialize, Clone)]
1377 struct RelayInfoConfig {
1378 public_key: PublicKey,
1379 #[serde(default)]
1380 internal: bool,
1381 }
1382
1383 impl From<RelayInfoConfig> for RelayInfo {
1384 fn from(v: RelayInfoConfig) -> Self {
1385 RelayInfo {
1386 public_key: v.public_key,
1387 internal: v.internal,
1388 }
1389 }
1390 }
1391
1392 impl From<RelayInfo> for RelayInfoConfig {
1393 fn from(v: RelayInfo) -> Self {
1394 RelayInfoConfig {
1395 public_key: v.public_key,
1396 internal: v.internal,
1397 }
1398 }
1399 }
1400
1401 pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1402 where
1403 D: Deserializer<'de>,
1404 {
1405 let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1406 Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1407 }
1408
1409 pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1410 where
1411 S: Serializer,
1412 {
1413 let mut map = ser.serialize_map(Some(elm.len()))?;
1414
1415 for (k, v) in elm {
1416 map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1417 }
1418
1419 map.end()
1420 }
1421}
1422
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls when this relay reports as ready; omitted from output when
    /// it is the default.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically configured relays, keyed by relay id.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,
}
1434
/// GeoIP configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// Path to the GeoIP database file; disabled when `None` — TODO confirm
    /// semantics at the consumer.
    pub path: Option<PathBuf>,
}
1441
/// Cardinality limiter configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Interval at which the in-memory cache is vacuumed, presumably seconds
    /// (default: 180).
    pub cache_vacuum_interval: u64,
}
1453
1454impl Default for CardinalityLimiter {
1455 fn default() -> Self {
1456 Self {
1457 cache_vacuum_interval: 180,
1458 }
1459 }
1460}
1461
/// Health check configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Refresh interval in milliseconds (default: 3000).
    pub refresh_interval_ms: u64,
    /// Optional absolute memory threshold; unused when `None`.
    pub max_memory_bytes: Option<ByteSize>,
    /// Relative memory threshold as a fraction (default: 0.95).
    pub max_memory_percent: f32,
    /// Probe timeout in milliseconds (default: 900).
    pub probe_timeout_ms: u64,
    /// Memory statistics refresh frequency in milliseconds (default: 100).
    pub memory_stat_refresh_frequency_ms: u64,
}
1499
1500impl Default for Health {
1501 fn default() -> Self {
1502 Self {
1503 refresh_interval_ms: 3000,
1504 max_memory_bytes: None,
1505 max_memory_percent: 0.95,
1506 probe_timeout_ms: 900,
1507 memory_stat_refresh_frequency_ms: 100,
1508 }
1509 }
1510}
1511
/// COGS (cost of goods sold) measurement configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum COGS queue size (default: 10 000).
    pub max_queue_size: u64,
    /// Resource id this relay reports under (default: `"relay_service"`).
    pub relay_resource_id: String,
}
1529
1530impl Default for Cogs {
1531 fn default() -> Self {
1532 Self {
1533 max_queue_size: 10_000,
1534 relay_resource_id: "relay_service".to_owned(),
1535 }
1536 }
1537}
1538
/// All values persisted in the `config.yml` file; each section falls back to
/// its `Default` when absent.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    /// Core relay settings (mode, upstream, listen address).
    #[serde(default)]
    relay: Relay,
    /// HTTP client / upstream communication settings.
    #[serde(default)]
    http: Http,
    /// Internal caching settings.
    #[serde(default)]
    cache: Cache,
    /// Persistent buffering settings.
    #[serde(default)]
    spool: Spool,
    /// Size and concurrency limits.
    #[serde(default)]
    limits: Limits,
    /// Logging settings.
    #[serde(default)]
    logging: relay_log::LogConfig,
    /// Routing settings.
    #[serde(default)]
    routing: Routing,
    /// Statsd metrics settings.
    #[serde(default)]
    metrics: Metrics,
    /// Sentry-internal metrics settings.
    #[serde(default)]
    sentry_metrics: SentryMetrics,
    /// Internal Sentry (error reporting) settings.
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    /// Processing-mode settings (Kafka, Redis).
    #[serde(default)]
    processing: Processing,
    /// Outcome emission settings.
    #[serde(default)]
    outcomes: Outcomes,
    /// Primary metrics aggregator settings.
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    /// Additional scoped aggregators.
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    /// Authentication / readiness settings.
    #[serde(default)]
    auth: AuthConfig,
    /// GeoIP settings.
    #[serde(default)]
    geoip: GeoIpConfig,
    /// Event normalization settings.
    #[serde(default)]
    normalization: Normalization,
    /// Cardinality limiter settings.
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    /// Health check settings.
    #[serde(default)]
    health: Health,
    /// COGS measurement settings.
    #[serde(default)]
    cogs: Cogs,
}
1582
1583impl ConfigObject for ConfigValues {
1584 fn format() -> ConfigFormat {
1585 ConfigFormat::Yaml
1586 }
1587
1588 fn name() -> &'static str {
1589 "config"
1590 }
1591}
1592
/// Config struct.
pub struct Config {
    // Parsed values from `config.yml`.
    values: ConfigValues,
    // Credentials from `credentials.json`, if the file exists.
    credentials: Option<Credentials>,
    // Base directory the config was loaded from; empty for configs built
    // from an in-memory JSON value (`from_json_value`).
    path: PathBuf,
}
1599
1600impl fmt::Debug for Config {
1601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1602 f.debug_struct("Config")
1603 .field("path", &self.path)
1604 .field("values", &self.values)
1605 .finish()
1606 }
1607}
1608
1609impl Config {
1610 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1612 let path = env::current_dir()
1613 .map(|x| x.join(path.as_ref()))
1614 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1615
1616 let config = Config {
1617 values: ConfigValues::load(&path)?,
1618 credentials: if Credentials::path(&path).exists() {
1619 Some(Credentials::load(&path)?)
1620 } else {
1621 None
1622 },
1623 path: path.clone(),
1624 };
1625
1626 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1627 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1628 }
1629
1630 Ok(config)
1631 }
1632
1633 pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1637 Ok(Config {
1638 values: serde_json::from_value(value)
1639 .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1640 credentials: None,
1641 path: PathBuf::new(),
1642 })
1643 }
1644
1645 pub fn apply_override(
1648 &mut self,
1649 mut overrides: OverridableConfig,
1650 ) -> anyhow::Result<&mut Self> {
1651 let relay = &mut self.values.relay;
1652
1653 if let Some(mode) = overrides.mode {
1654 relay.mode = mode
1655 .parse::<RelayMode>()
1656 .with_context(|| ConfigError::field("mode"))?;
1657 }
1658
1659 if let Some(deployment) = overrides.instance {
1660 relay.instance = deployment
1661 .parse::<RelayInstance>()
1662 .with_context(|| ConfigError::field("deployment"))?;
1663 }
1664
1665 if let Some(log_level) = overrides.log_level {
1666 self.values.logging.level = log_level.parse()?;
1667 }
1668
1669 if let Some(log_format) = overrides.log_format {
1670 self.values.logging.format = log_format.parse()?;
1671 }
1672
1673 if let Some(upstream) = overrides.upstream {
1674 relay.upstream = upstream
1675 .parse::<UpstreamDescriptor>()
1676 .with_context(|| ConfigError::field("upstream"))?;
1677 } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1678 relay.upstream = upstream_dsn
1679 .parse::<Dsn>()
1680 .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1681 .with_context(|| ConfigError::field("upstream_dsn"))?;
1682 }
1683
1684 if let Some(host) = overrides.host {
1685 relay.host = host
1686 .parse::<IpAddr>()
1687 .with_context(|| ConfigError::field("host"))?;
1688 }
1689
1690 if let Some(port) = overrides.port {
1691 relay.port = port
1692 .as_str()
1693 .parse()
1694 .with_context(|| ConfigError::field("port"))?;
1695 }
1696
1697 let processing = &mut self.values.processing;
1698 if let Some(enabled) = overrides.processing {
1699 match enabled.to_lowercase().as_str() {
1700 "true" | "1" => processing.enabled = true,
1701 "false" | "0" | "" => processing.enabled = false,
1702 _ => return Err(ConfigError::field("processing").into()),
1703 }
1704 }
1705
1706 if let Some(redis) = overrides.redis_url {
1707 processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1708 }
1709
1710 if let Some(kafka_url) = overrides.kafka_url {
1711 let existing = processing
1712 .kafka_config
1713 .iter_mut()
1714 .find(|e| e.name == "bootstrap.servers");
1715
1716 if let Some(config_param) = existing {
1717 config_param.value = kafka_url;
1718 } else {
1719 processing.kafka_config.push(KafkaConfigParam {
1720 name: "bootstrap.servers".to_owned(),
1721 value: kafka_url,
1722 })
1723 }
1724 }
1725 let id = if let Some(id) = overrides.id {
1727 let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1728 Some(id)
1729 } else {
1730 None
1731 };
1732 let public_key = if let Some(public_key) = overrides.public_key {
1733 let public_key = public_key
1734 .parse::<PublicKey>()
1735 .with_context(|| ConfigError::field("public_key"))?;
1736 Some(public_key)
1737 } else {
1738 None
1739 };
1740
1741 let secret_key = if let Some(secret_key) = overrides.secret_key {
1742 let secret_key = secret_key
1743 .parse::<SecretKey>()
1744 .with_context(|| ConfigError::field("secret_key"))?;
1745 Some(secret_key)
1746 } else {
1747 None
1748 };
1749 let outcomes = &mut self.values.outcomes;
1750 if overrides.outcome_source.is_some() {
1751 outcomes.source = overrides.outcome_source.take();
1752 }
1753
1754 if let Some(credentials) = &mut self.credentials {
1755 if let Some(id) = id {
1757 credentials.id = id;
1758 }
1759 if let Some(public_key) = public_key {
1760 credentials.public_key = public_key;
1761 }
1762 if let Some(secret_key) = secret_key {
1763 credentials.secret_key = secret_key
1764 }
1765 } else {
1766 match (id, public_key, secret_key) {
1768 (Some(id), Some(public_key), Some(secret_key)) => {
1769 self.credentials = Some(Credentials {
1770 secret_key,
1771 public_key,
1772 id,
1773 })
1774 }
1775 (None, None, None) => {
1776 }
1779 _ => {
1780 return Err(ConfigError::field("incomplete credentials").into());
1781 }
1782 }
1783 }
1784
1785 let limits = &mut self.values.limits;
1786 if let Some(shutdown_timeout) = overrides.shutdown_timeout {
1787 if let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>() {
1788 limits.shutdown_timeout = shutdown_timeout;
1789 }
1790 }
1791
1792 if let Some(server_name) = overrides.server_name {
1793 self.values.sentry.server_name = Some(server_name.into());
1794 }
1795
1796 Ok(self)
1797 }
1798
1799 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1801 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1802 }
1803
1804 pub fn path(&self) -> &Path {
1806 &self.path
1807 }
1808
1809 pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1811 serde_yaml::to_string(&self.values)
1812 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1813 }
1814
1815 pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1819 let creds = Credentials::generate();
1820 if save {
1821 creds.save(&self.path)?;
1822 }
1823 self.credentials = Some(creds);
1824 Ok(())
1825 }
1826
1827 pub fn credentials(&self) -> Option<&Credentials> {
1829 self.credentials.as_ref()
1830 }
1831
1832 pub fn replace_credentials(
1836 &mut self,
1837 credentials: Option<Credentials>,
1838 ) -> anyhow::Result<bool> {
1839 if self.credentials == credentials {
1840 return Ok(false);
1841 }
1842
1843 match credentials {
1844 Some(ref creds) => {
1845 creds.save(&self.path)?;
1846 }
1847 None => {
1848 let path = Credentials::path(&self.path);
1849 if fs::metadata(&path).is_ok() {
1850 fs::remove_file(&path).with_context(|| {
1851 ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1852 })?;
1853 }
1854 }
1855 }
1856
1857 self.credentials = credentials;
1858 Ok(true)
1859 }
1860
1861 pub fn has_credentials(&self) -> bool {
1863 self.credentials.is_some()
1864 }
1865
1866 pub fn secret_key(&self) -> Option<&SecretKey> {
1868 self.credentials.as_ref().map(|x| &x.secret_key)
1869 }
1870
1871 pub fn public_key(&self) -> Option<&PublicKey> {
1873 self.credentials.as_ref().map(|x| &x.public_key)
1874 }
1875
1876 pub fn relay_id(&self) -> Option<&RelayId> {
1878 self.credentials.as_ref().map(|x| &x.id)
1879 }
1880
1881 pub fn relay_mode(&self) -> RelayMode {
1883 self.values.relay.mode
1884 }
1885
1886 pub fn relay_instance(&self) -> RelayInstance {
1888 self.values.relay.instance
1889 }
1890
1891 pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1893 &self.values.relay.upstream
1894 }
1895
1896 pub fn http_host_header(&self) -> Option<&str> {
1898 self.values.http.host_header.as_deref()
1899 }
1900
1901 pub fn listen_addr(&self) -> SocketAddr {
1903 (self.values.relay.host, self.values.relay.port).into()
1904 }
1905
1906 pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1908 if self.values.relay.tls_identity_path.is_some() {
1909 let port = self.values.relay.tls_port.unwrap_or(3443);
1910 Some((self.values.relay.host, port).into())
1911 } else {
1912 None
1913 }
1914 }
1915
1916 pub fn tls_identity_path(&self) -> Option<&Path> {
1918 self.values.relay.tls_identity_path.as_deref()
1919 }
1920
1921 pub fn tls_identity_password(&self) -> Option<&str> {
1923 self.values.relay.tls_identity_password.as_deref()
1924 }
1925
1926 pub fn override_project_ids(&self) -> bool {
1930 self.values.relay.override_project_ids
1931 }
1932
1933 pub fn requires_auth(&self) -> bool {
1937 match self.values.auth.ready {
1938 ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
1939 ReadinessCondition::Always => false,
1940 }
1941 }
1942
1943 pub fn http_auth_interval(&self) -> Option<Duration> {
1947 if self.processing_enabled() {
1948 return None;
1949 }
1950
1951 match self.values.http.auth_interval {
1952 None | Some(0) => None,
1953 Some(secs) => Some(Duration::from_secs(secs)),
1954 }
1955 }
1956
1957 pub fn http_outage_grace_period(&self) -> Duration {
1960 Duration::from_secs(self.values.http.outage_grace_period)
1961 }
1962
1963 pub fn http_retry_delay(&self) -> Duration {
1968 Duration::from_secs(self.values.http.retry_delay)
1969 }
1970
1971 pub fn http_project_failure_interval(&self) -> Duration {
1973 Duration::from_secs(self.values.http.project_failure_interval)
1974 }
1975
1976 pub fn http_encoding(&self) -> HttpEncoding {
1978 self.values.http.encoding
1979 }
1980
1981 pub fn http_global_metrics(&self) -> bool {
1983 self.values.http.global_metrics
1984 }
1985
1986 pub fn emit_outcomes(&self) -> EmitOutcomes {
1991 if self.processing_enabled() {
1992 return EmitOutcomes::AsOutcomes;
1993 }
1994 self.values.outcomes.emit_outcomes
1995 }
1996
1997 pub fn emit_client_outcomes(&self) -> bool {
2007 self.values.outcomes.emit_client_outcomes
2008 }
2009
2010 pub fn outcome_batch_size(&self) -> usize {
2012 self.values.outcomes.batch_size
2013 }
2014
2015 pub fn outcome_batch_interval(&self) -> Duration {
2017 Duration::from_millis(self.values.outcomes.batch_interval)
2018 }
2019
2020 pub fn outcome_source(&self) -> Option<&str> {
2022 self.values.outcomes.source.as_deref()
2023 }
2024
2025 pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2027 &self.values.outcomes.aggregator
2028 }
2029
2030 pub fn logging(&self) -> &relay_log::LogConfig {
2032 &self.values.logging
2033 }
2034
2035 pub fn sentry(&self) -> &relay_log::SentryConfig {
2037 &self.values.sentry
2038 }
2039
2040 pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2044 if let Some(ref addr) = self.values.metrics.statsd {
2045 let addrs = addr
2046 .as_str()
2047 .to_socket_addrs()
2048 .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2049 .collect();
2050 Ok(addrs)
2051 } else {
2052 Ok(vec![])
2053 }
2054 }
2055
2056 pub fn metrics_prefix(&self) -> &str {
2058 &self.values.metrics.prefix
2059 }
2060
2061 pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2063 &self.values.metrics.default_tags
2064 }
2065
2066 pub fn metrics_hostname_tag(&self) -> Option<&str> {
2068 self.values.metrics.hostname_tag.as_deref()
2069 }
2070
2071 pub fn metrics_sample_rate(&self) -> f32 {
2073 self.values.metrics.sample_rate
2074 }
2075
2076 pub fn metrics_aggregate(&self) -> bool {
2078 self.values.metrics.aggregate
2079 }
2080
2081 pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2085 match self.values.metrics.periodic_secs {
2086 0 => None,
2087 secs => Some(Duration::from_secs(secs)),
2088 }
2089 }
2090
2091 pub fn http_timeout(&self) -> Duration {
2093 Duration::from_secs(self.values.http.timeout.into())
2094 }
2095
2096 pub fn http_connection_timeout(&self) -> Duration {
2098 Duration::from_secs(self.values.http.connection_timeout.into())
2099 }
2100
2101 pub fn http_max_retry_interval(&self) -> Duration {
2103 Duration::from_secs(self.values.http.max_retry_interval.into())
2104 }
2105
2106 pub fn project_cache_expiry(&self) -> Duration {
2108 Duration::from_secs(self.values.cache.project_expiry.into())
2109 }
2110
2111 pub fn request_full_project_config(&self) -> bool {
2113 self.values.cache.project_request_full_config
2114 }
2115
2116 pub fn relay_cache_expiry(&self) -> Duration {
2118 Duration::from_secs(self.values.cache.relay_expiry.into())
2119 }
2120
2121 pub fn envelope_buffer_size(&self) -> usize {
2123 self.values
2124 .cache
2125 .envelope_buffer_size
2126 .try_into()
2127 .unwrap_or(usize::MAX)
2128 }
2129
2130 pub fn cache_miss_expiry(&self) -> Duration {
2132 Duration::from_secs(self.values.cache.miss_expiry.into())
2133 }
2134
2135 pub fn project_grace_period(&self) -> Duration {
2137 Duration::from_secs(self.values.cache.project_grace_period.into())
2138 }
2139
2140 pub fn query_batch_interval(&self) -> Duration {
2143 Duration::from_millis(self.values.cache.batch_interval.into())
2144 }
2145
2146 pub fn downstream_relays_batch_interval(&self) -> Duration {
2148 Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2149 }
2150
2151 pub fn local_cache_interval(&self) -> Duration {
2153 Duration::from_secs(self.values.cache.file_interval.into())
2154 }
2155
2156 pub fn global_config_fetch_interval(&self) -> Duration {
2159 Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2160 }
2161
2162 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2167 let mut path = self
2168 .values
2169 .spool
2170 .envelopes
2171 .path
2172 .as_ref()
2173 .map(|path| path.to_owned())?;
2174
2175 if partition_id == 0 {
2176 return Some(path);
2177 }
2178
2179 let file_name = path.file_name().and_then(|f| f.to_str())?;
2180 let new_file_name = format!("{}.{}", file_name, partition_id);
2181 path.set_file_name(new_file_name);
2182
2183 Some(path)
2184 }
2185
2186 pub fn spool_envelopes_max_disk_size(&self) -> usize {
2188 self.values.spool.envelopes.max_disk_size.as_bytes()
2189 }
2190
2191 pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2194 self.values.spool.envelopes.batch_size_bytes.as_bytes()
2195 }
2196
2197 pub fn spool_envelopes_max_age(&self) -> Duration {
2199 Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2200 }
2201
2202 pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2204 Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2205 }
2206
2207 pub fn spool_max_backpressure_envelopes(&self) -> usize {
2209 self.values.spool.envelopes.max_backpressure_envelopes
2210 }
2211
2212 pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2214 self.values.spool.envelopes.max_backpressure_memory_percent
2215 }
2216
2217 pub fn spool_partitions(&self) -> NonZeroU8 {
2219 self.values.spool.envelopes.partitions
2220 }
2221
2222 pub fn max_event_size(&self) -> usize {
2224 self.values.limits.max_event_size.as_bytes()
2225 }
2226
2227 pub fn max_attachment_size(&self) -> usize {
2229 self.values.limits.max_attachment_size.as_bytes()
2230 }
2231
2232 pub fn max_attachments_size(&self) -> usize {
2235 self.values.limits.max_attachments_size.as_bytes()
2236 }
2237
2238 pub fn max_client_reports_size(&self) -> usize {
2240 self.values.limits.max_client_reports_size.as_bytes()
2241 }
2242
2243 pub fn max_check_in_size(&self) -> usize {
2245 self.values.limits.max_check_in_size.as_bytes()
2246 }
2247
2248 pub fn max_log_size(&self) -> usize {
2250 self.values.limits.max_log_size.as_bytes()
2251 }
2252
2253 pub fn max_span_size(&self) -> usize {
2255 self.values.limits.max_span_size.as_bytes()
2256 }
2257
2258 pub fn max_envelope_size(&self) -> usize {
2262 self.values.limits.max_envelope_size.as_bytes()
2263 }
2264
2265 pub fn max_session_count(&self) -> usize {
2267 self.values.limits.max_session_count
2268 }
2269
2270 pub fn max_statsd_size(&self) -> usize {
2272 self.values.limits.max_statsd_size.as_bytes()
2273 }
2274
2275 pub fn max_metric_buckets_size(&self) -> usize {
2277 self.values.limits.max_metric_buckets_size.as_bytes()
2278 }
2279
2280 pub fn metric_stats_enabled(&self) -> bool {
2285 self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled
2286 }
2287
2288 pub fn max_api_payload_size(&self) -> usize {
2290 self.values.limits.max_api_payload_size.as_bytes()
2291 }
2292
2293 pub fn max_api_file_upload_size(&self) -> usize {
2295 self.values.limits.max_api_file_upload_size.as_bytes()
2296 }
2297
2298 pub fn max_api_chunk_upload_size(&self) -> usize {
2300 self.values.limits.max_api_chunk_upload_size.as_bytes()
2301 }
2302
2303 pub fn max_profile_size(&self) -> usize {
2305 self.values.limits.max_profile_size.as_bytes()
2306 }
2307
2308 pub fn max_replay_compressed_size(&self) -> usize {
2310 self.values.limits.max_replay_compressed_size.as_bytes()
2311 }
2312
2313 pub fn max_replay_uncompressed_size(&self) -> usize {
2315 self.values.limits.max_replay_uncompressed_size.as_bytes()
2316 }
2317
2318 pub fn max_replay_message_size(&self) -> usize {
2324 self.values.limits.max_replay_message_size.as_bytes()
2325 }
2326
2327 pub fn max_concurrent_requests(&self) -> usize {
2329 self.values.limits.max_concurrent_requests
2330 }
2331
2332 pub fn max_concurrent_queries(&self) -> usize {
2334 self.values.limits.max_concurrent_queries
2335 }
2336
2337 pub fn query_timeout(&self) -> Duration {
2339 Duration::from_secs(self.values.limits.query_timeout)
2340 }
2341
2342 pub fn shutdown_timeout(&self) -> Duration {
2345 Duration::from_secs(self.values.limits.shutdown_timeout)
2346 }
2347
2348 pub fn keepalive_timeout(&self) -> Duration {
2352 Duration::from_secs(self.values.limits.keepalive_timeout)
2353 }
2354
2355 pub fn idle_timeout(&self) -> Option<Duration> {
2357 self.values.limits.idle_timeout.map(Duration::from_secs)
2358 }
2359
2360 pub fn max_connections(&self) -> Option<usize> {
2362 self.values.limits.max_connections
2363 }
2364
2365 pub fn tcp_listen_backlog(&self) -> u32 {
2367 self.values.limits.tcp_listen_backlog
2368 }
2369
2370 pub fn cpu_concurrency(&self) -> usize {
2372 self.values.limits.max_thread_count
2373 }
2374
2375 pub fn pool_concurrency(&self) -> usize {
2377 self.values.limits.max_pool_concurrency
2378 }
2379
2380 pub fn query_batch_size(&self) -> usize {
2382 self.values.cache.batch_size
2383 }
2384
2385 pub fn project_configs_path(&self) -> PathBuf {
2387 self.path.join("projects")
2388 }
2389
2390 pub fn processing_enabled(&self) -> bool {
2392 self.values.processing.enabled
2393 }
2394
2395 pub fn normalization_level(&self) -> NormalizationLevel {
2397 self.values.normalization.level
2398 }
2399
2400 pub fn geoip_path(&self) -> Option<&Path> {
2402 self.values
2403 .geoip
2404 .path
2405 .as_deref()
2406 .or(self.values.processing.geoip_path.as_deref())
2407 }
2408
2409 pub fn max_secs_in_future(&self) -> i64 {
2413 self.values.processing.max_secs_in_future.into()
2414 }
2415
2416 pub fn max_session_secs_in_past(&self) -> i64 {
2418 self.values.processing.max_session_secs_in_past.into()
2419 }
2420
2421 pub fn kafka_config(&self, topic: KafkaTopic) -> Result<KafkaParams, KafkaConfigError> {
2423 self.values.processing.topics.get(topic).kafka_config(
2424 &self.values.processing.kafka_config,
2425 &self.values.processing.secondary_kafka_configs,
2426 )
2427 }
2428
2429 pub fn kafka_validate_topics(&self) -> bool {
2431 self.values.processing.kafka_validate_topics
2432 }
2433
2434 pub fn unused_topic_assignments(&self) -> &BTreeMap<String, TopicAssignment> {
2436 &self.values.processing.topics.unused
2437 }
2438
2439 pub fn redis(&self) -> Option<RedisConfigsRef> {
2442 let redis_configs = self.values.processing.redis.as_ref()?;
2443
2444 Some(build_redis_configs(
2445 redis_configs,
2446 self.cpu_concurrency() as u32,
2447 ))
2448 }
2449
2450 pub fn attachment_chunk_size(&self) -> usize {
2452 self.values.processing.attachment_chunk_size.as_bytes()
2453 }
2454
2455 pub fn metrics_max_batch_size_bytes(&self) -> usize {
2457 self.values.aggregator.max_flush_bytes
2458 }
2459
2460 pub fn projectconfig_cache_prefix(&self) -> &str {
2463 &self.values.processing.projectconfig_cache_prefix
2464 }
2465
2466 pub fn max_rate_limit(&self) -> Option<u64> {
2468 self.values.processing.max_rate_limit.map(u32::into)
2469 }
2470
2471 pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2475 Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2476 }
2477
2478 pub fn health_refresh_interval(&self) -> Duration {
2480 Duration::from_millis(self.values.health.refresh_interval_ms)
2481 }
2482
2483 pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2485 self.values
2486 .health
2487 .max_memory_bytes
2488 .as_ref()
2489 .map_or(u64::MAX, |b| b.as_bytes() as u64)
2490 }
2491
2492 pub fn health_max_memory_watermark_percent(&self) -> f32 {
2494 self.values.health.max_memory_percent
2495 }
2496
2497 pub fn health_probe_timeout(&self) -> Duration {
2499 Duration::from_millis(self.values.health.probe_timeout_ms)
2500 }
2501
2502 pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2504 self.values.health.memory_stat_refresh_frequency_ms
2505 }
2506
2507 pub fn cogs_max_queue_size(&self) -> u64 {
2509 self.values.cogs.max_queue_size
2510 }
2511
2512 pub fn cogs_relay_resource_id(&self) -> &str {
2514 &self.values.cogs.relay_resource_id
2515 }
2516
2517 pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2519 &self.values.aggregator
2520 }
2521
2522 pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2524 &self.values.secondary_aggregators
2525 }
2526
2527 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2529 for entry in &self.values.secondary_aggregators {
2530 if entry.condition.matches(Some(namespace)) {
2531 return &entry.config;
2532 }
2533 }
2534 &self.values.aggregator
2535 }
2536
2537 pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2539 &self.values.auth.static_relays
2540 }
2541
2542 pub fn accept_unknown_items(&self) -> bool {
2544 let forward = self.values.routing.accept_unknown_items;
2545 forward.unwrap_or_else(|| !self.processing_enabled())
2546 }
2547}
2548
2549impl Default for Config {
2550 fn default() -> Self {
2551 Self {
2552 values: ConfigValues::default(),
2553 credentials: None,
2554 path: PathBuf::new(),
2555 }
2556 }
2557}
2558
#[cfg(test)]
mod tests {

    use super::*;

    /// Legacy `event_*` cache keys must map onto the envelope-based settings.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    /// `EmitOutcomes` must round-trip through its JSON representation.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];

        for (serialized, expected) in cases {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, expected);
            assert_eq!(serde_json::to_string(&value).unwrap(), serialized);
        }
    }

    /// Arbitrary strings must be rejected when deserializing `EmitOutcomes`.
    #[test]
    fn test_emit_outcomes_invalid() {
        let result = serde_json::from_str::<EmitOutcomes>("asdf");
        assert!(result.is_err());
    }
}