1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
28const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
30static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
31# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
32# All config options: https://docs.sentry.io/product/relay/options/
33"###;
34
35#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum ConfigErrorKind {
39 CouldNotOpenFile,
41 CouldNotWriteFile,
43 BadYaml,
45 BadJson,
47 InvalidValue,
49 ProcessingNotAvailable,
52}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
70#[derive(Debug)]
72enum ConfigErrorSource {
73 None,
75 File(PathBuf),
77 FieldOverride(String),
79}
80
81impl Default for ConfigErrorSource {
82 fn default() -> Self {
83 Self::None
84 }
85}
86
87impl fmt::Display for ConfigErrorSource {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 match self {
90 ConfigErrorSource::None => Ok(()),
91 ConfigErrorSource::File(file_name) => {
92 write!(f, " (file {})", file_name.display())
93 }
94 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
95 }
96 }
97}
98
99#[derive(Debug)]
101pub struct ConfigError {
102 source: ConfigErrorSource,
103 kind: ConfigErrorKind,
104}
105
106impl ConfigError {
107 #[inline]
108 fn new(kind: ConfigErrorKind) -> Self {
109 Self {
110 source: ConfigErrorSource::None,
111 kind,
112 }
113 }
114
115 #[inline]
116 fn field(field: &'static str) -> Self {
117 Self {
118 source: ConfigErrorSource::FieldOverride(field.to_owned()),
119 kind: ConfigErrorKind::InvalidValue,
120 }
121 }
122
123 #[inline]
124 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125 Self {
126 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127 kind,
128 }
129 }
130
131 pub fn kind(&self) -> ConfigErrorKind {
133 self.kind
134 }
135}
136
137impl fmt::Display for ConfigError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 write!(f, "{}{}", self.kind(), self.source)
140 }
141}
142
143impl Error for ConfigError {}
144
145enum ConfigFormat {
146 Yaml,
147 Json,
148}
149
150impl ConfigFormat {
151 pub fn extension(&self) -> &'static str {
152 match self {
153 ConfigFormat::Yaml => "yml",
154 ConfigFormat::Json => "json",
155 }
156 }
157}
158
159trait ConfigObject: DeserializeOwned + Serialize {
160 fn format() -> ConfigFormat;
162
163 fn name() -> &'static str;
165
166 fn path(base: &Path) -> PathBuf {
168 base.join(format!("{}.{}", Self::name(), Self::format().extension()))
169 }
170
171 fn load(base: &Path) -> anyhow::Result<Self> {
173 let path = Self::path(base);
174
175 let f = fs::File::open(&path)
176 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
177 let f = io::BufReader::new(f);
178
179 let mut source = serde_vars::EnvSource::default();
180 match Self::format() {
181 ConfigFormat::Yaml => {
182 serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
183 .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
184 }
185 ConfigFormat::Json => {
186 serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
187 .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
188 }
189 }
190 }
191
192 fn save(&self, base: &Path) -> anyhow::Result<()> {
194 let path = Self::path(base);
195 let mut options = fs::OpenOptions::new();
196 options.write(true).truncate(true).create(true);
197
198 #[cfg(unix)]
200 {
201 use std::os::unix::fs::OpenOptionsExt;
202 options.mode(0o600);
203 }
204
205 let mut f = options
206 .open(&path)
207 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
208
209 match Self::format() {
210 ConfigFormat::Yaml => {
211 f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
212 serde_yaml::to_writer(&mut f, self)
213 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
214 }
215 ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
216 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
217 }
218
219 f.write_all(b"\n").ok();
220
221 Ok(())
222 }
223}
224
225#[derive(Debug, Default)]
228pub struct OverridableConfig {
229 pub mode: Option<String>,
231 pub instance: Option<String>,
233 pub log_level: Option<String>,
235 pub log_format: Option<String>,
237 pub upstream: Option<String>,
239 pub upstream_dsn: Option<String>,
241 pub host: Option<String>,
243 pub port: Option<String>,
245 pub processing: Option<String>,
247 pub kafka_url: Option<String>,
249 pub redis_url: Option<String>,
251 pub id: Option<String>,
253 pub secret_key: Option<String>,
255 pub public_key: Option<String>,
257 pub outcome_source: Option<String>,
259 pub shutdown_timeout: Option<String>,
261 pub server_name: Option<String>,
263}
264
265#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
267pub struct Credentials {
268 pub secret_key: SecretKey,
270 pub public_key: PublicKey,
272 pub id: RelayId,
274}
275
276impl Credentials {
277 pub fn generate() -> Self {
279 relay_log::info!("generating new relay credentials");
280 let (sk, pk) = generate_key_pair();
281 Self {
282 secret_key: sk,
283 public_key: pk,
284 id: generate_relay_id(),
285 }
286 }
287
288 pub fn to_json_string(&self) -> anyhow::Result<String> {
290 serde_json::to_string(self)
291 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
292 }
293}
294
295impl ConfigObject for Credentials {
296 fn format() -> ConfigFormat {
297 ConfigFormat::Json
298 }
299 fn name() -> &'static str {
300 "credentials"
301 }
302}
303
304#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
306#[serde(rename_all = "camelCase")]
307pub struct RelayInfo {
308 pub public_key: PublicKey,
310
311 #[serde(default)]
313 pub internal: bool,
314}
315
316impl RelayInfo {
317 pub fn new(public_key: PublicKey) -> Self {
319 Self {
320 public_key,
321 internal: false,
322 }
323 }
324}
325
326#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
328#[serde(rename_all = "camelCase")]
329pub enum RelayMode {
330 Proxy,
336
337 Static,
342
343 Managed,
349
350 Capture,
354}
355
356impl fmt::Display for RelayMode {
357 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
358 match self {
359 RelayMode::Proxy => write!(f, "proxy"),
360 RelayMode::Static => write!(f, "static"),
361 RelayMode::Managed => write!(f, "managed"),
362 RelayMode::Capture => write!(f, "capture"),
363 }
364 }
365}
366
367#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
369#[serde(rename_all = "camelCase")]
370pub enum RelayInstance {
371 Default,
373
374 Canary,
376}
377
378impl RelayInstance {
379 pub fn is_canary(&self) -> bool {
381 matches!(self, RelayInstance::Canary)
382 }
383}
384
385impl fmt::Display for RelayInstance {
386 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
387 match self {
388 RelayInstance::Default => write!(f, "default"),
389 RelayInstance::Canary => write!(f, "canary"),
390 }
391 }
392}
393
394impl FromStr for RelayInstance {
395 type Err = fmt::Error;
396
397 fn from_str(s: &str) -> Result<Self, Self::Err> {
398 match s {
399 "canary" => Ok(RelayInstance::Canary),
400 _ => Ok(RelayInstance::Default),
401 }
402 }
403}
404
405#[derive(Clone, Copy, Debug, Eq, PartialEq)]
407pub struct ParseRelayModeError;
408
409impl fmt::Display for ParseRelayModeError {
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 write!(
412 f,
413 "Relay mode must be one of: managed, static, proxy, capture"
414 )
415 }
416}
417
418impl Error for ParseRelayModeError {}
419
420impl FromStr for RelayMode {
421 type Err = ParseRelayModeError;
422
423 fn from_str(s: &str) -> Result<Self, Self::Err> {
424 match s {
425 "proxy" => Ok(RelayMode::Proxy),
426 "static" => Ok(RelayMode::Static),
427 "managed" => Ok(RelayMode::Managed),
428 "capture" => Ok(RelayMode::Capture),
429 _ => Err(ParseRelayModeError),
430 }
431 }
432}
433
434fn is_default<T: Default + PartialEq>(t: &T) -> bool {
436 *t == T::default()
437}
438
439fn is_docker() -> bool {
441 if fs::metadata("/.dockerenv").is_ok() {
442 return true;
443 }
444
445 fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
446}
447
448fn default_host() -> IpAddr {
450 if is_docker() {
451 "0.0.0.0".parse().unwrap()
453 } else {
454 "127.0.0.1".parse().unwrap()
455 }
456}
457
458#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
462#[serde(rename_all = "lowercase")]
463pub enum ReadinessCondition {
464 Authenticated,
473 Always,
475}
476
477impl Default for ReadinessCondition {
478 fn default() -> Self {
479 Self::Authenticated
480 }
481}
482
483#[derive(Serialize, Deserialize, Debug)]
485#[serde(default)]
486pub struct Relay {
487 pub mode: RelayMode,
489 pub instance: RelayInstance,
491 pub upstream: UpstreamDescriptor<'static>,
493 pub host: IpAddr,
495 pub port: u16,
497 #[serde(skip_serializing)]
499 pub tls_port: Option<u16>,
500 #[serde(skip_serializing)]
502 pub tls_identity_path: Option<PathBuf>,
503 #[serde(skip_serializing)]
505 pub tls_identity_password: Option<String>,
506 #[serde(skip_serializing_if = "is_default")]
511 pub override_project_ids: bool,
512}
513
514impl Default for Relay {
515 fn default() -> Self {
516 Relay {
517 mode: RelayMode::Managed,
518 instance: RelayInstance::Default,
519 upstream: "https://sentry.io/".parse().unwrap(),
520 host: default_host(),
521 port: 3000,
522 tls_port: None,
523 tls_identity_path: None,
524 tls_identity_password: None,
525 override_project_ids: false,
526 }
527 }
528}
529
530#[derive(Serialize, Deserialize, Debug)]
532#[serde(default)]
533pub struct Metrics {
534 pub statsd: Option<String>,
538 pub prefix: String,
542 pub default_tags: BTreeMap<String, String>,
544 pub hostname_tag: Option<String>,
546 pub sample_rate: f32,
551 pub periodic_secs: u64,
556 pub aggregate: bool,
560 pub allow_high_cardinality_tags: bool,
568}
569
570impl Default for Metrics {
571 fn default() -> Self {
572 Metrics {
573 statsd: None,
574 prefix: "sentry.relay".into(),
575 default_tags: BTreeMap::new(),
576 hostname_tag: None,
577 sample_rate: 1.0,
578 periodic_secs: 5,
579 aggregate: true,
580 allow_high_cardinality_tags: false,
581 }
582 }
583}
584
585#[derive(Serialize, Deserialize, Debug, Default)]
587#[serde(default)]
588pub struct SentryMetrics {
589 pub metric_stats_enabled: bool,
600}
601
602#[derive(Serialize, Deserialize, Debug)]
604#[serde(default)]
605pub struct Limits {
606 pub max_concurrent_requests: usize,
609 pub max_concurrent_queries: usize,
614 pub max_event_size: ByteSize,
616 pub max_attachment_size: ByteSize,
618 pub max_attachments_size: ByteSize,
620 pub max_client_reports_size: ByteSize,
622 pub max_check_in_size: ByteSize,
624 pub max_envelope_size: ByteSize,
626 pub max_session_count: usize,
628 pub max_span_count: usize,
630 pub max_log_count: usize,
632 pub max_api_payload_size: ByteSize,
634 pub max_api_file_upload_size: ByteSize,
636 pub max_api_chunk_upload_size: ByteSize,
638 pub max_profile_size: ByteSize,
640 pub max_log_size: ByteSize,
642 pub max_span_size: ByteSize,
644 pub max_container_size: ByteSize,
646 pub max_statsd_size: ByteSize,
648 pub max_metric_buckets_size: ByteSize,
650 pub max_replay_compressed_size: ByteSize,
652 #[serde(alias = "max_replay_size")]
654 max_replay_uncompressed_size: ByteSize,
655 pub max_replay_message_size: ByteSize,
657 pub max_thread_count: usize,
662 pub max_pool_concurrency: usize,
669 pub query_timeout: u64,
672 pub shutdown_timeout: u64,
675 pub keepalive_timeout: u64,
679 pub idle_timeout: Option<u64>,
686 pub max_connections: Option<usize>,
692 pub tcp_listen_backlog: u32,
700}
701
702impl Default for Limits {
703 fn default() -> Self {
704 Limits {
705 max_concurrent_requests: 100,
706 max_concurrent_queries: 5,
707 max_event_size: ByteSize::mebibytes(1),
708 max_attachment_size: ByteSize::mebibytes(100),
709 max_attachments_size: ByteSize::mebibytes(100),
710 max_client_reports_size: ByteSize::kibibytes(4),
711 max_check_in_size: ByteSize::kibibytes(100),
712 max_envelope_size: ByteSize::mebibytes(100),
713 max_session_count: 100,
714 max_span_count: 1000,
715 max_log_count: 1000,
716 max_api_payload_size: ByteSize::mebibytes(20),
717 max_api_file_upload_size: ByteSize::mebibytes(40),
718 max_api_chunk_upload_size: ByteSize::mebibytes(100),
719 max_profile_size: ByteSize::mebibytes(50),
720 max_log_size: ByteSize::mebibytes(1),
721 max_span_size: ByteSize::mebibytes(1),
722 max_container_size: ByteSize::mebibytes(3),
723 max_statsd_size: ByteSize::mebibytes(1),
724 max_metric_buckets_size: ByteSize::mebibytes(1),
725 max_replay_compressed_size: ByteSize::mebibytes(10),
726 max_replay_uncompressed_size: ByteSize::mebibytes(100),
727 max_replay_message_size: ByteSize::mebibytes(15),
728 max_thread_count: num_cpus::get(),
729 max_pool_concurrency: 1,
730 query_timeout: 30,
731 shutdown_timeout: 10,
732 keepalive_timeout: 5,
733 idle_timeout: None,
734 max_connections: None,
735 tcp_listen_backlog: 1024,
736 }
737 }
738}
739
740#[derive(Debug, Default, Deserialize, Serialize)]
742#[serde(default)]
743pub struct Routing {
744 pub accept_unknown_items: Option<bool>,
754}
755
756#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
758#[serde(rename_all = "lowercase")]
759pub enum HttpEncoding {
760 #[default]
765 Identity,
766 Deflate,
772 Gzip,
779 Br,
781 Zstd,
783}
784
785impl HttpEncoding {
786 pub fn parse(str: &str) -> Self {
788 let str = str.trim();
789 if str.eq_ignore_ascii_case("zstd") {
790 Self::Zstd
791 } else if str.eq_ignore_ascii_case("br") {
792 Self::Br
793 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
794 Self::Gzip
795 } else if str.eq_ignore_ascii_case("deflate") {
796 Self::Deflate
797 } else {
798 Self::Identity
799 }
800 }
801
802 pub fn name(&self) -> Option<&'static str> {
806 match self {
807 Self::Identity => None,
808 Self::Deflate => Some("deflate"),
809 Self::Gzip => Some("gzip"),
810 Self::Br => Some("br"),
811 Self::Zstd => Some("zstd"),
812 }
813 }
814}
815
816#[derive(Serialize, Deserialize, Debug)]
818#[serde(default)]
819pub struct Http {
820 pub timeout: u32,
826 pub connection_timeout: u32,
831 pub max_retry_interval: u32,
833 pub host_header: Option<String>,
835 pub auth_interval: Option<u64>,
843 pub outage_grace_period: u64,
849 pub retry_delay: u64,
853 pub project_failure_interval: u64,
858 pub encoding: HttpEncoding,
874 pub global_metrics: bool,
881}
882
883impl Default for Http {
884 fn default() -> Self {
885 Http {
886 timeout: 5,
887 connection_timeout: 3,
888 max_retry_interval: 60, host_header: None,
890 auth_interval: Some(600), outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
892 retry_delay: default_retry_delay(),
893 project_failure_interval: default_project_failure_interval(),
894 encoding: HttpEncoding::Zstd,
895 global_metrics: false,
896 }
897 }
898}
899
900fn default_retry_delay() -> u64 {
902 1
903}
904
905fn default_project_failure_interval() -> u64 {
907 90
908}
909
910fn spool_envelopes_max_disk_size() -> ByteSize {
912 ByteSize::mebibytes(500)
913}
914
915fn spool_envelopes_batch_size_bytes() -> ByteSize {
917 ByteSize::kibibytes(10)
918}
919
920fn spool_envelopes_max_envelope_delay_secs() -> u64 {
921 24 * 60 * 60
922}
923
924fn spool_disk_usage_refresh_frequency_ms() -> u64 {
926 100
927}
928
929fn spool_max_backpressure_envelopes() -> usize {
931 500
932}
933
934fn spool_max_backpressure_memory_percent() -> f32 {
936 0.9
937}
938
939fn spool_envelopes_partitions() -> NonZeroU8 {
941 NonZeroU8::new(1).unwrap()
942}
943
944#[derive(Debug, Serialize, Deserialize)]
946pub struct EnvelopeSpool {
947 pub path: Option<PathBuf>,
953 #[serde(default = "spool_envelopes_max_disk_size")]
959 pub max_disk_size: ByteSize,
960 #[serde(default = "spool_envelopes_batch_size_bytes")]
967 pub batch_size_bytes: ByteSize,
968 #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
975 pub max_envelope_delay_secs: u64,
976 #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
981 pub disk_usage_refresh_frequency_ms: u64,
982 #[serde(default = "spool_max_backpressure_envelopes")]
986 pub max_backpressure_envelopes: usize,
987 #[serde(default = "spool_max_backpressure_memory_percent")]
1017 pub max_backpressure_memory_percent: f32,
1018 #[serde(default = "spool_envelopes_partitions")]
1025 pub partitions: NonZeroU8,
1026}
1027
1028impl Default for EnvelopeSpool {
1029 fn default() -> Self {
1030 Self {
1031 path: None,
1032 max_disk_size: spool_envelopes_max_disk_size(),
1033 batch_size_bytes: spool_envelopes_batch_size_bytes(),
1034 max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1035 disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1036 max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1037 max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1038 partitions: spool_envelopes_partitions(),
1039 }
1040 }
1041}
1042
1043#[derive(Debug, Serialize, Deserialize, Default)]
1045pub struct Spool {
1046 #[serde(default)]
1048 pub envelopes: EnvelopeSpool,
1049}
1050
1051#[derive(Serialize, Deserialize, Debug)]
1053#[serde(default)]
1054pub struct Cache {
1055 pub project_request_full_config: bool,
1057 pub project_expiry: u32,
1059 pub project_grace_period: u32,
1064 pub project_refresh_interval: Option<u32>,
1070 pub relay_expiry: u32,
1072 #[serde(alias = "event_expiry")]
1078 envelope_expiry: u32,
1079 #[serde(alias = "event_buffer_size")]
1081 envelope_buffer_size: u32,
1082 pub miss_expiry: u32,
1084 pub batch_interval: u32,
1086 pub downstream_relays_batch_interval: u32,
1088 pub batch_size: usize,
1092 pub file_interval: u32,
1094 pub global_config_fetch_interval: u32,
1096}
1097
1098impl Default for Cache {
1099 fn default() -> Self {
1100 Cache {
1101 project_request_full_config: false,
1102 project_expiry: 300, project_grace_period: 120, project_refresh_interval: None,
1105 relay_expiry: 3600, envelope_expiry: 600, envelope_buffer_size: 1000,
1108 miss_expiry: 60, batch_interval: 100, downstream_relays_batch_interval: 100, batch_size: 500,
1112 file_interval: 10, global_config_fetch_interval: 10, }
1115 }
1116}
1117
1118fn default_max_secs_in_future() -> u32 {
1119 60 }
1121
1122fn default_max_session_secs_in_past() -> u32 {
1123 5 * 24 * 3600 }
1125
1126fn default_chunk_size() -> ByteSize {
1127 ByteSize::mebibytes(1)
1128}
1129
1130fn default_projectconfig_cache_prefix() -> String {
1131 "relayconfig".to_owned()
1132}
1133
1134#[allow(clippy::unnecessary_wraps)]
1135fn default_max_rate_limit() -> Option<u32> {
1136 Some(300) }
1138
1139#[derive(Serialize, Deserialize, Debug)]
1141pub struct Processing {
1142 pub enabled: bool,
1144 #[serde(default)]
1146 pub geoip_path: Option<PathBuf>,
1147 #[serde(default = "default_max_secs_in_future")]
1149 pub max_secs_in_future: u32,
1150 #[serde(default = "default_max_session_secs_in_past")]
1152 pub max_session_secs_in_past: u32,
1153 pub kafka_config: Vec<KafkaConfigParam>,
1155 #[serde(default)]
1157 pub span_producers: SpanProducers,
1158 #[serde(default)]
1178 pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
1179 #[serde(default)]
1181 pub topics: TopicAssignments,
1182 #[serde(default)]
1184 pub kafka_validate_topics: bool,
1185 #[serde(default)]
1187 pub redis: Option<RedisConfigs>,
1188 #[serde(default = "default_chunk_size")]
1190 pub attachment_chunk_size: ByteSize,
1191 #[serde(default = "default_projectconfig_cache_prefix")]
1193 pub projectconfig_cache_prefix: String,
1194 #[serde(default = "default_max_rate_limit")]
1196 pub max_rate_limit: Option<u32>,
1197}
1198
1199impl Default for Processing {
1200 fn default() -> Self {
1202 Self {
1203 enabled: false,
1204 geoip_path: None,
1205 max_secs_in_future: default_max_secs_in_future(),
1206 max_session_secs_in_past: default_max_session_secs_in_past(),
1207 kafka_config: Vec::new(),
1208 secondary_kafka_configs: BTreeMap::new(),
1209 topics: TopicAssignments::default(),
1210 kafka_validate_topics: false,
1211 redis: None,
1212 attachment_chunk_size: default_chunk_size(),
1213 projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1214 max_rate_limit: default_max_rate_limit(),
1215 span_producers: Default::default(),
1216 }
1217 }
1218}
1219
1220#[derive(Debug, Serialize, Deserialize)]
1222#[serde(default)]
1223pub struct SpanProducers {
1224 pub produce_json: bool,
1226 pub produce_protobuf: bool,
1228}
1229
1230impl Default for SpanProducers {
1231 fn default() -> Self {
1232 Self {
1233 produce_json: true,
1234 produce_protobuf: false,
1235 }
1236 }
1237}
1238
1239#[derive(Debug, Default, Serialize, Deserialize)]
1241#[serde(default)]
1242pub struct Normalization {
1243 #[serde(default)]
1245 pub level: NormalizationLevel,
1246}
1247
1248#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
1250#[serde(rename_all = "lowercase")]
1251pub enum NormalizationLevel {
1252 #[default]
1256 Default,
1257 Full,
1262}
1263
1264#[derive(Serialize, Deserialize, Debug)]
1266#[serde(default)]
1267pub struct OutcomeAggregatorConfig {
1268 pub bucket_interval: u64,
1270 pub flush_interval: u64,
1272}
1273
1274impl Default for OutcomeAggregatorConfig {
1275 fn default() -> Self {
1276 Self {
1277 bucket_interval: 60,
1278 flush_interval: 120,
1279 }
1280 }
1281}
1282
1283#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1286
1287pub enum EmitOutcomes {
1288 None,
1290 AsClientReports,
1292 AsOutcomes,
1294}
1295
1296impl EmitOutcomes {
1297 pub fn any(&self) -> bool {
1299 !matches!(self, EmitOutcomes::None)
1300 }
1301}
1302
1303impl Serialize for EmitOutcomes {
1304 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1305 where
1306 S: Serializer,
1307 {
1308 match self {
1310 Self::None => serializer.serialize_bool(false),
1311 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1312 Self::AsOutcomes => serializer.serialize_bool(true),
1313 }
1314 }
1315}
1316
1317struct EmitOutcomesVisitor;
1318
1319impl Visitor<'_> for EmitOutcomesVisitor {
1320 type Value = EmitOutcomes;
1321
1322 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1323 formatter.write_str("true, false, or 'as_client_reports'")
1324 }
1325
1326 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1327 where
1328 E: serde::de::Error,
1329 {
1330 Ok(if v {
1331 EmitOutcomes::AsOutcomes
1332 } else {
1333 EmitOutcomes::None
1334 })
1335 }
1336
1337 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1338 where
1339 E: serde::de::Error,
1340 {
1341 if v == "as_client_reports" {
1342 Ok(EmitOutcomes::AsClientReports)
1343 } else {
1344 Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1345 }
1346 }
1347}
1348
1349impl<'de> Deserialize<'de> for EmitOutcomes {
1350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1351 where
1352 D: Deserializer<'de>,
1353 {
1354 deserializer.deserialize_any(EmitOutcomesVisitor)
1355 }
1356}
1357
1358#[derive(Serialize, Deserialize, Debug)]
1360#[serde(default)]
1361pub struct Outcomes {
1362 pub emit_outcomes: EmitOutcomes,
1366 pub emit_client_outcomes: bool,
1368 pub batch_size: usize,
1371 pub batch_interval: u64,
1374 pub source: Option<String>,
1377 pub aggregator: OutcomeAggregatorConfig,
1379}
1380
1381impl Default for Outcomes {
1382 fn default() -> Self {
1383 Outcomes {
1384 emit_outcomes: EmitOutcomes::AsClientReports,
1385 emit_client_outcomes: true,
1386 batch_size: 1000,
1387 batch_interval: 500,
1388 source: None,
1389 aggregator: OutcomeAggregatorConfig::default(),
1390 }
1391 }
1392}
1393
1394#[derive(Serialize, Deserialize, Debug, Default)]
1396pub struct MinimalConfig {
1397 pub relay: Relay,
1399}
1400
1401impl MinimalConfig {
1402 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1404 let path = p.as_ref();
1405 if fs::metadata(path).is_err() {
1406 fs::create_dir_all(path)
1407 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1408 }
1409 self.save(path)
1410 }
1411}
1412
1413impl ConfigObject for MinimalConfig {
1414 fn format() -> ConfigFormat {
1415 ConfigFormat::Yaml
1416 }
1417
1418 fn name() -> &'static str {
1419 "config"
1420 }
1421}
1422
1423mod config_relay_info {
1425 use serde::ser::SerializeMap;
1426
1427 use super::*;
1428
1429 #[derive(Debug, Serialize, Deserialize, Clone)]
1431 struct RelayInfoConfig {
1432 public_key: PublicKey,
1433 #[serde(default)]
1434 internal: bool,
1435 }
1436
1437 impl From<RelayInfoConfig> for RelayInfo {
1438 fn from(v: RelayInfoConfig) -> Self {
1439 RelayInfo {
1440 public_key: v.public_key,
1441 internal: v.internal,
1442 }
1443 }
1444 }
1445
1446 impl From<RelayInfo> for RelayInfoConfig {
1447 fn from(v: RelayInfo) -> Self {
1448 RelayInfoConfig {
1449 public_key: v.public_key,
1450 internal: v.internal,
1451 }
1452 }
1453 }
1454
1455 pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1456 where
1457 D: Deserializer<'de>,
1458 {
1459 let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1460 Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1461 }
1462
1463 pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1464 where
1465 S: Serializer,
1466 {
1467 let mut map = ser.serialize_map(Some(elm.len()))?;
1468
1469 for (k, v) in elm {
1470 map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1471 }
1472
1473 map.end()
1474 }
1475}
1476
1477#[derive(Serialize, Deserialize, Debug, Default)]
1479pub struct AuthConfig {
1480 #[serde(default, skip_serializing_if = "is_default")]
1482 pub ready: ReadinessCondition,
1483
1484 #[serde(default, with = "config_relay_info")]
1486 pub static_relays: HashMap<RelayId, RelayInfo>,
1487}
1488
1489#[derive(Serialize, Deserialize, Debug, Default)]
1491pub struct GeoIpConfig {
1492 pub path: Option<PathBuf>,
1494}
1495
1496#[derive(Serialize, Deserialize, Debug)]
1498#[serde(default)]
1499pub struct CardinalityLimiter {
1500 pub cache_vacuum_interval: u64,
1506}
1507
1508impl Default for CardinalityLimiter {
1509 fn default() -> Self {
1510 Self {
1511 cache_vacuum_interval: 180,
1512 }
1513 }
1514}
1515
1516#[derive(Serialize, Deserialize, Debug)]
1521#[serde(default)]
1522pub struct Health {
1523 pub refresh_interval_ms: u64,
1530 pub max_memory_bytes: Option<ByteSize>,
1535 pub max_memory_percent: f32,
1539 pub probe_timeout_ms: u64,
1546 pub memory_stat_refresh_frequency_ms: u64,
1552}
1553
1554impl Default for Health {
1555 fn default() -> Self {
1556 Self {
1557 refresh_interval_ms: 3000,
1558 max_memory_bytes: None,
1559 max_memory_percent: 0.95,
1560 probe_timeout_ms: 900,
1561 memory_stat_refresh_frequency_ms: 100,
1562 }
1563 }
1564}
1565
1566#[derive(Serialize, Deserialize, Debug)]
1568#[serde(default)]
1569pub struct Cogs {
1570 pub max_queue_size: u64,
1576 pub relay_resource_id: String,
1582}
1583
1584impl Default for Cogs {
1585 fn default() -> Self {
1586 Self {
1587 max_queue_size: 10_000,
1588 relay_resource_id: "relay_service".to_owned(),
1589 }
1590 }
1591}
1592
1593#[derive(Serialize, Deserialize, Debug, Default)]
1594struct ConfigValues {
1595 #[serde(default)]
1596 relay: Relay,
1597 #[serde(default)]
1598 http: Http,
1599 #[serde(default)]
1600 cache: Cache,
1601 #[serde(default)]
1602 spool: Spool,
1603 #[serde(default)]
1604 limits: Limits,
1605 #[serde(default)]
1606 logging: relay_log::LogConfig,
1607 #[serde(default)]
1608 routing: Routing,
1609 #[serde(default)]
1610 metrics: Metrics,
1611 #[serde(default)]
1612 sentry_metrics: SentryMetrics,
1613 #[serde(default)]
1614 sentry: relay_log::SentryConfig,
1615 #[serde(default)]
1616 processing: Processing,
1617 #[serde(default)]
1618 outcomes: Outcomes,
1619 #[serde(default)]
1620 aggregator: AggregatorServiceConfig,
1621 #[serde(default)]
1622 secondary_aggregators: Vec<ScopedAggregatorConfig>,
1623 #[serde(default)]
1624 auth: AuthConfig,
1625 #[serde(default)]
1626 geoip: GeoIpConfig,
1627 #[serde(default)]
1628 normalization: Normalization,
1629 #[serde(default)]
1630 cardinality_limiter: CardinalityLimiter,
1631 #[serde(default)]
1632 health: Health,
1633 #[serde(default)]
1634 cogs: Cogs,
1635}
1636
1637impl ConfigObject for ConfigValues {
1638 fn format() -> ConfigFormat {
1639 ConfigFormat::Yaml
1640 }
1641
1642 fn name() -> &'static str {
1643 "config"
1644 }
1645}
1646
1647pub struct Config {
1649 values: ConfigValues,
1650 credentials: Option<Credentials>,
1651 path: PathBuf,
1652}
1653
1654impl fmt::Debug for Config {
1655 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1656 f.debug_struct("Config")
1657 .field("path", &self.path)
1658 .field("values", &self.values)
1659 .finish()
1660 }
1661}
1662
1663impl Config {
1664 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1666 let path = env::current_dir()
1667 .map(|x| x.join(path.as_ref()))
1668 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1669
1670 let config = Config {
1671 values: ConfigValues::load(&path)?,
1672 credentials: if Credentials::path(&path).exists() {
1673 Some(Credentials::load(&path)?)
1674 } else {
1675 None
1676 },
1677 path: path.clone(),
1678 };
1679
1680 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1681 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1682 }
1683
1684 Ok(config)
1685 }
1686
1687 pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1691 Ok(Config {
1692 values: serde_json::from_value(value)
1693 .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1694 credentials: None,
1695 path: PathBuf::new(),
1696 })
1697 }
1698
1699 pub fn apply_override(
1702 &mut self,
1703 mut overrides: OverridableConfig,
1704 ) -> anyhow::Result<&mut Self> {
1705 let relay = &mut self.values.relay;
1706
1707 if let Some(mode) = overrides.mode {
1708 relay.mode = mode
1709 .parse::<RelayMode>()
1710 .with_context(|| ConfigError::field("mode"))?;
1711 }
1712
1713 if let Some(deployment) = overrides.instance {
1714 relay.instance = deployment
1715 .parse::<RelayInstance>()
1716 .with_context(|| ConfigError::field("deployment"))?;
1717 }
1718
1719 if let Some(log_level) = overrides.log_level {
1720 self.values.logging.level = log_level.parse()?;
1721 }
1722
1723 if let Some(log_format) = overrides.log_format {
1724 self.values.logging.format = log_format.parse()?;
1725 }
1726
1727 if let Some(upstream) = overrides.upstream {
1728 relay.upstream = upstream
1729 .parse::<UpstreamDescriptor>()
1730 .with_context(|| ConfigError::field("upstream"))?;
1731 } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1732 relay.upstream = upstream_dsn
1733 .parse::<Dsn>()
1734 .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1735 .with_context(|| ConfigError::field("upstream_dsn"))?;
1736 }
1737
1738 if let Some(host) = overrides.host {
1739 relay.host = host
1740 .parse::<IpAddr>()
1741 .with_context(|| ConfigError::field("host"))?;
1742 }
1743
1744 if let Some(port) = overrides.port {
1745 relay.port = port
1746 .as_str()
1747 .parse()
1748 .with_context(|| ConfigError::field("port"))?;
1749 }
1750
1751 let processing = &mut self.values.processing;
1752 if let Some(enabled) = overrides.processing {
1753 match enabled.to_lowercase().as_str() {
1754 "true" | "1" => processing.enabled = true,
1755 "false" | "0" | "" => processing.enabled = false,
1756 _ => return Err(ConfigError::field("processing").into()),
1757 }
1758 }
1759
1760 if let Some(redis) = overrides.redis_url {
1761 processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1762 }
1763
1764 if let Some(kafka_url) = overrides.kafka_url {
1765 let existing = processing
1766 .kafka_config
1767 .iter_mut()
1768 .find(|e| e.name == "bootstrap.servers");
1769
1770 if let Some(config_param) = existing {
1771 config_param.value = kafka_url;
1772 } else {
1773 processing.kafka_config.push(KafkaConfigParam {
1774 name: "bootstrap.servers".to_owned(),
1775 value: kafka_url,
1776 })
1777 }
1778 }
1779 let id = if let Some(id) = overrides.id {
1781 let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1782 Some(id)
1783 } else {
1784 None
1785 };
1786 let public_key = if let Some(public_key) = overrides.public_key {
1787 let public_key = public_key
1788 .parse::<PublicKey>()
1789 .with_context(|| ConfigError::field("public_key"))?;
1790 Some(public_key)
1791 } else {
1792 None
1793 };
1794
1795 let secret_key = if let Some(secret_key) = overrides.secret_key {
1796 let secret_key = secret_key
1797 .parse::<SecretKey>()
1798 .with_context(|| ConfigError::field("secret_key"))?;
1799 Some(secret_key)
1800 } else {
1801 None
1802 };
1803 let outcomes = &mut self.values.outcomes;
1804 if overrides.outcome_source.is_some() {
1805 outcomes.source = overrides.outcome_source.take();
1806 }
1807
1808 if let Some(credentials) = &mut self.credentials {
1809 if let Some(id) = id {
1811 credentials.id = id;
1812 }
1813 if let Some(public_key) = public_key {
1814 credentials.public_key = public_key;
1815 }
1816 if let Some(secret_key) = secret_key {
1817 credentials.secret_key = secret_key
1818 }
1819 } else {
1820 match (id, public_key, secret_key) {
1822 (Some(id), Some(public_key), Some(secret_key)) => {
1823 self.credentials = Some(Credentials {
1824 secret_key,
1825 public_key,
1826 id,
1827 })
1828 }
1829 (None, None, None) => {
1830 }
1833 _ => {
1834 return Err(ConfigError::field("incomplete credentials").into());
1835 }
1836 }
1837 }
1838
1839 let limits = &mut self.values.limits;
1840 if let Some(shutdown_timeout) = overrides.shutdown_timeout {
1841 if let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>() {
1842 limits.shutdown_timeout = shutdown_timeout;
1843 }
1844 }
1845
1846 if let Some(server_name) = overrides.server_name {
1847 self.values.sentry.server_name = Some(server_name.into());
1848 }
1849
1850 Ok(self)
1851 }
1852
1853 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1855 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1856 }
1857
1858 pub fn path(&self) -> &Path {
1860 &self.path
1861 }
1862
1863 pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1865 serde_yaml::to_string(&self.values)
1866 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1867 }
1868
1869 pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1873 let creds = Credentials::generate();
1874 if save {
1875 creds.save(&self.path)?;
1876 }
1877 self.credentials = Some(creds);
1878 Ok(())
1879 }
1880
1881 pub fn credentials(&self) -> Option<&Credentials> {
1883 self.credentials.as_ref()
1884 }
1885
1886 pub fn replace_credentials(
1890 &mut self,
1891 credentials: Option<Credentials>,
1892 ) -> anyhow::Result<bool> {
1893 if self.credentials == credentials {
1894 return Ok(false);
1895 }
1896
1897 match credentials {
1898 Some(ref creds) => {
1899 creds.save(&self.path)?;
1900 }
1901 None => {
1902 let path = Credentials::path(&self.path);
1903 if fs::metadata(&path).is_ok() {
1904 fs::remove_file(&path).with_context(|| {
1905 ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1906 })?;
1907 }
1908 }
1909 }
1910
1911 self.credentials = credentials;
1912 Ok(true)
1913 }
1914
1915 pub fn has_credentials(&self) -> bool {
1917 self.credentials.is_some()
1918 }
1919
1920 pub fn secret_key(&self) -> Option<&SecretKey> {
1922 self.credentials.as_ref().map(|x| &x.secret_key)
1923 }
1924
1925 pub fn public_key(&self) -> Option<&PublicKey> {
1927 self.credentials.as_ref().map(|x| &x.public_key)
1928 }
1929
1930 pub fn relay_id(&self) -> Option<&RelayId> {
1932 self.credentials.as_ref().map(|x| &x.id)
1933 }
1934
1935 pub fn relay_mode(&self) -> RelayMode {
1937 self.values.relay.mode
1938 }
1939
1940 pub fn relay_instance(&self) -> RelayInstance {
1942 self.values.relay.instance
1943 }
1944
1945 pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1947 &self.values.relay.upstream
1948 }
1949
1950 pub fn http_host_header(&self) -> Option<&str> {
1952 self.values.http.host_header.as_deref()
1953 }
1954
1955 pub fn listen_addr(&self) -> SocketAddr {
1957 (self.values.relay.host, self.values.relay.port).into()
1958 }
1959
1960 pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1962 if self.values.relay.tls_identity_path.is_some() {
1963 let port = self.values.relay.tls_port.unwrap_or(3443);
1964 Some((self.values.relay.host, port).into())
1965 } else {
1966 None
1967 }
1968 }
1969
1970 pub fn tls_identity_path(&self) -> Option<&Path> {
1972 self.values.relay.tls_identity_path.as_deref()
1973 }
1974
1975 pub fn tls_identity_password(&self) -> Option<&str> {
1977 self.values.relay.tls_identity_password.as_deref()
1978 }
1979
1980 pub fn override_project_ids(&self) -> bool {
1984 self.values.relay.override_project_ids
1985 }
1986
1987 pub fn requires_auth(&self) -> bool {
1991 match self.values.auth.ready {
1992 ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
1993 ReadinessCondition::Always => false,
1994 }
1995 }
1996
1997 pub fn http_auth_interval(&self) -> Option<Duration> {
2001 if self.processing_enabled() {
2002 return None;
2003 }
2004
2005 match self.values.http.auth_interval {
2006 None | Some(0) => None,
2007 Some(secs) => Some(Duration::from_secs(secs)),
2008 }
2009 }
2010
2011 pub fn http_outage_grace_period(&self) -> Duration {
2014 Duration::from_secs(self.values.http.outage_grace_period)
2015 }
2016
2017 pub fn http_retry_delay(&self) -> Duration {
2022 Duration::from_secs(self.values.http.retry_delay)
2023 }
2024
2025 pub fn http_project_failure_interval(&self) -> Duration {
2027 Duration::from_secs(self.values.http.project_failure_interval)
2028 }
2029
2030 pub fn http_encoding(&self) -> HttpEncoding {
2032 self.values.http.encoding
2033 }
2034
2035 pub fn http_global_metrics(&self) -> bool {
2037 self.values.http.global_metrics
2038 }
2039
2040 pub fn emit_outcomes(&self) -> EmitOutcomes {
2045 if self.processing_enabled() {
2046 return EmitOutcomes::AsOutcomes;
2047 }
2048 self.values.outcomes.emit_outcomes
2049 }
2050
2051 pub fn emit_client_outcomes(&self) -> bool {
2061 self.values.outcomes.emit_client_outcomes
2062 }
2063
2064 pub fn outcome_batch_size(&self) -> usize {
2066 self.values.outcomes.batch_size
2067 }
2068
2069 pub fn outcome_batch_interval(&self) -> Duration {
2071 Duration::from_millis(self.values.outcomes.batch_interval)
2072 }
2073
2074 pub fn outcome_source(&self) -> Option<&str> {
2076 self.values.outcomes.source.as_deref()
2077 }
2078
2079 pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2081 &self.values.outcomes.aggregator
2082 }
2083
2084 pub fn logging(&self) -> &relay_log::LogConfig {
2086 &self.values.logging
2087 }
2088
2089 pub fn sentry(&self) -> &relay_log::SentryConfig {
2091 &self.values.sentry
2092 }
2093
2094 pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2098 if let Some(ref addr) = self.values.metrics.statsd {
2099 let addrs = addr
2100 .as_str()
2101 .to_socket_addrs()
2102 .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2103 .collect();
2104 Ok(addrs)
2105 } else {
2106 Ok(vec![])
2107 }
2108 }
2109
2110 pub fn metrics_prefix(&self) -> &str {
2112 &self.values.metrics.prefix
2113 }
2114
2115 pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2117 &self.values.metrics.default_tags
2118 }
2119
2120 pub fn metrics_hostname_tag(&self) -> Option<&str> {
2122 self.values.metrics.hostname_tag.as_deref()
2123 }
2124
2125 pub fn metrics_sample_rate(&self) -> f32 {
2127 self.values.metrics.sample_rate
2128 }
2129
2130 pub fn metrics_aggregate(&self) -> bool {
2132 self.values.metrics.aggregate
2133 }
2134
2135 pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
2137 self.values.metrics.allow_high_cardinality_tags
2138 }
2139
2140 pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2144 match self.values.metrics.periodic_secs {
2145 0 => None,
2146 secs => Some(Duration::from_secs(secs)),
2147 }
2148 }
2149
2150 pub fn http_timeout(&self) -> Duration {
2152 Duration::from_secs(self.values.http.timeout.into())
2153 }
2154
2155 pub fn http_connection_timeout(&self) -> Duration {
2157 Duration::from_secs(self.values.http.connection_timeout.into())
2158 }
2159
2160 pub fn http_max_retry_interval(&self) -> Duration {
2162 Duration::from_secs(self.values.http.max_retry_interval.into())
2163 }
2164
2165 pub fn project_cache_expiry(&self) -> Duration {
2167 Duration::from_secs(self.values.cache.project_expiry.into())
2168 }
2169
2170 pub fn request_full_project_config(&self) -> bool {
2172 self.values.cache.project_request_full_config
2173 }
2174
2175 pub fn relay_cache_expiry(&self) -> Duration {
2177 Duration::from_secs(self.values.cache.relay_expiry.into())
2178 }
2179
2180 pub fn envelope_buffer_size(&self) -> usize {
2182 self.values
2183 .cache
2184 .envelope_buffer_size
2185 .try_into()
2186 .unwrap_or(usize::MAX)
2187 }
2188
2189 pub fn cache_miss_expiry(&self) -> Duration {
2191 Duration::from_secs(self.values.cache.miss_expiry.into())
2192 }
2193
2194 pub fn project_grace_period(&self) -> Duration {
2196 Duration::from_secs(self.values.cache.project_grace_period.into())
2197 }
2198
2199 pub fn project_refresh_interval(&self) -> Option<Duration> {
2203 self.values
2204 .cache
2205 .project_refresh_interval
2206 .map(Into::into)
2207 .map(Duration::from_secs)
2208 }
2209
2210 pub fn query_batch_interval(&self) -> Duration {
2213 Duration::from_millis(self.values.cache.batch_interval.into())
2214 }
2215
2216 pub fn downstream_relays_batch_interval(&self) -> Duration {
2218 Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2219 }
2220
2221 pub fn local_cache_interval(&self) -> Duration {
2223 Duration::from_secs(self.values.cache.file_interval.into())
2224 }
2225
2226 pub fn global_config_fetch_interval(&self) -> Duration {
2229 Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2230 }
2231
2232 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2237 let mut path = self
2238 .values
2239 .spool
2240 .envelopes
2241 .path
2242 .as_ref()
2243 .map(|path| path.to_owned())?;
2244
2245 if partition_id == 0 {
2246 return Some(path);
2247 }
2248
2249 let file_name = path.file_name().and_then(|f| f.to_str())?;
2250 let new_file_name = format!("{file_name}.{partition_id}");
2251 path.set_file_name(new_file_name);
2252
2253 Some(path)
2254 }
2255
2256 pub fn spool_envelopes_max_disk_size(&self) -> usize {
2258 self.values.spool.envelopes.max_disk_size.as_bytes()
2259 }
2260
2261 pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2264 self.values.spool.envelopes.batch_size_bytes.as_bytes()
2265 }
2266
2267 pub fn spool_envelopes_max_age(&self) -> Duration {
2269 Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2270 }
2271
2272 pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2274 Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2275 }
2276
2277 pub fn spool_max_backpressure_envelopes(&self) -> usize {
2279 self.values.spool.envelopes.max_backpressure_envelopes
2280 }
2281
2282 pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2284 self.values.spool.envelopes.max_backpressure_memory_percent
2285 }
2286
2287 pub fn spool_partitions(&self) -> NonZeroU8 {
2289 self.values.spool.envelopes.partitions
2290 }
2291
2292 pub fn max_event_size(&self) -> usize {
2294 self.values.limits.max_event_size.as_bytes()
2295 }
2296
2297 pub fn max_attachment_size(&self) -> usize {
2299 self.values.limits.max_attachment_size.as_bytes()
2300 }
2301
2302 pub fn max_attachments_size(&self) -> usize {
2305 self.values.limits.max_attachments_size.as_bytes()
2306 }
2307
2308 pub fn max_client_reports_size(&self) -> usize {
2310 self.values.limits.max_client_reports_size.as_bytes()
2311 }
2312
2313 pub fn max_check_in_size(&self) -> usize {
2315 self.values.limits.max_check_in_size.as_bytes()
2316 }
2317
2318 pub fn max_log_size(&self) -> usize {
2320 self.values.limits.max_log_size.as_bytes()
2321 }
2322
2323 pub fn max_span_size(&self) -> usize {
2325 self.values.limits.max_span_size.as_bytes()
2326 }
2327
2328 pub fn max_container_size(&self) -> usize {
2330 self.values.limits.max_container_size.as_bytes()
2331 }
2332
2333 pub fn max_envelope_size(&self) -> usize {
2337 self.values.limits.max_envelope_size.as_bytes()
2338 }
2339
2340 pub fn max_session_count(&self) -> usize {
2342 self.values.limits.max_session_count
2343 }
2344
2345 pub fn max_span_count(&self) -> usize {
2347 self.values.limits.max_span_count
2348 }
2349
2350 pub fn max_log_count(&self) -> usize {
2352 self.values.limits.max_log_count
2353 }
2354
2355 pub fn max_statsd_size(&self) -> usize {
2357 self.values.limits.max_statsd_size.as_bytes()
2358 }
2359
2360 pub fn max_metric_buckets_size(&self) -> usize {
2362 self.values.limits.max_metric_buckets_size.as_bytes()
2363 }
2364
2365 pub fn metric_stats_enabled(&self) -> bool {
2370 self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled
2371 }
2372
2373 pub fn max_api_payload_size(&self) -> usize {
2375 self.values.limits.max_api_payload_size.as_bytes()
2376 }
2377
2378 pub fn max_api_file_upload_size(&self) -> usize {
2380 self.values.limits.max_api_file_upload_size.as_bytes()
2381 }
2382
2383 pub fn max_api_chunk_upload_size(&self) -> usize {
2385 self.values.limits.max_api_chunk_upload_size.as_bytes()
2386 }
2387
2388 pub fn max_profile_size(&self) -> usize {
2390 self.values.limits.max_profile_size.as_bytes()
2391 }
2392
2393 pub fn max_replay_compressed_size(&self) -> usize {
2395 self.values.limits.max_replay_compressed_size.as_bytes()
2396 }
2397
2398 pub fn max_replay_uncompressed_size(&self) -> usize {
2400 self.values.limits.max_replay_uncompressed_size.as_bytes()
2401 }
2402
2403 pub fn max_replay_message_size(&self) -> usize {
2409 self.values.limits.max_replay_message_size.as_bytes()
2410 }
2411
2412 pub fn max_concurrent_requests(&self) -> usize {
2414 self.values.limits.max_concurrent_requests
2415 }
2416
2417 pub fn max_concurrent_queries(&self) -> usize {
2419 self.values.limits.max_concurrent_queries
2420 }
2421
2422 pub fn query_timeout(&self) -> Duration {
2424 Duration::from_secs(self.values.limits.query_timeout)
2425 }
2426
2427 pub fn shutdown_timeout(&self) -> Duration {
2430 Duration::from_secs(self.values.limits.shutdown_timeout)
2431 }
2432
2433 pub fn keepalive_timeout(&self) -> Duration {
2437 Duration::from_secs(self.values.limits.keepalive_timeout)
2438 }
2439
2440 pub fn idle_timeout(&self) -> Option<Duration> {
2442 self.values.limits.idle_timeout.map(Duration::from_secs)
2443 }
2444
2445 pub fn max_connections(&self) -> Option<usize> {
2447 self.values.limits.max_connections
2448 }
2449
2450 pub fn tcp_listen_backlog(&self) -> u32 {
2452 self.values.limits.tcp_listen_backlog
2453 }
2454
2455 pub fn cpu_concurrency(&self) -> usize {
2457 self.values.limits.max_thread_count
2458 }
2459
2460 pub fn pool_concurrency(&self) -> usize {
2462 self.values.limits.max_pool_concurrency
2463 }
2464
2465 pub fn query_batch_size(&self) -> usize {
2467 self.values.cache.batch_size
2468 }
2469
2470 pub fn project_configs_path(&self) -> PathBuf {
2472 self.path.join("projects")
2473 }
2474
2475 pub fn processing_enabled(&self) -> bool {
2477 self.values.processing.enabled
2478 }
2479
2480 pub fn normalization_level(&self) -> NormalizationLevel {
2482 self.values.normalization.level
2483 }
2484
2485 pub fn geoip_path(&self) -> Option<&Path> {
2487 self.values
2488 .geoip
2489 .path
2490 .as_deref()
2491 .or(self.values.processing.geoip_path.as_deref())
2492 }
2493
2494 pub fn max_secs_in_future(&self) -> i64 {
2498 self.values.processing.max_secs_in_future.into()
2499 }
2500
2501 pub fn max_session_secs_in_past(&self) -> i64 {
2503 self.values.processing.max_session_secs_in_past.into()
2504 }
2505
2506 pub fn kafka_configs(
2508 &self,
2509 topic: KafkaTopic,
2510 ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2511 self.values.processing.topics.get(topic).kafka_configs(
2512 &self.values.processing.kafka_config,
2513 &self.values.processing.secondary_kafka_configs,
2514 )
2515 }
2516
2517 pub fn kafka_validate_topics(&self) -> bool {
2519 self.values.processing.kafka_validate_topics
2520 }
2521
2522 pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2524 &self.values.processing.topics.unused
2525 }
2526
2527 pub fn redis(&self) -> Option<RedisConfigsRef> {
2530 let redis_configs = self.values.processing.redis.as_ref()?;
2531
2532 Some(build_redis_configs(
2533 redis_configs,
2534 self.cpu_concurrency() as u32,
2535 ))
2536 }
2537
2538 pub fn attachment_chunk_size(&self) -> usize {
2540 self.values.processing.attachment_chunk_size.as_bytes()
2541 }
2542
2543 pub fn metrics_max_batch_size_bytes(&self) -> usize {
2545 self.values.aggregator.max_flush_bytes
2546 }
2547
2548 pub fn projectconfig_cache_prefix(&self) -> &str {
2551 &self.values.processing.projectconfig_cache_prefix
2552 }
2553
2554 pub fn max_rate_limit(&self) -> Option<u64> {
2556 self.values.processing.max_rate_limit.map(u32::into)
2557 }
2558
2559 pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2563 Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2564 }
2565
2566 pub fn health_refresh_interval(&self) -> Duration {
2568 Duration::from_millis(self.values.health.refresh_interval_ms)
2569 }
2570
2571 pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2573 self.values
2574 .health
2575 .max_memory_bytes
2576 .as_ref()
2577 .map_or(u64::MAX, |b| b.as_bytes() as u64)
2578 }
2579
2580 pub fn health_max_memory_watermark_percent(&self) -> f32 {
2582 self.values.health.max_memory_percent
2583 }
2584
2585 pub fn health_probe_timeout(&self) -> Duration {
2587 Duration::from_millis(self.values.health.probe_timeout_ms)
2588 }
2589
2590 pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2592 self.values.health.memory_stat_refresh_frequency_ms
2593 }
2594
2595 pub fn cogs_max_queue_size(&self) -> u64 {
2597 self.values.cogs.max_queue_size
2598 }
2599
2600 pub fn cogs_relay_resource_id(&self) -> &str {
2602 &self.values.cogs.relay_resource_id
2603 }
2604
2605 pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2607 &self.values.aggregator
2608 }
2609
2610 pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2612 &self.values.secondary_aggregators
2613 }
2614
2615 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2617 for entry in &self.values.secondary_aggregators {
2618 if entry.condition.matches(Some(namespace)) {
2619 return &entry.config;
2620 }
2621 }
2622 &self.values.aggregator
2623 }
2624
2625 pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2627 &self.values.auth.static_relays
2628 }
2629
2630 pub fn accept_unknown_items(&self) -> bool {
2632 let forward = self.values.routing.accept_unknown_items;
2633 forward.unwrap_or_else(|| !self.processing_enabled())
2634 }
2635
2636 pub fn produce_protobuf_spans(&self) -> bool {
2638 self.values.processing.span_producers.produce_protobuf
2639 }
2640
2641 pub fn produce_json_spans(&self) -> bool {
2643 self.values.processing.span_producers.produce_json
2644 }
2645}
2646
2647impl Default for Config {
2648 fn default() -> Self {
2649 Self {
2650 values: ConfigValues::default(),
2651 credentials: None,
2652 path: PathBuf::new(),
2653 }
2654 }
2655}
2656
2657#[cfg(test)]
2658mod tests {
2659
2660 use super::*;
2661
2662 #[test]
2664 fn test_event_buffer_size() {
2665 let yaml = r###"
2666cache:
2667 event_buffer_size: 1000000
2668 event_expiry: 1800
2669"###;
2670
2671 let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
2672 assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
2673 assert_eq!(values.cache.envelope_expiry, 1800);
2674 }
2675
2676 #[test]
2677 fn test_emit_outcomes() {
2678 for (serialized, deserialized) in &[
2679 ("true", EmitOutcomes::AsOutcomes),
2680 ("false", EmitOutcomes::None),
2681 ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
2682 ] {
2683 let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
2684 assert_eq!(value, *deserialized);
2685 assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
2686 }
2687 }
2688
2689 #[test]
2690 fn test_emit_outcomes_invalid() {
2691 assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
2692 }
2693}