use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::io::Write;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::num::NonZeroU8;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use std::{env, fmt, fs, io};

use anyhow::Context;
use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
use relay_common::Dsn;
use relay_kafka::{
    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
    TopicAssignments,
};
use relay_metrics::MetricNamespace;
use serde::de::{DeserializeOwned, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use uuid::Uuid;

use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
use crate::byte_size::ByteSize;
use crate::upstream::UpstreamDescriptor;
use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};

const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    CouldNotOpenFile,
    CouldNotWriteFile,
    BadYaml,
    BadJson,
    InvalidValue,
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::CouldNotOpenFile => write!(f, "could not open config file"),
            Self::CouldNotWriteFile => write!(f, "could not write config file"),
            Self::BadYaml => write!(f, "could not parse yaml config file"),
            Self::BadJson => write!(f, "could not parse json config file"),
            Self::InvalidValue => write!(f, "invalid config value"),
            Self::ProcessingNotAvailable => write!(
                f,
                "was not compiled with processing, cannot enable processing"
            ),
        }
    }
}

#[derive(Debug)]
enum ConfigErrorSource {
    None,
    File(PathBuf),
    FieldOverride(String),
}

impl Default for ConfigErrorSource {
    fn default() -> Self {
        Self::None
    }
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConfigErrorSource::None => Ok(()),
            ConfigErrorSource::File(file_name) => {
                write!(f, " (file {})", file_name.display())
            }
            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
        }
    }
}

#[derive(Debug)]
pub struct ConfigError {
    source: ConfigErrorSource,
    kind: ConfigErrorKind,
}

impl ConfigError {
    #[inline]
    fn new(kind: ConfigErrorKind) -> Self {
        Self {
            source: ConfigErrorSource::None,
            kind,
        }
    }

    #[inline]
    fn field(field: &'static str) -> Self {
        Self {
            source: ConfigErrorSource::FieldOverride(field.to_owned()),
            kind: ConfigErrorKind::InvalidValue,
        }
    }

    #[inline]
    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
        Self {
            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
            kind,
        }
    }

    pub fn kind(&self) -> ConfigErrorKind {
        self.kind
    }
}

impl fmt::Display for ConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}{}", self.kind(), self.source)
    }
}

impl Error for ConfigError {}

enum ConfigFormat {
    Yaml,
    Json,
}

impl ConfigFormat {
    pub fn extension(&self) -> &'static str {
        match self {
            ConfigFormat::Yaml => "yml",
            ConfigFormat::Json => "json",
        }
    }
}

trait ConfigObject: DeserializeOwned + Serialize {
    fn format() -> ConfigFormat;

    fn name() -> &'static str;

    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        let mut source = serde_vars::EnvSource::default();
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        f.write_all(b"\n").ok();

        Ok(())
    }
}
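
// A minimal sketch of how `ConfigObject::path` derives file names from
// `name()` and `format()`: `Credentials` maps to `credentials.json` and
// `MinimalConfig` to `config.yml`. The `/etc/relay` base path is illustrative.
#[cfg(test)]
mod config_object_path_example {
    use super::*;

    #[test]
    fn builds_paths_from_name_and_format() {
        let base = Path::new("/etc/relay");
        assert_eq!(
            Credentials::path(base),
            PathBuf::from("/etc/relay/credentials.json")
        );
        assert_eq!(
            MinimalConfig::path(base),
            PathBuf::from("/etc/relay/config.yml")
        );
    }
}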

#[derive(Debug, Default)]
pub struct OverridableConfig {
    pub mode: Option<String>,
    pub instance: Option<String>,
    pub log_level: Option<String>,
    pub log_format: Option<String>,
    pub upstream: Option<String>,
    pub upstream_dsn: Option<String>,
    pub host: Option<String>,
    pub port: Option<String>,
    pub processing: Option<String>,
    pub kafka_url: Option<String>,
    pub redis_url: Option<String>,
    pub id: Option<String>,
    pub secret_key: Option<String>,
    pub public_key: Option<String>,
    pub outcome_source: Option<String>,
    pub shutdown_timeout: Option<String>,
    pub server_name: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    pub secret_key: SecretKey,
    pub public_key: PublicKey,
    pub id: RelayId,
}

impl Credentials {
    pub fn generate() -> Self {
        relay_log::info!("generating new relay credentials");
        let (sk, pk) = generate_key_pair();
        Self {
            secret_key: sk,
            public_key: pk,
            id: generate_relay_id(),
        }
    }

    pub fn to_json_string(&self) -> anyhow::Result<String> {
        serde_json::to_string(self)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }
}
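
// A small sketch of the credentials life cycle: generate a key pair plus a
// relay id and round-trip them through JSON. This relies only on the serde
// derives and helpers declared on `Credentials` above.
#[cfg(test)]
mod credentials_example {
    use super::*;

    #[test]
    fn json_round_trip_preserves_credentials() {
        let creds = Credentials::generate();
        let json = creds.to_json_string().unwrap();
        let parsed: Credentials = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, creds);
    }
}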

impl ConfigObject for Credentials {
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    fn name() -> &'static str {
        "credentials"
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    pub public_key: PublicKey,

    #[serde(default)]
    pub internal: bool,
}

impl RelayInfo {
    pub fn new(public_key: PublicKey) -> Self {
        Self {
            public_key,
            internal: false,
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    Proxy,
    Static,
    Managed,
}

impl fmt::Display for RelayMode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            RelayMode::Proxy => write!(f, "proxy"),
            RelayMode::Static => write!(f, "static"),
            RelayMode::Managed => write!(f, "managed"),
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    Default,
    Canary,
}

impl RelayInstance {
    pub fn is_canary(&self) -> bool {
        matches!(self, RelayInstance::Canary)
    }
}

impl fmt::Display for RelayInstance {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            RelayInstance::Default => write!(f, "default"),
            RelayInstance::Canary => write!(f, "canary"),
        }
    }
}

impl FromStr for RelayInstance {
    type Err = fmt::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "canary" => Ok(RelayInstance::Canary),
            _ => Ok(RelayInstance::Default),
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Relay mode must be one of: managed, static, proxy")
    }
}

impl Error for ParseRelayModeError {}

impl FromStr for RelayMode {
    type Err = ParseRelayModeError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "proxy" => Ok(RelayMode::Proxy),
            "static" => Ok(RelayMode::Static),
            "managed" => Ok(RelayMode::Managed),
            _ => Err(ParseRelayModeError),
        }
    }
}
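
// Illustrative parsing behavior for the two settings above: `RelayMode`
// rejects unknown values, while `RelayInstance` intentionally falls back to
// `Default` for anything other than "canary".
#[cfg(test)]
mod mode_parsing_example {
    use super::*;

    #[test]
    fn parses_modes_and_instances() {
        assert_eq!("proxy".parse::<RelayMode>(), Ok(RelayMode::Proxy));
        assert!("capture".parse::<RelayMode>().is_err());

        assert_eq!("canary".parse::<RelayInstance>(), Ok(RelayInstance::Canary));
        assert_eq!("anything".parse::<RelayInstance>(), Ok(RelayInstance::Default));
    }
}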

fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    *t == T::default()
}

fn is_docker() -> bool {
    if fs::metadata("/.dockerenv").is_ok() {
        return true;
    }

    fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}

fn default_host() -> IpAddr {
    if is_docker() {
        "0.0.0.0".parse().unwrap()
    } else {
        "127.0.0.1".parse().unwrap()
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReadinessCondition {
    Authenticated,
    Always,
}

impl Default for ReadinessCondition {
    fn default() -> Self {
        Self::Authenticated
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    pub mode: RelayMode,
    pub instance: RelayInstance,
    pub upstream: UpstreamDescriptor<'static>,
    pub host: IpAddr,
    pub port: u16,
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}

impl Default for Relay {
    fn default() -> Self {
        Relay {
            mode: RelayMode::Managed,
            instance: RelayInstance::Default,
            upstream: "https://sentry.io/".parse().unwrap(),
            host: default_host(),
            port: 3000,
            tls_port: None,
            tls_identity_path: None,
            tls_identity_password: None,
            override_project_ids: false,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    pub statsd: Option<String>,
    pub prefix: String,
    pub default_tags: BTreeMap<String, String>,
    pub hostname_tag: Option<String>,
    pub sample_rate: f32,
    pub periodic_secs: u64,
    pub aggregate: bool,
    pub allow_high_cardinality_tags: bool,
}

impl Default for Metrics {
    fn default() -> Self {
        Metrics {
            statsd: None,
            prefix: "sentry.relay".into(),
            default_tags: BTreeMap::new(),
            hostname_tag: None,
            sample_rate: 1.0,
            periodic_secs: 5,
            aggregate: true,
            allow_high_cardinality_tags: false,
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default)]
pub struct SentryMetrics {
    pub metric_stats_enabled: bool,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    pub max_concurrent_requests: usize,
    pub max_concurrent_queries: usize,
    pub max_event_size: ByteSize,
    pub max_attachment_size: ByteSize,
    pub max_attachments_size: ByteSize,
    pub max_client_reports_size: ByteSize,
    pub max_check_in_size: ByteSize,
    pub max_envelope_size: ByteSize,
    pub max_session_count: usize,
    pub max_span_count: usize,
    pub max_log_count: usize,
    pub max_api_payload_size: ByteSize,
    pub max_api_file_upload_size: ByteSize,
    pub max_api_chunk_upload_size: ByteSize,
    pub max_profile_size: ByteSize,
    pub max_log_size: ByteSize,
    pub max_span_size: ByteSize,
    pub max_container_size: ByteSize,
    pub max_statsd_size: ByteSize,
    pub max_metric_buckets_size: ByteSize,
    pub max_replay_compressed_size: ByteSize,
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    pub max_replay_message_size: ByteSize,
    pub max_thread_count: usize,
    pub max_pool_concurrency: usize,
    pub query_timeout: u64,
    pub shutdown_timeout: u64,
    pub keepalive_timeout: u64,
    pub idle_timeout: Option<u64>,
    pub max_connections: Option<usize>,
    pub tcp_listen_backlog: u32,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(100),
            max_attachments_size: ByteSize::mebibytes(100),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(100),
            max_session_count: 100,
            max_span_count: 1000,
            max_log_count: 1000,
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(1),
            max_container_size: ByteSize::mebibytes(3),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
        }
    }
}

#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    pub accept_unknown_items: Option<bool>,
}

#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    #[default]
    Identity,
    Deflate,
    Gzip,
    Br,
    Zstd,
}

impl HttpEncoding {
    pub fn parse(str: &str) -> Self {
        let str = str.trim();
        if str.eq_ignore_ascii_case("zstd") {
            Self::Zstd
        } else if str.eq_ignore_ascii_case("br") {
            Self::Br
        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
            Self::Gzip
        } else if str.eq_ignore_ascii_case("deflate") {
            Self::Deflate
        } else {
            Self::Identity
        }
    }

    pub fn name(&self) -> Option<&'static str> {
        match self {
            Self::Identity => None,
            Self::Deflate => Some("deflate"),
            Self::Gzip => Some("gzip"),
            Self::Br => Some("br"),
            Self::Zstd => Some("zstd"),
        }
    }
}
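
// A quick sketch of the content-encoding parsing above: matching is
// case-insensitive, unknown values fall back to `Identity`, and `name()`
// yields the value to put on the wire (or `None` for identity).
#[cfg(test)]
mod http_encoding_example {
    use super::*;

    #[test]
    fn parses_content_encodings() {
        assert_eq!(HttpEncoding::parse("GZIP").name(), Some("gzip"));
        assert_eq!(HttpEncoding::parse(" zstd ").name(), Some("zstd"));
        assert_eq!(HttpEncoding::parse("x-gzip").name(), Some("gzip"));
        assert_eq!(HttpEncoding::parse("unknown").name(), None);
    }
}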

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    pub timeout: u32,
    pub connection_timeout: u32,
    pub max_retry_interval: u32,
    pub host_header: Option<String>,
    pub auth_interval: Option<u64>,
    pub outage_grace_period: u64,
    pub retry_delay: u64,
    pub project_failure_interval: u64,
    pub encoding: HttpEncoding,
    pub global_metrics: bool,
}

impl Default for Http {
    fn default() -> Self {
        Http {
            timeout: 5,
            connection_timeout: 3,
            max_retry_interval: 60,
            host_header: None,
            auth_interval: Some(600),
            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
            retry_delay: default_retry_delay(),
            project_failure_interval: default_project_failure_interval(),
            encoding: HttpEncoding::Zstd,
            global_metrics: false,
        }
    }
}

fn default_retry_delay() -> u64 {
    1
}

fn default_project_failure_interval() -> u64 {
    90
}

fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}

fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}

fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}

fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}

fn spool_max_backpressure_envelopes() -> usize {
    500
}

fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}

fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).unwrap()
}

#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    pub path: Option<PathBuf>,
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}

impl Default for EnvelopeSpool {
    fn default() -> Self {
        Self {
            path: None,
            max_disk_size: spool_envelopes_max_disk_size(),
            batch_size_bytes: spool_envelopes_batch_size_bytes(),
            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
            max_backpressure_envelopes: spool_max_backpressure_envelopes(),
            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
            partitions: spool_envelopes_partitions(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
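
// The helper functions above pin the spool defaults; this sketch makes the
// resulting values explicit, assuming `ByteSize::mebibytes`/`kibibytes` use
// binary units as their names suggest.
#[cfg(test)]
mod spool_defaults_example {
    use super::*;

    #[test]
    fn envelope_spool_defaults() {
        let spool = EnvelopeSpool::default();
        assert_eq!(spool.max_disk_size.as_bytes(), 500 * 1024 * 1024);
        assert_eq!(spool.batch_size_bytes.as_bytes(), 10 * 1024);
        assert_eq!(spool.max_envelope_delay_secs, 24 * 60 * 60);
        assert_eq!(spool.partitions.get(), 1);
    }
}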

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    pub project_request_full_config: bool,
    pub project_expiry: u32,
    pub project_grace_period: u32,
    pub project_refresh_interval: Option<u32>,
    pub relay_expiry: u32,
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    pub miss_expiry: u32,
    pub batch_interval: u32,
    pub downstream_relays_batch_interval: u32,
    pub batch_size: usize,
    pub file_interval: u32,
    pub global_config_fetch_interval: u32,
}

impl Default for Cache {
    fn default() -> Self {
        Cache {
            project_request_full_config: false,
            project_expiry: 300,
            project_grace_period: 120,
            project_refresh_interval: None,
            relay_expiry: 3600,
            envelope_expiry: 600,
            envelope_buffer_size: 1000,
            miss_expiry: 60,
            batch_interval: 100,
            downstream_relays_batch_interval: 100,
            batch_size: 500,
            file_interval: 10,
            global_config_fetch_interval: 10,
        }
    }
}

fn default_max_secs_in_future() -> u32 {
    60
}

fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600
}

fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}

fn default_projectconfig_cache_prefix() -> String {
    "relayconfig".to_owned()
}

#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    pub enabled: bool,
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    pub kafka_config: Vec<KafkaConfigParam>,
    #[serde(default)]
    pub span_producers: SpanProducers,
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    #[serde(default)]
    pub topics: TopicAssignments,
    #[serde(default)]
    pub kafka_validate_topics: bool,
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}

impl Default for Processing {
    fn default() -> Self {
        Self {
            enabled: false,
            geoip_path: None,
            max_secs_in_future: default_max_secs_in_future(),
            max_session_secs_in_past: default_max_session_secs_in_past(),
            kafka_config: Vec::new(),
            secondary_kafka_configs: BTreeMap::new(),
            topics: TopicAssignments::default(),
            kafka_validate_topics: false,
            redis: None,
            attachment_chunk_size: default_chunk_size(),
            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
            max_rate_limit: default_max_rate_limit(),
            span_producers: Default::default(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct SpanProducers {
    pub produce_json: bool,
    pub produce_protobuf: bool,
}

impl Default for SpanProducers {
    fn default() -> Self {
        Self {
            produce_json: false,
            produce_protobuf: true,
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    #[serde(default)]
    pub level: NormalizationLevel,
}

#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    #[default]
    Default,
    Full,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    pub bucket_interval: u64,
    pub flush_interval: u64,
}

impl Default for OutcomeAggregatorConfig {
    fn default() -> Self {
        Self {
            bucket_interval: 60,
            flush_interval: 120,
        }
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    None,
    AsClientReports,
    AsOutcomes,
}

impl EmitOutcomes {
    pub fn any(&self) -> bool {
        !matches!(self, EmitOutcomes::None)
    }
}

impl Serialize for EmitOutcomes {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Self::None => serializer.serialize_bool(false),
            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
            Self::AsOutcomes => serializer.serialize_bool(true),
        }
    }
}

struct EmitOutcomesVisitor;

impl Visitor<'_> for EmitOutcomesVisitor {
    type Value = EmitOutcomes;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("true, false, or 'as_client_reports'")
    }

    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        Ok(if v {
            EmitOutcomes::AsOutcomes
        } else {
            EmitOutcomes::None
        })
    }

    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        if v == "as_client_reports" {
            Ok(EmitOutcomes::AsClientReports)
        } else {
            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
        }
    }
}

impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    pub emit_outcomes: EmitOutcomes,
    pub emit_client_outcomes: bool,
    pub batch_size: usize,
    pub batch_interval: u64,
    pub source: Option<String>,
    pub aggregator: OutcomeAggregatorConfig,
}

impl Default for Outcomes {
    fn default() -> Self {
        Outcomes {
            emit_outcomes: EmitOutcomes::AsClientReports,
            emit_client_outcomes: true,
            batch_size: 1000,
            batch_interval: 500,
            source: None,
            aggregator: OutcomeAggregatorConfig::default(),
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    pub relay: Relay,
}

impl MinimalConfig {
    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
        let path = p.as_ref();
        if fs::metadata(path).is_err() {
            fs::create_dir_all(path)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
        }
        self.save(path)
    }
}

impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}

mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}

fn default_max_age() -> u64 {
    300
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    pub path: Option<PathBuf>,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    pub cache_vacuum_interval: u64,
}

impl Default for CardinalityLimiter {
    fn default() -> Self {
        Self {
            cache_vacuum_interval: 180,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    pub refresh_interval_ms: u64,
    pub max_memory_bytes: Option<ByteSize>,
    pub max_memory_percent: f32,
    pub probe_timeout_ms: u64,
    pub memory_stat_refresh_frequency_ms: u64,
}

impl Default for Health {
    fn default() -> Self {
        Self {
            refresh_interval_ms: 3000,
            max_memory_bytes: None,
            max_memory_percent: 0.95,
            probe_timeout_ms: 900,
            memory_stat_refresh_frequency_ms: 100,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    pub max_queue_size: u64,
    pub relay_resource_id: String,
}

impl Default for Cogs {
    fn default() -> Self {
        Self {
            max_queue_size: 10_000,
            relay_resource_id: "relay_service".to_owned(),
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry_metrics: SentryMetrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}

impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}

pub struct Config {
    values: ConfigValues,
    credentials: Option<Credentials>,
    path: PathBuf,
}

impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
impl Config {
    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
        let path = env::current_dir()
            .map(|x| x.join(path.as_ref()))
            .unwrap_or_else(|_| path.as_ref().to_path_buf());

        let config = Config {
            values: ConfigValues::load(&path)?,
            credentials: if Credentials::path(&path).exists() {
                Some(Credentials::load(&path)?)
            } else {
                None
            },
            path: path.clone(),
        };

        if cfg!(not(feature = "processing")) && config.processing_enabled() {
            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
        }

        Ok(config)
    }

    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }

    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }

        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };

        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };

        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {}
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
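
    // Usage sketch (illustrative, not compiled): callers typically construct an
    // `OverridableConfig` with only the fields they want to change, e.g.
    //
    //     let mut config = Config::default();
    //     config.apply_override(OverridableConfig {
    //         mode: Some("proxy".to_owned()),
    //         port: Some("3001".to_owned()),
    //         ..Default::default()
    //     })?;
    //
    // Unset fields keep whatever the loaded YAML (or the defaults) provided.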

    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }

    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }

    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }

    pub fn replace_credentials(
        &mut self,
        credentials: Option<Credentials>,
    ) -> anyhow::Result<bool> {
        if self.credentials == credentials {
            return Ok(false);
        }

        match credentials {
            Some(ref creds) => {
                creds.save(&self.path)?;
            }
            None => {
                let path = Credentials::path(&self.path);
                if fs::metadata(&path).is_ok() {
                    fs::remove_file(&path).with_context(|| {
                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
                    })?;
                }
            }
        }

        self.credentials = credentials;
        Ok(true)
    }

    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }

    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
        if self.values.relay.tls_identity_path.is_some() {
            let port = self.values.relay.tls_port.unwrap_or(3443);
            Some((self.values.relay.host, port).into())
        } else {
            None
        }
    }

    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }

    pub fn http_auth_interval(&self) -> Option<Duration> {
        if self.processing_enabled() {
            return None;
        }

        match self.values.http.auth_interval {
            None | Some(0) => None,
            Some(secs) => Some(Duration::from_secs(secs)),
        }
    }

    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    pub fn emit_client_outcomes(&self) -> bool {
        self.values.outcomes.emit_client_outcomes
    }

    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
        if let Some(ref addr) = self.values.metrics.statsd {
            let addrs = addr
                .as_str()
                .to_socket_addrs()
                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
                .collect();
            Ok(addrs)
        } else {
            Ok(vec![])
        }
    }

    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    pub fn metrics_sample_rate(&self) -> f32 {
        self.values.metrics.sample_rate
    }

    pub fn metrics_aggregate(&self) -> bool {
        self.values.metrics.aggregate
    }

    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
        self.values.metrics.allow_high_cardinality_tags
    }

    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }

    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
        let mut path = self
            .values
            .spool
            .envelopes
            .path
            .as_ref()
            .map(|path| path.to_owned())?;

        if partition_id == 0 {
            return Some(path);
        }

        let file_name = path.file_name().and_then(|f| f.to_str())?;
        let new_file_name = format!("{file_name}.{partition_id}");
        path.set_file_name(new_file_name);

        Some(path)
    }
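
    // For example, with `spool.envelopes.path` set to `/var/lib/relay/spool`
    // (an illustrative path), partition 0 keeps that exact path while
    // partition 2 resolves to `/var/lib/relay/spool.2`.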

    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    pub fn spool_max_backpressure_envelopes(&self) -> usize {
        self.values.spool.envelopes.max_backpressure_envelopes
    }

    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    pub fn max_span_count(&self) -> usize {
        self.values.limits.max_span_count
    }

    pub fn max_log_count(&self) -> usize {
        self.values.limits.max_log_count
    }

    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    pub fn metric_stats_enabled(&self) -> bool {
        self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled
    }

    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }

    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
        ))
    }

    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }

    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }

    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
        for entry in &self.values.secondary_aggregators {
            if entry.condition.matches(Some(namespace)) {
                return &entry.config;
            }
        }
        &self.values.aggregator
    }

    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }

    pub fn accept_unknown_items(&self) -> bool {
        let forward = self.values.routing.accept_unknown_items;
        forward.unwrap_or_else(|| !self.processing_enabled())
    }

    pub fn produce_protobuf_spans(&self) -> bool {
        self.values.processing.span_producers.produce_protobuf
    }

    pub fn produce_json_spans(&self) -> bool {
        self.values.processing.span_producers.produce_json
    }
}

impl Default for Config {
    fn default() -> Self {
        Self {
            values: ConfigValues::default(),
            credentials: None,
            path: PathBuf::new(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
  event_buffer_size: 1000000
  event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }

    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
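
    // Two additional sketches of the `Config` API defined above: building a
    // config from inline JSON and deriving per-partition spool paths. All
    // concrete values and paths here are illustrative only.
    #[test]
    fn test_from_json_value_relay_overrides() {
        let config = Config::from_json_value(serde_json::json!({
            "relay": {
                "mode": "proxy",
                "port": 3001
            }
        }))
        .unwrap();

        assert_eq!(config.relay_mode(), RelayMode::Proxy);
        assert_eq!(config.listen_addr().port(), 3001);
    }

    #[test]
    fn test_spool_envelopes_partition_paths() {
        let config = Config::from_json_value(serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": "/tmp/relay/spool/envelopes"
                }
            }
        }))
        .unwrap();

        assert_eq!(
            config.spool_envelopes_path(0),
            Some(PathBuf::from("/tmp/relay/spool/envelopes"))
        );
        assert_eq!(
            config.spool_envelopes_path(2),
            Some(PathBuf::from("/tmp/relay/spool/envelopes.2"))
        );
    }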
}