relay_config/
config.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
28const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
30static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
31# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
32# All config options: https://docs.sentry.io/product/relay/options/
33"###;
34
35/// Indicates config related errors.
36#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum ConfigErrorKind {
39    /// Failed to open the file.
40    CouldNotOpenFile,
41    /// Failed to save a file.
42    CouldNotWriteFile,
43    /// Parsing YAML failed.
44    BadYaml,
45    /// Parsing JSON failed.
46    BadJson,
47    /// Invalid config value
48    InvalidValue,
49    /// The user attempted to run Relay with processing enabled, but uses a binary that was
50    /// compiled without the processing feature.
51    ProcessingNotAvailable,
52}
53
54impl fmt::Display for ConfigErrorKind {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self {
57            Self::CouldNotOpenFile => write!(f, "could not open config file"),
58            Self::CouldNotWriteFile => write!(f, "could not write config file"),
59            Self::BadYaml => write!(f, "could not parse yaml config file"),
60            Self::BadJson => write!(f, "could not parse json config file"),
61            Self::InvalidValue => write!(f, "invalid config value"),
62            Self::ProcessingNotAvailable => write!(
63                f,
64                "was not compiled with processing, cannot enable processing"
65            ),
66        }
67    }
68}
69
70/// Defines the source of a config error
71#[derive(Debug, Default)]
72enum ConfigErrorSource {
73    /// An error occurring independently.
74    #[default]
75    None,
76    /// An error originating from a configuration file.
77    File(PathBuf),
78    /// An error originating in a field override (an env var, or a CLI parameter).
79    FieldOverride(String),
80}
81
82impl fmt::Display for ConfigErrorSource {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            ConfigErrorSource::None => Ok(()),
86            ConfigErrorSource::File(file_name) => {
87                write!(f, " (file {})", file_name.display())
88            }
89            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90        }
91    }
92}
93
94/// Indicates config related errors.
95#[derive(Debug)]
96pub struct ConfigError {
97    source: ConfigErrorSource,
98    kind: ConfigErrorKind,
99}
100
101impl ConfigError {
102    #[inline]
103    fn new(kind: ConfigErrorKind) -> Self {
104        Self {
105            source: ConfigErrorSource::None,
106            kind,
107        }
108    }
109
110    #[inline]
111    fn field(field: &'static str) -> Self {
112        Self {
113            source: ConfigErrorSource::FieldOverride(field.to_owned()),
114            kind: ConfigErrorKind::InvalidValue,
115        }
116    }
117
118    #[inline]
119    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120        Self {
121            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122            kind,
123        }
124    }
125
126    /// Returns the error kind of the error.
127    pub fn kind(&self) -> ConfigErrorKind {
128        self.kind
129    }
130}
131
132impl fmt::Display for ConfigError {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        write!(f, "{}{}", self.kind(), self.source)
135    }
136}
137
138impl Error for ConfigError {}
139
140enum ConfigFormat {
141    Yaml,
142    Json,
143}
144
145impl ConfigFormat {
146    pub fn extension(&self) -> &'static str {
147        match self {
148            ConfigFormat::Yaml => "yml",
149            ConfigFormat::Json => "json",
150        }
151    }
152}
153
154trait ConfigObject: DeserializeOwned + Serialize {
155    /// The format in which to serialize this configuration.
156    fn format() -> ConfigFormat;
157
158    /// The basename of the config file.
159    fn name() -> &'static str;
160
161    /// The full filename of the config file, including the file extension.
162    fn path(base: &Path) -> PathBuf {
163        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
164    }
165
166    /// Loads the config file from a file within the given directory location.
167    fn load(base: &Path) -> anyhow::Result<Self> {
168        let path = Self::path(base);
169
170        let f = fs::File::open(&path)
171            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
172        let f = io::BufReader::new(f);
173
174        let mut source = serde_vars::EnvSource::default();
175        match Self::format() {
176            ConfigFormat::Yaml => {
177                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
178                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
179            }
180            ConfigFormat::Json => {
181                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
182                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
183            }
184        }
185    }
186
187    /// Writes the configuration to a file within the given directory location.
188    fn save(&self, base: &Path) -> anyhow::Result<()> {
189        let path = Self::path(base);
190        let mut options = fs::OpenOptions::new();
191        options.write(true).truncate(true).create(true);
192
193        // Remove all non-user permissions for the newly created file
194        #[cfg(unix)]
195        {
196            use std::os::unix::fs::OpenOptionsExt;
197            options.mode(0o600);
198        }
199
200        let mut f = options
201            .open(&path)
202            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
203
204        match Self::format() {
205            ConfigFormat::Yaml => {
206                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
207                serde_yaml::to_writer(&mut f, self)
208                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
209            }
210            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
211                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
212        }
213
214        f.write_all(b"\n").ok();
215
216        Ok(())
217    }
218}
219
220/// Structure used to hold information about configuration overrides via
221/// CLI parameters or environment variables
222#[derive(Debug, Default)]
223pub struct OverridableConfig {
224    /// The operation mode of this relay.
225    pub mode: Option<String>,
226    /// The instance type of this relay.
227    pub instance: Option<String>,
228    /// The log level of this relay.
229    pub log_level: Option<String>,
230    /// The log format of this relay.
231    pub log_format: Option<String>,
232    /// The upstream relay or sentry instance.
233    pub upstream: Option<String>,
234    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
235    pub upstream_dsn: Option<String>,
236    /// The host the relay should bind to (network interface).
237    pub host: Option<String>,
238    /// The port to bind for the unencrypted relay HTTP server.
239    pub port: Option<String>,
240    /// "true" if processing is enabled "false" otherwise
241    pub processing: Option<String>,
242    /// the kafka bootstrap.servers configuration string
243    pub kafka_url: Option<String>,
244    /// the redis server url
245    pub redis_url: Option<String>,
246    /// The globally unique ID of the relay.
247    pub id: Option<String>,
248    /// The secret key of the relay
249    pub secret_key: Option<String>,
250    /// The public key of the relay
251    pub public_key: Option<String>,
252    /// Outcome source
253    pub outcome_source: Option<String>,
254    /// shutdown timeout
255    pub shutdown_timeout: Option<String>,
256    /// Server name reported in the Sentry SDK.
257    pub server_name: Option<String>,
258}
259
260/// The relay credentials
261#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
262pub struct Credentials {
263    /// The secret key of the relay
264    pub secret_key: SecretKey,
265    /// The public key of the relay
266    pub public_key: PublicKey,
267    /// The globally unique ID of the relay.
268    pub id: RelayId,
269}
270
271impl Credentials {
272    /// Generates new random credentials.
273    pub fn generate() -> Self {
274        relay_log::info!("generating new relay credentials");
275        let (sk, pk) = generate_key_pair();
276        Self {
277            secret_key: sk,
278            public_key: pk,
279            id: generate_relay_id(),
280        }
281    }
282
283    /// Serializes this configuration to JSON.
284    pub fn to_json_string(&self) -> anyhow::Result<String> {
285        serde_json::to_string(self)
286            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
287    }
288}
289
290impl ConfigObject for Credentials {
291    fn format() -> ConfigFormat {
292        ConfigFormat::Json
293    }
294    fn name() -> &'static str {
295        "credentials"
296    }
297}
298
299/// Information on a downstream Relay.
300#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
301#[serde(rename_all = "camelCase")]
302pub struct RelayInfo {
303    /// The public key that this Relay uses to authenticate and sign requests.
304    pub public_key: PublicKey,
305
306    /// Marks an internal relay that has privileged access to more project configuration.
307    #[serde(default)]
308    pub internal: bool,
309}
310
311impl RelayInfo {
312    /// Creates a new RelayInfo
313    pub fn new(public_key: PublicKey) -> Self {
314        Self {
315            public_key,
316            internal: false,
317        }
318    }
319}
320
321/// The operation mode of a relay.
322#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
323#[serde(rename_all = "camelCase")]
324pub enum RelayMode {
325    /// This relay acts as a proxy for all requests and events.
326    ///
327    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
328    /// fetch project configurations from the upstream or perform PII stripping. All events are
329    /// accepted unless overridden on the file system.
330    Proxy,
331
332    /// Project configurations are managed by the upstream.
333    ///
334    /// Project configurations are always fetched from the upstream, unless they are statically
335    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
336    /// only possible, if the upstream is Sentry directly, or another managed Relay.
337    Managed,
338}
339
340impl<'de> Deserialize<'de> for RelayMode {
341    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
342    where
343        D: Deserializer<'de>,
344    {
345        let s = String::deserialize(deserializer)?;
346        match s.as_str() {
347            "proxy" => Ok(RelayMode::Proxy),
348            "managed" => Ok(RelayMode::Managed),
349            "static" => Err(serde::de::Error::custom(
350                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
351            )),
352            other => Err(serde::de::Error::unknown_variant(
353                other,
354                &["proxy", "managed"],
355            )),
356        }
357    }
358}
359
360impl fmt::Display for RelayMode {
361    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
362        match self {
363            RelayMode::Proxy => write!(f, "proxy"),
364            RelayMode::Managed => write!(f, "managed"),
365        }
366    }
367}
368
369/// The instance type of Relay.
370#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
371#[serde(rename_all = "camelCase")]
372pub enum RelayInstance {
373    /// This Relay is run as a default instance.
374    Default,
375
376    /// This Relay is run as a canary instance where experiments can be run.
377    Canary,
378}
379
380impl RelayInstance {
381    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
382    pub fn is_canary(&self) -> bool {
383        matches!(self, RelayInstance::Canary)
384    }
385}
386
387impl fmt::Display for RelayInstance {
388    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389        match self {
390            RelayInstance::Default => write!(f, "default"),
391            RelayInstance::Canary => write!(f, "canary"),
392        }
393    }
394}
395
396impl FromStr for RelayInstance {
397    type Err = fmt::Error;
398
399    fn from_str(s: &str) -> Result<Self, Self::Err> {
400        match s {
401            "canary" => Ok(RelayInstance::Canary),
402            _ => Ok(RelayInstance::Default),
403        }
404    }
405}
406
407/// Error returned when parsing an invalid [`RelayMode`].
408#[derive(Clone, Copy, Debug, Eq, PartialEq)]
409pub struct ParseRelayModeError;
410
411impl fmt::Display for ParseRelayModeError {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        write!(f, "Relay mode must be one of: managed or proxy")
414    }
415}
416
417impl Error for ParseRelayModeError {}
418
419impl FromStr for RelayMode {
420    type Err = ParseRelayModeError;
421
422    fn from_str(s: &str) -> Result<Self, Self::Err> {
423        match s {
424            "proxy" => Ok(RelayMode::Proxy),
425            "managed" => Ok(RelayMode::Managed),
426            _ => Err(ParseRelayModeError),
427        }
428    }
429}
430
431/// Returns `true` if this value is equal to `Default::default()`.
432fn is_default<T: Default + PartialEq>(t: &T) -> bool {
433    *t == T::default()
434}
435
436/// Checks if we are running in docker.
437fn is_docker() -> bool {
438    if fs::metadata("/.dockerenv").is_ok() {
439        return true;
440    }
441
442    fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
443}
444
445/// Default value for the "bind" configuration.
446fn default_host() -> IpAddr {
447    if is_docker() {
448        // Docker images rely on this service being exposed
449        "0.0.0.0".parse().unwrap()
450    } else {
451        "127.0.0.1".parse().unwrap()
452    }
453}
454
455/// Controls responses from the readiness health check endpoint based on authentication.
456///
457/// Independent of the the readiness condition, shutdown always switches Relay into unready state.
458#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
459#[serde(rename_all = "lowercase")]
460#[derive(Default)]
461pub enum ReadinessCondition {
462    /// (default) Relay is ready when authenticated and connected to the upstream.
463    ///
464    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
465    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
466    /// Relay remains ready until authentication fails.
467    ///
468    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
469    /// network outages.
470    #[default]
471    Authenticated,
472    /// Relay reports readiness regardless of the authentication and networking state.
473    Always,
474}
475
476/// Relay specific configuration values.
477#[derive(Serialize, Deserialize, Debug)]
478#[serde(default)]
479pub struct Relay {
480    /// The operation mode of this relay.
481    pub mode: RelayMode,
482    /// The instance type of this relay.
483    pub instance: RelayInstance,
484    /// The upstream relay or sentry instance.
485    pub upstream: UpstreamDescriptor<'static>,
486    /// The host the relay should bind to (network interface).
487    pub host: IpAddr,
488    /// The port to bind for the unencrypted relay HTTP server.
489    pub port: u16,
490    /// The host the relay should bind to (network interface) for internally exposed APIs, like
491    /// health checks.
492    ///
493    /// If not configured, internal routes are exposed on the main HTTP server.
494    ///
495    /// Note: configuring the internal http server on an address which overlaps with the main
496    /// server (e.g. main on `0.0.0.0:3000` and internal on `127.0.0.1:3000`) is a misconfiguration
497    /// resulting in approximately half of the requests sent to `127.0.0.1:3000` to fail, as the handling
498    /// http server is chosen by the operating system 'at random'.
499    ///
500    /// As a best practice you should always choose different ports to avoid this issue.
501    ///
502    /// Defaults to [`Self::host`].
503    pub internal_host: Option<IpAddr>,
504    /// The port to bind for internally exposed APIs.
505    ///
506    /// Defaults to [`Self::port`].
507    pub internal_port: Option<u16>,
508    /// Optional port to bind for the encrypted relay HTTPS server.
509    #[serde(skip_serializing)]
510    pub tls_port: Option<u16>,
511    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
512    #[serde(skip_serializing)]
513    pub tls_identity_path: Option<PathBuf>,
514    /// Password for the PKCS12 archive.
515    #[serde(skip_serializing)]
516    pub tls_identity_password: Option<String>,
517    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
518    ///
519    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
520    /// Validation of project identifiers can be safely skipped in these cases.
521    #[serde(skip_serializing_if = "is_default")]
522    pub override_project_ids: bool,
523}
524
525impl Default for Relay {
526    fn default() -> Self {
527        Relay {
528            mode: RelayMode::Managed,
529            instance: RelayInstance::Default,
530            upstream: "https://sentry.io/".parse().unwrap(),
531            host: default_host(),
532            port: 3000,
533            internal_host: None,
534            internal_port: None,
535            tls_port: None,
536            tls_identity_path: None,
537            tls_identity_password: None,
538            override_project_ids: false,
539        }
540    }
541}
542
543/// Control the metrics.
544#[derive(Serialize, Deserialize, Debug)]
545#[serde(default)]
546pub struct Metrics {
547    /// Hostname and port of the statsd server.
548    ///
549    /// Defaults to `None`.
550    pub statsd: Option<String>,
551    /// Common prefix that should be added to all metrics.
552    ///
553    /// Defaults to `"sentry.relay"`.
554    pub prefix: String,
555    /// Default tags to apply to all metrics.
556    pub default_tags: BTreeMap<String, String>,
557    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
558    pub hostname_tag: Option<String>,
559    /// Global sample rate for all emitted metrics between `0.0` and `1.0`.
560    ///
561    /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
562    /// Defaults to `1.0` (100%).
563    pub sample_rate: f32,
564    /// Interval for periodic metrics emitted from Relay.
565    ///
566    /// Setting it to `0` seconds disables the periodic metrics.
567    /// Defaults to 5 seconds.
568    pub periodic_secs: u64,
569    /// Whether local metric aggregation using statdsproxy should be enabled.
570    ///
571    /// Defaults to `true`.
572    pub aggregate: bool,
573    /// Allows emission of metrics with high cardinality tags.
574    ///
575    /// High cardinality tags are dynamic values attached to metrics,
576    /// such as project IDs. When enabled, these tags will be included
577    /// in the emitted metrics. When disabled, the tags will be omitted.
578    ///
579    /// Defaults to `false`.
580    pub allow_high_cardinality_tags: bool,
581}
582
583impl Default for Metrics {
584    fn default() -> Self {
585        Metrics {
586            statsd: None,
587            prefix: "sentry.relay".into(),
588            default_tags: BTreeMap::new(),
589            hostname_tag: None,
590            sample_rate: 1.0,
591            periodic_secs: 5,
592            aggregate: true,
593            allow_high_cardinality_tags: false,
594        }
595    }
596}
597
598/// Controls various limits
599#[derive(Serialize, Deserialize, Debug)]
600#[serde(default)]
601pub struct Limits {
602    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
603    /// buffering.
604    pub max_concurrent_requests: usize,
605    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
606    /// buffering.
607    ///
608    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
609    pub max_concurrent_queries: usize,
610    /// The maximum payload size for events.
611    pub max_event_size: ByteSize,
612    /// The maximum size for each attachment.
613    pub max_attachment_size: ByteSize,
614    /// The maximum combined size for all attachments in an envelope or request.
615    pub max_attachments_size: ByteSize,
616    /// The maximum combined size for all client reports in an envelope or request.
617    pub max_client_reports_size: ByteSize,
618    /// The maximum payload size for a monitor check-in.
619    pub max_check_in_size: ByteSize,
620    /// The maximum payload size for an entire envelopes. Individual limits still apply.
621    pub max_envelope_size: ByteSize,
622    /// The maximum number of session items per envelope.
623    pub max_session_count: usize,
624    /// The maximum payload size for general API requests.
625    pub max_api_payload_size: ByteSize,
626    /// The maximum payload size for file uploads and chunks.
627    pub max_api_file_upload_size: ByteSize,
628    /// The maximum payload size for chunks
629    pub max_api_chunk_upload_size: ByteSize,
630    /// The maximum payload size for a profile
631    pub max_profile_size: ByteSize,
632    /// The maximum payload size for a trace metric.
633    pub max_trace_metric_size: ByteSize,
634    /// The maximum payload size for a log.
635    pub max_log_size: ByteSize,
636    /// The maximum payload size for a span.
637    pub max_span_size: ByteSize,
638    /// The maximum payload size for an item container.
639    pub max_container_size: ByteSize,
640    /// The maximum payload size for a statsd metric.
641    pub max_statsd_size: ByteSize,
642    /// The maximum payload size for metric buckets.
643    pub max_metric_buckets_size: ByteSize,
644    /// The maximum payload size for a compressed replay.
645    pub max_replay_compressed_size: ByteSize,
646    /// The maximum payload size for an uncompressed replay.
647    #[serde(alias = "max_replay_size")]
648    max_replay_uncompressed_size: ByteSize,
649    /// The maximum size for a replay recording Kafka message.
650    pub max_replay_message_size: ByteSize,
651    /// The maximum number of threads to spawn for CPU and web work, each.
652    ///
653    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
654    /// the number of logical CPU cores on the host.
655    pub max_thread_count: usize,
656    /// Controls the maximum concurrency of each worker thread.
657    ///
658    /// Increasing the concurrency, can lead to a better utilization of worker threads by
659    /// increasing the amount of I/O done concurrently.
660    //
661    /// Currently has no effect on defaults to `1`.
662    pub max_pool_concurrency: usize,
663    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
664    /// have lower timeouts. Defaults to 30 seconds.
665    pub query_timeout: u64,
666    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
667    /// signal.
668    pub shutdown_timeout: u64,
669    /// Server keep-alive timeout in seconds.
670    ///
671    /// By default, keep-alive is set to 5 seconds.
672    pub keepalive_timeout: u64,
673    /// Server idle timeout in seconds.
674    ///
675    /// The idle timeout limits the amount of time a connection is kept open without activity.
676    /// Setting this too short may abort connections before Relay is able to send a response.
677    ///
678    /// By default there is no idle timeout.
679    pub idle_timeout: Option<u64>,
680    /// Sets the maximum number of concurrent connections.
681    ///
682    /// Upon reaching the limit, the server will stop accepting connections.
683    ///
684    /// By default there is no limit.
685    pub max_connections: Option<usize>,
686    /// The TCP listen backlog.
687    ///
688    /// Configures the TCP listen backlog for the listening socket of Relay.
689    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
690    /// for a more detailed description of the listen backlog.
691    ///
692    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
693    pub tcp_listen_backlog: u32,
694}
695
696impl Default for Limits {
697    fn default() -> Self {
698        Limits {
699            max_concurrent_requests: 100,
700            max_concurrent_queries: 5,
701            max_event_size: ByteSize::mebibytes(1),
702            max_attachment_size: ByteSize::mebibytes(200),
703            max_attachments_size: ByteSize::mebibytes(200),
704            max_client_reports_size: ByteSize::kibibytes(4),
705            max_check_in_size: ByteSize::kibibytes(100),
706            max_envelope_size: ByteSize::mebibytes(200),
707            max_session_count: 100,
708            max_api_payload_size: ByteSize::mebibytes(20),
709            max_api_file_upload_size: ByteSize::mebibytes(40),
710            max_api_chunk_upload_size: ByteSize::mebibytes(100),
711            max_profile_size: ByteSize::mebibytes(50),
712            max_trace_metric_size: ByteSize::kibibytes(2),
713            max_log_size: ByteSize::mebibytes(1),
714            max_span_size: ByteSize::mebibytes(1),
715            max_container_size: ByteSize::mebibytes(12),
716            max_statsd_size: ByteSize::mebibytes(1),
717            max_metric_buckets_size: ByteSize::mebibytes(1),
718            max_replay_compressed_size: ByteSize::mebibytes(10),
719            max_replay_uncompressed_size: ByteSize::mebibytes(100),
720            max_replay_message_size: ByteSize::mebibytes(15),
721            max_thread_count: num_cpus::get(),
722            max_pool_concurrency: 1,
723            query_timeout: 30,
724            shutdown_timeout: 10,
725            keepalive_timeout: 5,
726            idle_timeout: None,
727            max_connections: None,
728            tcp_listen_backlog: 1024,
729        }
730    }
731}
732
733/// Controls traffic steering.
734#[derive(Debug, Default, Deserialize, Serialize)]
735#[serde(default)]
736pub struct Routing {
737    /// Accept and forward unknown Envelope items to the upstream.
738    ///
739    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
740    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
741    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
742    /// processed as usual.
743    ///
744    /// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
745    /// is disabled by default since the item cannot be handled.
746    pub accept_unknown_items: Option<bool>,
747}
748
749/// Http content encoding for both incoming and outgoing web requests.
750#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
751#[serde(rename_all = "lowercase")]
752pub enum HttpEncoding {
753    /// Identity function without no compression.
754    ///
755    /// This is the default encoding and does not require the presence of the `content-encoding`
756    /// HTTP header.
757    #[default]
758    Identity,
759    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
760    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
761    ///
762    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
763    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
764    Deflate,
765    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
766    /// (LZ77), with a 32-bit CRC.
767    ///
768    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
769    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
770    /// compatibility purposes.
771    Gzip,
772    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
773    Br,
774    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
775    Zstd,
776}
777
778impl HttpEncoding {
779    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
780    pub fn parse(str: &str) -> Self {
781        let str = str.trim();
782        if str.eq_ignore_ascii_case("zstd") {
783            Self::Zstd
784        } else if str.eq_ignore_ascii_case("br") {
785            Self::Br
786        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
787            Self::Gzip
788        } else if str.eq_ignore_ascii_case("deflate") {
789            Self::Deflate
790        } else {
791            Self::Identity
792        }
793    }
794
795    /// Returns the value for the `content-encoding` HTTP header.
796    ///
797    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
798    pub fn name(&self) -> Option<&'static str> {
799        match self {
800            Self::Identity => None,
801            Self::Deflate => Some("deflate"),
802            Self::Gzip => Some("gzip"),
803            Self::Br => Some("br"),
804            Self::Zstd => Some("zstd"),
805        }
806    }
807}
808
809/// Controls authentication with upstream.
810#[derive(Serialize, Deserialize, Debug)]
811#[serde(default)]
812pub struct Http {
813    /// Timeout for upstream requests in seconds.
814    ///
815    /// This timeout covers the time from sending the request until receiving response headers.
816    /// Neither the connection process and handshakes, nor reading the response body is covered in
817    /// this timeout.
818    pub timeout: u32,
819    /// Timeout for establishing connections with the upstream in seconds.
820    ///
821    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
822    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
823    pub connection_timeout: u32,
824    /// Maximum interval between failed request retries in seconds.
825    pub max_retry_interval: u32,
826    /// The custom HTTP Host header to send to the upstream.
827    pub host_header: Option<String>,
828    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
829    ///
830    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
831    /// back into startup mode and tries to establish a connection. During this time, incoming
832    /// envelopes will be buffered.
833    ///
834    /// Defaults to `600` (10 minutes).
835    pub auth_interval: Option<u64>,
836    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
837    /// it has encountered a network outage in seconds.
838    ///
839    /// During a network outage relay will try to reconnect and will buffer all upstream messages
840    /// until it manages to reconnect.
841    pub outage_grace_period: u64,
842    /// The time Relay waits before retrying an upstream request, in seconds.
843    ///
844    /// This time is only used before going into a network outage mode.
845    pub retry_delay: u64,
846    /// The interval in seconds for continued failed project fetches at which Relay will error.
847    ///
848    /// A successful fetch resets this interval. Relay does nothing during long
849    /// times without emitting requests.
850    pub project_failure_interval: u64,
851    /// Content encoding to apply to upstream store requests.
852    ///
853    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
854    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
855    ///
856    /// This setting applies to all store requests of SDK data, including events, transactions,
857    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
858    ///
859    /// Available options are:
860    ///
861    ///  - `identity`: Disables compression.
862    ///  - `deflate`: Compression using a zlib header with deflate encoding.
863    ///  - `gzip` (default): Compression using gzip.
864    ///  - `br`: Compression using the brotli algorithm.
865    ///  - `zstd`: Compression using the zstd algorithm.
866    pub encoding: HttpEncoding,
867    /// Submit metrics globally through a shared endpoint.
868    ///
869    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
870    /// DSN, this submits metrics to the global endpoint with Relay authentication.
871    ///
872    /// This option does not have any effect on processing mode.
873    pub global_metrics: bool,
874    /// Controls whether the forward endpoint is enabled.
875    ///
876    /// The forward endpoint forwards unknown API requests to the upstream.
877    pub forward: bool,
878}
879
880impl Default for Http {
881    fn default() -> Self {
882        Http {
883            timeout: 5,
884            connection_timeout: 3,
885            max_retry_interval: 60, // 1 minute
886            host_header: None,
887            auth_interval: Some(600), // 10 minutes
888            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
889            retry_delay: default_retry_delay(),
890            project_failure_interval: default_project_failure_interval(),
891            encoding: HttpEncoding::Zstd,
892            global_metrics: false,
893            forward: true,
894        }
895    }
896}
897
898/// Default for unavailable upstream retry period, 1s.
899fn default_retry_delay() -> u64 {
900    1
901}
902
903/// Default for project failure interval, 90s.
904fn default_project_failure_interval() -> u64 {
905    90
906}
907
908/// Default for max disk size, 500 MB.
909fn spool_envelopes_max_disk_size() -> ByteSize {
910    ByteSize::mebibytes(500)
911}
912
913/// Default number of encoded envelope bytes to cache before writing to disk.
914fn spool_envelopes_batch_size_bytes() -> ByteSize {
915    ByteSize::kibibytes(10)
916}
917
918fn spool_envelopes_max_envelope_delay_secs() -> u64 {
919    24 * 60 * 60
920}
921
922/// Default refresh frequency in ms for the disk usage monitoring.
923fn spool_disk_usage_refresh_frequency_ms() -> u64 {
924    100
925}
926
927/// Default max memory usage for unspooling.
928fn spool_max_backpressure_memory_percent() -> f32 {
929    0.8
930}
931
932/// Default number of partitions for the buffer.
933fn spool_envelopes_partitions() -> NonZeroU8 {
934    NonZeroU8::new(1).unwrap()
935}
936
937/// Persistent buffering configuration for incoming envelopes.
938#[derive(Debug, Serialize, Deserialize)]
939pub struct EnvelopeSpool {
940    /// The path of the SQLite database file(s) which persist the data.
941    ///
942    /// Based on the number of partitions, more database files will be created within the same path.
943    ///
944    /// If not set, the envelopes will be buffered in memory.
945    pub path: Option<PathBuf>,
946    /// The maximum size of the buffer to keep, in bytes.
947    ///
948    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
949    ///
950    /// Defaults to 500MB.
951    #[serde(default = "spool_envelopes_max_disk_size")]
952    pub max_disk_size: ByteSize,
953    /// Size of the batch of compressed envelopes that are spooled to disk at once.
954    ///
955    /// Note that this is the size after which spooling will be triggered but it does not guarantee
956    /// that exactly this size will be spooled, it can be greater or equal.
957    ///
958    /// Defaults to 10 KiB.
959    #[serde(default = "spool_envelopes_batch_size_bytes")]
960    pub batch_size_bytes: ByteSize,
961    /// Maximum time between receiving the envelope and processing it.
962    ///
963    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
964    /// they are dropped.
965    ///
966    /// Defaults to 24h.
967    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
968    pub max_envelope_delay_secs: u64,
969    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
970    /// internal page stats.
971    ///
972    /// Defaults to 100ms.
973    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
974    pub disk_usage_refresh_frequency_ms: u64,
975    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
976    ///
977    /// Only applies when [`Self::path`] is set.
978    ///
979    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
980    ///
981    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
982    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
983    ///
984    /// This scenario arises when the buffer stops spooling due to reaching the
985    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
986    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
987    /// continues to grow, and the system becomes deadlocked.
988    ///
989    /// ### Example
990    /// Suppose the system has 1GB of available memory and is configured to spool only after
991    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
992    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
993    ///
994    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
995    /// no data will be offloaded. Memory usage keeps increasing until it hits the
996    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
997    ///
998    /// - No more envelopes are accepted.
999    /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
1000    ///   will not happen.
1001    /// - A deadlock occurs, with the system unable to recover without manual intervention.
1002    ///
1003    /// Defaults to 90% (5% less than max memory).
1004    #[serde(default = "spool_max_backpressure_memory_percent")]
1005    pub max_backpressure_memory_percent: f32,
1006    /// Number of partitions of the buffer.
1007    ///
1008    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
1009    /// and other resources.
1010    ///
1011    /// Defaults to 1.
1012    #[serde(default = "spool_envelopes_partitions")]
1013    pub partitions: NonZeroU8,
1014}
1015
1016impl Default for EnvelopeSpool {
1017    fn default() -> Self {
1018        Self {
1019            path: None,
1020            max_disk_size: spool_envelopes_max_disk_size(),
1021            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1022            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1023            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1024            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1025            partitions: spool_envelopes_partitions(),
1026        }
1027    }
1028}
1029
1030/// Persistent buffering configuration.
1031#[derive(Debug, Serialize, Deserialize, Default)]
1032pub struct Spool {
1033    /// Configuration for envelope spooling.
1034    #[serde(default)]
1035    pub envelopes: EnvelopeSpool,
1036}
1037
1038/// Controls internal caching behavior.
1039#[derive(Serialize, Deserialize, Debug)]
1040#[serde(default)]
1041pub struct Cache {
1042    /// The full project state will be requested by this Relay if set to `true`.
1043    pub project_request_full_config: bool,
1044    /// The cache timeout for project configurations in seconds.
1045    pub project_expiry: u32,
1046    /// Continue using project state this many seconds after cache expiry while a new state is
1047    /// being fetched. This is added on top of `project_expiry`.
1048    ///
1049    /// Default is 2 minutes.
1050    pub project_grace_period: u32,
1051    /// Refresh a project after the specified seconds.
1052    ///
1053    /// The time must be between expiry time and the grace period.
1054    ///
1055    /// By default there are no refreshes enabled.
1056    pub project_refresh_interval: Option<u32>,
1057    /// The cache timeout for downstream relay info (public keys) in seconds.
1058    pub relay_expiry: u32,
1059    /// Unused cache timeout for envelopes.
1060    ///
1061    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
1062    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
1063    /// at a later point.
1064    #[serde(alias = "event_expiry")]
1065    envelope_expiry: u32,
1066    /// The maximum amount of envelopes to queue before dropping them.
1067    #[serde(alias = "event_buffer_size")]
1068    envelope_buffer_size: u32,
1069    /// The cache timeout for non-existing entries.
1070    pub miss_expiry: u32,
1071    /// The buffer timeout for batched project config queries before sending them upstream in ms.
1072    pub batch_interval: u32,
1073    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
1074    pub downstream_relays_batch_interval: u32,
1075    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
1076    ///
1077    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
1078    pub batch_size: usize,
1079    /// Interval for watching local cache override files in seconds.
1080    pub file_interval: u32,
1081    /// Interval for fetching new global configs from the upstream, in seconds.
1082    pub global_config_fetch_interval: u32,
1083}
1084
1085impl Default for Cache {
1086    fn default() -> Self {
1087        Cache {
1088            project_request_full_config: false,
1089            project_expiry: 300,       // 5 minutes
1090            project_grace_period: 120, // 2 minutes
1091            project_refresh_interval: None,
1092            relay_expiry: 3600,   // 1 hour
1093            envelope_expiry: 600, // 10 minutes
1094            envelope_buffer_size: 1000,
1095            miss_expiry: 60,                       // 1 minute
1096            batch_interval: 100,                   // 100ms
1097            downstream_relays_batch_interval: 100, // 100ms
1098            batch_size: 500,
1099            file_interval: 10,                // 10 seconds
1100            global_config_fetch_interval: 10, // 10 seconds
1101        }
1102    }
1103}
1104
1105fn default_max_secs_in_future() -> u32 {
1106    60 // 1 minute
1107}
1108
1109fn default_max_session_secs_in_past() -> u32 {
1110    5 * 24 * 3600 // 5 days
1111}
1112
1113fn default_chunk_size() -> ByteSize {
1114    ByteSize::mebibytes(1)
1115}
1116
1117fn default_projectconfig_cache_prefix() -> String {
1118    "relayconfig".to_owned()
1119}
1120
1121#[allow(clippy::unnecessary_wraps)]
1122fn default_max_rate_limit() -> Option<u32> {
1123    Some(300) // 5 minutes
1124}
1125
1126/// Controls Sentry-internal event processing.
1127#[derive(Serialize, Deserialize, Debug)]
1128pub struct Processing {
1129    /// True if the Relay should do processing. Defaults to `false`.
1130    pub enabled: bool,
1131    /// GeoIp DB file source.
1132    #[serde(default)]
1133    pub geoip_path: Option<PathBuf>,
1134    /// Maximum future timestamp of ingested events.
1135    #[serde(default = "default_max_secs_in_future")]
1136    pub max_secs_in_future: u32,
1137    /// Maximum age of ingested sessions. Older sessions will be dropped.
1138    #[serde(default = "default_max_session_secs_in_past")]
1139    pub max_session_secs_in_past: u32,
1140    /// Kafka producer configurations.
1141    pub kafka_config: Vec<KafkaConfigParam>,
1142    /// Additional kafka producer configurations.
1143    ///
1144    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
1145    /// kafka config can be referenced in `topics:` like this:
1146    ///
1147    /// ```yaml
1148    /// secondary_kafka_configs:
1149    ///   mycustomcluster:
1150    ///     - name: 'bootstrap.servers'
1151    ///       value: 'sentry_kafka_metrics:9093'
1152    ///
1153    /// topics:
1154    ///   transactions: ingest-transactions
1155    ///   metrics:
1156    ///     name: ingest-metrics
1157    ///     config: mycustomcluster
1158    /// ```
1159    ///
1160    /// Then metrics will be produced to an entirely different Kafka cluster.
1161    #[serde(default)]
1162    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
1163    /// Kafka topic names.
1164    #[serde(default)]
1165    pub topics: TopicAssignments,
1166    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
1167    #[serde(default)]
1168    pub kafka_validate_topics: bool,
1169    /// Redis hosts to connect to for storing state for rate limits.
1170    #[serde(default)]
1171    pub redis: Option<RedisConfigs>,
1172    /// Maximum chunk size of attachments for Kafka.
1173    #[serde(default = "default_chunk_size")]
1174    pub attachment_chunk_size: ByteSize,
1175    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
1176    #[serde(default = "default_projectconfig_cache_prefix")]
1177    pub projectconfig_cache_prefix: String,
1178    /// Maximum rate limit to report to clients.
1179    #[serde(default = "default_max_rate_limit")]
1180    pub max_rate_limit: Option<u32>,
1181    /// Configures the quota cache ratio between `0.0` and `1.0`.
1182    ///
1183    /// The quota cache, caches the specified ratio of remaining quota in memory to reduce the
1184    /// amount of synchronizations required with Redis.
1185    ///
1186    /// The ratio is applied to the (per second) rate of the quota, not the total limit.
1187    /// For example a quota with limit 100 with a 10 second window is treated equally to a quota of
1188    /// 10 with a 1 second window.
1189    ///
1190    /// By default quota caching is disabled.
1191    pub quota_cache_ratio: Option<f32>,
1192    /// Relative amount of the total quota limit to which quota caching is applied.
1193    ///
1194    /// If exceeded, the rate limiter will no longer cache the quota and sync with Redis on every call instead.
1195    /// Lowering this value reduces the probability of incorrectly over-accepting.
1196    ///
1197    /// Must be between `0.0` and `1.0`, by default there is no limit configured.
1198    pub quota_cache_max: Option<f32>,
1199    /// Configuration for attachment uploads.
1200    #[serde(default)]
1201    pub upload: UploadServiceConfig,
1202}
1203
1204impl Default for Processing {
1205    /// Constructs a disabled processing configuration.
1206    fn default() -> Self {
1207        Self {
1208            enabled: false,
1209            geoip_path: None,
1210            max_secs_in_future: default_max_secs_in_future(),
1211            max_session_secs_in_past: default_max_session_secs_in_past(),
1212            kafka_config: Vec::new(),
1213            secondary_kafka_configs: BTreeMap::new(),
1214            topics: TopicAssignments::default(),
1215            kafka_validate_topics: false,
1216            redis: None,
1217            attachment_chunk_size: default_chunk_size(),
1218            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1219            max_rate_limit: default_max_rate_limit(),
1220            quota_cache_ratio: None,
1221            quota_cache_max: None,
1222            upload: UploadServiceConfig::default(),
1223        }
1224    }
1225}
1226
1227/// Configuration for normalization in this Relay.
1228#[derive(Debug, Default, Serialize, Deserialize)]
1229#[serde(default)]
1230pub struct Normalization {
1231    /// Level of normalization for Relay to apply to incoming data.
1232    #[serde(default)]
1233    pub level: NormalizationLevel,
1234}
1235
1236/// Configuration for the level of normalization this Relay should do.
1237#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
1238#[serde(rename_all = "lowercase")]
1239pub enum NormalizationLevel {
1240    /// Runs normalization, excluding steps that break future compatibility.
1241    ///
1242    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
1243    #[default]
1244    Default,
1245    /// Run full normalization.
1246    ///
1247    /// It includes steps that break future compatibility and should only run in
1248    /// the last layer of relays.
1249    Full,
1250}
1251
1252/// Configuration values for the outcome aggregator
1253#[derive(Serialize, Deserialize, Debug)]
1254#[serde(default)]
1255pub struct OutcomeAggregatorConfig {
1256    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
1257    pub bucket_interval: u64,
1258    /// Defines how often all buckets are flushed, in seconds.
1259    pub flush_interval: u64,
1260}
1261
1262impl Default for OutcomeAggregatorConfig {
1263    fn default() -> Self {
1264        Self {
1265            bucket_interval: 60,
1266            flush_interval: 120,
1267        }
1268    }
1269}
1270
1271/// Configuration values for attachment uploads.
1272#[derive(Serialize, Deserialize, Debug)]
1273#[serde(default)]
1274pub struct UploadServiceConfig {
1275    /// The base URL for the objectstore service.
1276    ///
1277    /// This defaults to [`None`], which means that the service will be disabled,
1278    /// unless a proper configuration is provided.
1279    pub objectstore_url: Option<String>,
1280
1281    /// Maximum concurrency of uploads.
1282    pub max_concurrent_requests: usize,
1283
1284    /// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
1285    pub timeout: u64,
1286}
1287
1288impl Default for UploadServiceConfig {
1289    fn default() -> Self {
1290        Self {
1291            objectstore_url: None,
1292            max_concurrent_requests: 10,
1293            timeout: 60,
1294        }
1295    }
1296}
1297
1298/// Determines how to emit outcomes.
1299/// For compatibility reasons, this can either be true, false or AsClientReports
1300#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1301
1302pub enum EmitOutcomes {
1303    /// Do not emit any outcomes
1304    None,
1305    /// Emit outcomes as client reports
1306    AsClientReports,
1307    /// Emit outcomes as outcomes
1308    AsOutcomes,
1309}
1310
1311impl EmitOutcomes {
1312    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1313    pub fn any(&self) -> bool {
1314        !matches!(self, EmitOutcomes::None)
1315    }
1316}
1317
1318impl Serialize for EmitOutcomes {
1319    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1320    where
1321        S: Serializer,
1322    {
1323        // For compatibility, serialize None and AsOutcomes as booleans.
1324        match self {
1325            Self::None => serializer.serialize_bool(false),
1326            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1327            Self::AsOutcomes => serializer.serialize_bool(true),
1328        }
1329    }
1330}
1331
1332struct EmitOutcomesVisitor;
1333
1334impl Visitor<'_> for EmitOutcomesVisitor {
1335    type Value = EmitOutcomes;
1336
1337    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1338        formatter.write_str("true, false, or 'as_client_reports'")
1339    }
1340
1341    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1342    where
1343        E: serde::de::Error,
1344    {
1345        Ok(if v {
1346            EmitOutcomes::AsOutcomes
1347        } else {
1348            EmitOutcomes::None
1349        })
1350    }
1351
1352    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1353    where
1354        E: serde::de::Error,
1355    {
1356        if v == "as_client_reports" {
1357            Ok(EmitOutcomes::AsClientReports)
1358        } else {
1359            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1360        }
1361    }
1362}
1363
1364impl<'de> Deserialize<'de> for EmitOutcomes {
1365    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1366    where
1367        D: Deserializer<'de>,
1368    {
1369        deserializer.deserialize_any(EmitOutcomesVisitor)
1370    }
1371}
1372
1373/// Outcome generation specific configuration values.
1374#[derive(Serialize, Deserialize, Debug)]
1375#[serde(default)]
1376pub struct Outcomes {
1377    /// Controls whether outcomes will be emitted when processing is disabled.
1378    /// Processing relays always emit outcomes (for backwards compatibility).
1379    /// Can take the following values: false, "as_client_reports", true
1380    pub emit_outcomes: EmitOutcomes,
1381    /// Controls wheather client reported outcomes should be emitted.
1382    pub emit_client_outcomes: bool,
1383    /// The maximum number of outcomes that are batched before being sent
1384    /// via http to the upstream (only applies to non processing relays).
1385    pub batch_size: usize,
1386    /// The maximum time interval (in milliseconds) that an outcome may be batched
1387    /// via http to the upstream (only applies to non processing relays).
1388    pub batch_interval: u64,
1389    /// Defines the source string registered in the outcomes originating from
1390    /// this Relay (typically something like the region or the layer).
1391    pub source: Option<String>,
1392    /// Configures the outcome aggregator.
1393    pub aggregator: OutcomeAggregatorConfig,
1394}
1395
1396impl Default for Outcomes {
1397    fn default() -> Self {
1398        Outcomes {
1399            emit_outcomes: EmitOutcomes::AsClientReports,
1400            emit_client_outcomes: true,
1401            batch_size: 1000,
1402            batch_interval: 500,
1403            source: None,
1404            aggregator: OutcomeAggregatorConfig::default(),
1405        }
1406    }
1407}
1408
1409/// Minimal version of a config for dumping out.
1410#[derive(Serialize, Deserialize, Debug, Default)]
1411pub struct MinimalConfig {
1412    /// The relay part of the config.
1413    pub relay: Relay,
1414}
1415
1416impl MinimalConfig {
1417    /// Saves the config in the given config folder as config.yml
1418    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1419        let path = p.as_ref();
1420        if fs::metadata(path).is_err() {
1421            fs::create_dir_all(path)
1422                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1423        }
1424        self.save(path)
1425    }
1426}
1427
1428impl ConfigObject for MinimalConfig {
1429    fn format() -> ConfigFormat {
1430        ConfigFormat::Yaml
1431    }
1432
1433    fn name() -> &'static str {
1434        "config"
1435    }
1436}
1437
1438/// Alternative serialization of RelayInfo for config file using snake case.
1439mod config_relay_info {
1440    use serde::ser::SerializeMap;
1441
1442    use super::*;
1443
1444    // Uses snake_case as opposed to camelCase.
1445    #[derive(Debug, Serialize, Deserialize, Clone)]
1446    struct RelayInfoConfig {
1447        public_key: PublicKey,
1448        #[serde(default)]
1449        internal: bool,
1450    }
1451
1452    impl From<RelayInfoConfig> for RelayInfo {
1453        fn from(v: RelayInfoConfig) -> Self {
1454            RelayInfo {
1455                public_key: v.public_key,
1456                internal: v.internal,
1457            }
1458        }
1459    }
1460
1461    impl From<RelayInfo> for RelayInfoConfig {
1462        fn from(v: RelayInfo) -> Self {
1463            RelayInfoConfig {
1464                public_key: v.public_key,
1465                internal: v.internal,
1466            }
1467        }
1468    }
1469
1470    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1471    where
1472        D: Deserializer<'de>,
1473    {
1474        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1475        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1476    }
1477
1478    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1479    where
1480        S: Serializer,
1481    {
1482        let mut map = ser.serialize_map(Some(elm.len()))?;
1483
1484        for (k, v) in elm {
1485            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1486        }
1487
1488        map.end()
1489    }
1490}
1491
1492/// Authentication options.
1493#[derive(Serialize, Deserialize, Debug, Default)]
1494pub struct AuthConfig {
1495    /// Controls responses from the readiness health check endpoint based on authentication.
1496    #[serde(default, skip_serializing_if = "is_default")]
1497    pub ready: ReadinessCondition,
1498
1499    /// Statically authenticated downstream relays.
1500    #[serde(default, with = "config_relay_info")]
1501    pub static_relays: HashMap<RelayId, RelayInfo>,
1502
1503    /// How old a signature can be before it is considered invalid, in seconds.
1504    ///
1505    /// Defaults to 5 minutes.
1506    #[serde(default = "default_max_age")]
1507    pub signature_max_age: u64,
1508}
1509
1510fn default_max_age() -> u64 {
1511    300
1512}
1513
1514/// GeoIp database configuration options.
1515#[derive(Serialize, Deserialize, Debug, Default)]
1516pub struct GeoIpConfig {
1517    /// The path to GeoIP database.
1518    pub path: Option<PathBuf>,
1519}
1520
1521/// Cardinality Limiter configuration options.
1522#[derive(Serialize, Deserialize, Debug)]
1523#[serde(default)]
1524pub struct CardinalityLimiter {
1525    /// Cache vacuum interval in seconds for the in memory cache.
1526    ///
1527    /// The cache will scan for expired values based on this interval.
1528    ///
1529    /// Defaults to 180 seconds, 3 minutes.
1530    pub cache_vacuum_interval: u64,
1531}
1532
1533impl Default for CardinalityLimiter {
1534    fn default() -> Self {
1535        Self {
1536            cache_vacuum_interval: 180,
1537        }
1538    }
1539}
1540
1541/// Settings to control Relay's health checks.
1542///
1543/// After breaching one of the configured thresholds, Relay will
1544/// return an `unhealthy` status from its health endpoint.
1545#[derive(Serialize, Deserialize, Debug)]
1546#[serde(default)]
1547pub struct Health {
1548    /// Interval to refresh internal health checks.
1549    ///
1550    /// Shorter intervals will decrease the time it takes the health check endpoint to report
1551    /// issues, but can also increase sporadic unhealthy responses.
1552    ///
1553    /// Defaults to `3000`` (3 seconds).
1554    pub refresh_interval_ms: u64,
1555    /// Maximum memory watermark in bytes.
1556    ///
1557    /// By default, there is no absolute limit set and the watermark
1558    /// is only controlled by setting [`Self::max_memory_percent`].
1559    pub max_memory_bytes: Option<ByteSize>,
1560    /// Maximum memory watermark as a percentage of maximum system memory.
1561    ///
1562    /// Defaults to `0.95` (95%).
1563    pub max_memory_percent: f32,
1564    /// Health check probe timeout in milliseconds.
1565    ///
1566    /// Any probe exceeding the timeout will be considered failed.
1567    /// This limits the max execution time of Relay health checks.
1568    ///
1569    /// Defaults to 900 milliseconds.
1570    pub probe_timeout_ms: u64,
1571    /// The refresh frequency of memory stats which are used to poll memory
1572    /// usage of Relay.
1573    ///
1574    /// The implementation of memory stats guarantees that the refresh will happen at
1575    /// least every `x` ms since memory readings are lazy and are updated only if needed.
1576    pub memory_stat_refresh_frequency_ms: u64,
1577}
1578
1579impl Default for Health {
1580    fn default() -> Self {
1581        Self {
1582            refresh_interval_ms: 3000,
1583            max_memory_bytes: None,
1584            max_memory_percent: 0.95,
1585            probe_timeout_ms: 900,
1586            memory_stat_refresh_frequency_ms: 100,
1587        }
1588    }
1589}
1590
1591/// COGS configuration.
1592#[derive(Serialize, Deserialize, Debug)]
1593#[serde(default)]
1594pub struct Cogs {
1595    /// Maximium amount of COGS measurements allowed to backlog.
1596    ///
1597    /// Any additional COGS measurements recorded will be dropped.
1598    ///
1599    /// Defaults to `10_000`.
1600    pub max_queue_size: u64,
1601    /// Relay COGS resource id.
1602    ///
1603    /// All Relay related COGS measurements are emitted with this resource id.
1604    ///
1605    /// Defaults to `relay_service`.
1606    pub relay_resource_id: String,
1607}
1608
1609impl Default for Cogs {
1610    fn default() -> Self {
1611        Self {
1612            max_queue_size: 10_000,
1613            relay_resource_id: "relay_service".to_owned(),
1614        }
1615    }
1616}
1617
1618#[derive(Serialize, Deserialize, Debug, Default)]
1619struct ConfigValues {
1620    #[serde(default)]
1621    relay: Relay,
1622    #[serde(default)]
1623    http: Http,
1624    #[serde(default)]
1625    cache: Cache,
1626    #[serde(default)]
1627    spool: Spool,
1628    #[serde(default)]
1629    limits: Limits,
1630    #[serde(default)]
1631    logging: relay_log::LogConfig,
1632    #[serde(default)]
1633    routing: Routing,
1634    #[serde(default)]
1635    metrics: Metrics,
1636    #[serde(default)]
1637    sentry: relay_log::SentryConfig,
1638    #[serde(default)]
1639    processing: Processing,
1640    #[serde(default)]
1641    outcomes: Outcomes,
1642    #[serde(default)]
1643    aggregator: AggregatorServiceConfig,
1644    #[serde(default)]
1645    secondary_aggregators: Vec<ScopedAggregatorConfig>,
1646    #[serde(default)]
1647    auth: AuthConfig,
1648    #[serde(default)]
1649    geoip: GeoIpConfig,
1650    #[serde(default)]
1651    normalization: Normalization,
1652    #[serde(default)]
1653    cardinality_limiter: CardinalityLimiter,
1654    #[serde(default)]
1655    health: Health,
1656    #[serde(default)]
1657    cogs: Cogs,
1658}
1659
1660impl ConfigObject for ConfigValues {
1661    fn format() -> ConfigFormat {
1662        ConfigFormat::Yaml
1663    }
1664
1665    fn name() -> &'static str {
1666        "config"
1667    }
1668}
1669
1670/// Config struct.
1671pub struct Config {
1672    values: ConfigValues,
1673    credentials: Option<Credentials>,
1674    path: PathBuf,
1675}
1676
1677impl fmt::Debug for Config {
1678    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1679        f.debug_struct("Config")
1680            .field("path", &self.path)
1681            .field("values", &self.values)
1682            .finish()
1683    }
1684}
1685
1686impl Config {
1687    /// Loads a config from a given config folder.
1688    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1689        let path = env::current_dir()
1690            .map(|x| x.join(path.as_ref()))
1691            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1692
1693        let config = Config {
1694            values: ConfigValues::load(&path)?,
1695            credentials: if Credentials::path(&path).exists() {
1696                Some(Credentials::load(&path)?)
1697            } else {
1698                None
1699            },
1700            path: path.clone(),
1701        };
1702
1703        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1704            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1705        }
1706
1707        Ok(config)
1708    }
1709
1710    /// Creates a config from a JSON value.
1711    ///
1712    /// This is mostly useful for tests.
1713    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1714        Ok(Config {
1715            values: serde_json::from_value(value)
1716                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1717            credentials: None,
1718            path: PathBuf::new(),
1719        })
1720    }
1721
1722    /// Override configuration with values coming from other sources (e.g. env variables or
1723    /// command line parameters)
1724    pub fn apply_override(
1725        &mut self,
1726        mut overrides: OverridableConfig,
1727    ) -> anyhow::Result<&mut Self> {
1728        let relay = &mut self.values.relay;
1729
1730        if let Some(mode) = overrides.mode {
1731            relay.mode = mode
1732                .parse::<RelayMode>()
1733                .with_context(|| ConfigError::field("mode"))?;
1734        }
1735
1736        if let Some(deployment) = overrides.instance {
1737            relay.instance = deployment
1738                .parse::<RelayInstance>()
1739                .with_context(|| ConfigError::field("deployment"))?;
1740        }
1741
1742        if let Some(log_level) = overrides.log_level {
1743            self.values.logging.level = log_level.parse()?;
1744        }
1745
1746        if let Some(log_format) = overrides.log_format {
1747            self.values.logging.format = log_format.parse()?;
1748        }
1749
1750        if let Some(upstream) = overrides.upstream {
1751            relay.upstream = upstream
1752                .parse::<UpstreamDescriptor>()
1753                .with_context(|| ConfigError::field("upstream"))?;
1754        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1755            relay.upstream = upstream_dsn
1756                .parse::<Dsn>()
1757                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1758                .with_context(|| ConfigError::field("upstream_dsn"))?;
1759        }
1760
1761        if let Some(host) = overrides.host {
1762            relay.host = host
1763                .parse::<IpAddr>()
1764                .with_context(|| ConfigError::field("host"))?;
1765        }
1766
1767        if let Some(port) = overrides.port {
1768            relay.port = port
1769                .as_str()
1770                .parse()
1771                .with_context(|| ConfigError::field("port"))?;
1772        }
1773
1774        let processing = &mut self.values.processing;
1775        if let Some(enabled) = overrides.processing {
1776            match enabled.to_lowercase().as_str() {
1777                "true" | "1" => processing.enabled = true,
1778                "false" | "0" | "" => processing.enabled = false,
1779                _ => return Err(ConfigError::field("processing").into()),
1780            }
1781        }
1782
1783        if let Some(redis) = overrides.redis_url {
1784            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1785        }
1786
1787        if let Some(kafka_url) = overrides.kafka_url {
1788            let existing = processing
1789                .kafka_config
1790                .iter_mut()
1791                .find(|e| e.name == "bootstrap.servers");
1792
1793            if let Some(config_param) = existing {
1794                config_param.value = kafka_url;
1795            } else {
1796                processing.kafka_config.push(KafkaConfigParam {
1797                    name: "bootstrap.servers".to_owned(),
1798                    value: kafka_url,
1799                })
1800            }
1801        }
1802        // credentials overrides
1803        let id = if let Some(id) = overrides.id {
1804            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1805            Some(id)
1806        } else {
1807            None
1808        };
1809        let public_key = if let Some(public_key) = overrides.public_key {
1810            let public_key = public_key
1811                .parse::<PublicKey>()
1812                .with_context(|| ConfigError::field("public_key"))?;
1813            Some(public_key)
1814        } else {
1815            None
1816        };
1817
1818        let secret_key = if let Some(secret_key) = overrides.secret_key {
1819            let secret_key = secret_key
1820                .parse::<SecretKey>()
1821                .with_context(|| ConfigError::field("secret_key"))?;
1822            Some(secret_key)
1823        } else {
1824            None
1825        };
1826        let outcomes = &mut self.values.outcomes;
1827        if overrides.outcome_source.is_some() {
1828            outcomes.source = overrides.outcome_source.take();
1829        }
1830
1831        if let Some(credentials) = &mut self.credentials {
1832            //we have existing credentials we may override some entries
1833            if let Some(id) = id {
1834                credentials.id = id;
1835            }
1836            if let Some(public_key) = public_key {
1837                credentials.public_key = public_key;
1838            }
1839            if let Some(secret_key) = secret_key {
1840                credentials.secret_key = secret_key
1841            }
1842        } else {
1843            //no existing credentials we may only create the full credentials
1844            match (id, public_key, secret_key) {
1845                (Some(id), Some(public_key), Some(secret_key)) => {
1846                    self.credentials = Some(Credentials {
1847                        secret_key,
1848                        public_key,
1849                        id,
1850                    })
1851                }
1852                (None, None, None) => {
1853                    // nothing provided, we'll just leave the credentials None, maybe we
1854                    // don't need them in the current command or we'll override them later
1855                }
1856                _ => {
1857                    return Err(ConfigError::field("incomplete credentials").into());
1858                }
1859            }
1860        }
1861
1862        let limits = &mut self.values.limits;
1863        if let Some(shutdown_timeout) = overrides.shutdown_timeout
1864            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
1865        {
1866            limits.shutdown_timeout = shutdown_timeout;
1867        }
1868
1869        if let Some(server_name) = overrides.server_name {
1870            self.values.sentry.server_name = Some(server_name.into());
1871        }
1872
1873        Ok(self)
1874    }
1875
1876    /// Checks if the config is already initialized.
1877    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1878        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1879    }
1880
1881    /// Returns the filename of the config file.
1882    pub fn path(&self) -> &Path {
1883        &self.path
1884    }
1885
1886    /// Dumps out a YAML string of the values.
1887    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1888        serde_yaml::to_string(&self.values)
1889            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1890    }
1891
1892    /// Regenerates the relay credentials.
1893    ///
1894    /// This also writes the credentials back to the file.
1895    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1896        let creds = Credentials::generate();
1897        if save {
1898            creds.save(&self.path)?;
1899        }
1900        self.credentials = Some(creds);
1901        Ok(())
1902    }
1903
1904    /// Return the current credentials
1905    pub fn credentials(&self) -> Option<&Credentials> {
1906        self.credentials.as_ref()
1907    }
1908
1909    /// Set new credentials.
1910    ///
1911    /// This also writes the credentials back to the file.
1912    pub fn replace_credentials(
1913        &mut self,
1914        credentials: Option<Credentials>,
1915    ) -> anyhow::Result<bool> {
1916        if self.credentials == credentials {
1917            return Ok(false);
1918        }
1919
1920        match credentials {
1921            Some(ref creds) => {
1922                creds.save(&self.path)?;
1923            }
1924            None => {
1925                let path = Credentials::path(&self.path);
1926                if fs::metadata(&path).is_ok() {
1927                    fs::remove_file(&path).with_context(|| {
1928                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1929                    })?;
1930                }
1931            }
1932        }
1933
1934        self.credentials = credentials;
1935        Ok(true)
1936    }
1937
1938    /// Returns `true` if the config is ready to use.
1939    pub fn has_credentials(&self) -> bool {
1940        self.credentials.is_some()
1941    }
1942
1943    /// Returns the secret key if set.
1944    pub fn secret_key(&self) -> Option<&SecretKey> {
1945        self.credentials.as_ref().map(|x| &x.secret_key)
1946    }
1947
1948    /// Returns the public key if set.
1949    pub fn public_key(&self) -> Option<&PublicKey> {
1950        self.credentials.as_ref().map(|x| &x.public_key)
1951    }
1952
1953    /// Returns the relay ID.
1954    pub fn relay_id(&self) -> Option<&RelayId> {
1955        self.credentials.as_ref().map(|x| &x.id)
1956    }
1957
1958    /// Returns the relay mode.
1959    pub fn relay_mode(&self) -> RelayMode {
1960        self.values.relay.mode
1961    }
1962
1963    /// Returns the instance type of relay.
1964    pub fn relay_instance(&self) -> RelayInstance {
1965        self.values.relay.instance
1966    }
1967
1968    /// Returns the upstream target as descriptor.
1969    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1970        &self.values.relay.upstream
1971    }
1972
1973    /// Returns the custom HTTP "Host" header.
1974    pub fn http_host_header(&self) -> Option<&str> {
1975        self.values.http.host_header.as_deref()
1976    }
1977
1978    /// Returns the listen address.
1979    pub fn listen_addr(&self) -> SocketAddr {
1980        (self.values.relay.host, self.values.relay.port).into()
1981    }
1982
1983    /// Returns the listen address for internal APIs.
1984    ///
1985    /// Internal APIs are APIs which do not need to be publicly exposed,
1986    /// like health checks.
1987    ///
1988    /// Returns `None` when there is no explicit address configured for internal APIs,
1989    /// and they should instead be exposed on the main [`Self::listen_addr`].
1990    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
1991        match (
1992            self.values.relay.internal_host,
1993            self.values.relay.internal_port,
1994        ) {
1995            (Some(host), None) => Some((host, self.values.relay.port).into()),
1996            (None, Some(port)) => Some((self.values.relay.host, port).into()),
1997            (Some(host), Some(port)) => Some((host, port).into()),
1998            (None, None) => None,
1999        }
2000    }
2001
2002    /// Returns the TLS listen address.
2003    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
2004        if self.values.relay.tls_identity_path.is_some() {
2005            let port = self.values.relay.tls_port.unwrap_or(3443);
2006            Some((self.values.relay.host, port).into())
2007        } else {
2008            None
2009        }
2010    }
2011
2012    /// Returns the path to the identity bundle
2013    pub fn tls_identity_path(&self) -> Option<&Path> {
2014        self.values.relay.tls_identity_path.as_deref()
2015    }
2016
2017    /// Returns the password for the identity bundle
2018    pub fn tls_identity_password(&self) -> Option<&str> {
2019        self.values.relay.tls_identity_password.as_deref()
2020    }
2021
2022    /// Returns `true` when project IDs should be overriden rather than validated.
2023    ///
2024    /// Defaults to `false`, which requires project ID validation.
2025    pub fn override_project_ids(&self) -> bool {
2026        self.values.relay.override_project_ids
2027    }
2028
2029    /// Returns `true` if Relay requires authentication for readiness.
2030    ///
2031    /// See [`ReadinessCondition`] for more information.
2032    pub fn requires_auth(&self) -> bool {
2033        match self.values.auth.ready {
2034            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
2035            ReadinessCondition::Always => false,
2036        }
2037    }
2038
2039    /// Returns the interval at which Realy should try to re-authenticate with the upstream.
2040    ///
2041    /// Always disabled in processing mode.
2042    pub fn http_auth_interval(&self) -> Option<Duration> {
2043        if self.processing_enabled() {
2044            return None;
2045        }
2046
2047        match self.values.http.auth_interval {
2048            None | Some(0) => None,
2049            Some(secs) => Some(Duration::from_secs(secs)),
2050        }
2051    }
2052
2053    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
2054    /// it has encountered a network outage.
2055    pub fn http_outage_grace_period(&self) -> Duration {
2056        Duration::from_secs(self.values.http.outage_grace_period)
2057    }
2058
2059    /// Time Relay waits before retrying an upstream request.
2060    ///
2061    /// Before going into a network outage, Relay may fail to make upstream
2062    /// requests. This is the time Relay waits before retrying the same request.
2063    pub fn http_retry_delay(&self) -> Duration {
2064        Duration::from_secs(self.values.http.retry_delay)
2065    }
2066
2067    /// Time of continued project request failures before Relay emits an error.
2068    pub fn http_project_failure_interval(&self) -> Duration {
2069        Duration::from_secs(self.values.http.project_failure_interval)
2070    }
2071
2072    /// Content encoding of upstream requests.
2073    pub fn http_encoding(&self) -> HttpEncoding {
2074        self.values.http.encoding
2075    }
2076
2077    /// Returns whether metrics should be sent globally through a shared endpoint.
2078    pub fn http_global_metrics(&self) -> bool {
2079        self.values.http.global_metrics
2080    }
2081
2082    /// Returns `true` if Relay supports forwarding unknown API requests.
2083    pub fn http_forward(&self) -> bool {
2084        self.values.http.forward
2085    }
2086
2087    /// Returns whether this Relay should emit outcomes.
2088    ///
2089    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
2090    /// in processing mode.
2091    pub fn emit_outcomes(&self) -> EmitOutcomes {
2092        if self.processing_enabled() {
2093            return EmitOutcomes::AsOutcomes;
2094        }
2095        self.values.outcomes.emit_outcomes
2096    }
2097
2098    /// Returns whether this Relay should emit client outcomes
2099    ///
2100    /// Relays that do not emit client outcomes will forward client recieved outcomes
2101    /// directly to the next relay in the chain as client report envelope.  This is only done
2102    /// if this relay emits outcomes at all. A relay that will not emit outcomes
2103    /// will forward the envelope unchanged.
2104    ///
2105    /// This flag can be explicitly disabled on processing relays as well to prevent the
2106    /// emitting of client outcomes to the kafka topic.
2107    pub fn emit_client_outcomes(&self) -> bool {
2108        self.values.outcomes.emit_client_outcomes
2109    }
2110
2111    /// Returns the maximum number of outcomes that are batched before being sent
2112    pub fn outcome_batch_size(&self) -> usize {
2113        self.values.outcomes.batch_size
2114    }
2115
2116    /// Returns the maximum interval that an outcome may be batched
2117    pub fn outcome_batch_interval(&self) -> Duration {
2118        Duration::from_millis(self.values.outcomes.batch_interval)
2119    }
2120
2121    /// The originating source of the outcome
2122    pub fn outcome_source(&self) -> Option<&str> {
2123        self.values.outcomes.source.as_deref()
2124    }
2125
2126    /// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2127    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2128        &self.values.outcomes.aggregator
2129    }
2130
2131    /// Returns logging configuration.
2132    pub fn logging(&self) -> &relay_log::LogConfig {
2133        &self.values.logging
2134    }
2135
2136    /// Returns logging configuration.
2137    pub fn sentry(&self) -> &relay_log::SentryConfig {
2138        &self.values.sentry
2139    }
2140
2141    /// Returns the socket addresses for statsd.
2142    ///
2143    /// If stats is disabled an empty vector is returned.
2144    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2145        if let Some(ref addr) = self.values.metrics.statsd {
2146            let addrs = addr
2147                .as_str()
2148                .to_socket_addrs()
2149                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2150                .collect();
2151            Ok(addrs)
2152        } else {
2153            Ok(vec![])
2154        }
2155    }
2156
2157    /// Return the prefix for statsd metrics.
2158    pub fn metrics_prefix(&self) -> &str {
2159        &self.values.metrics.prefix
2160    }
2161
2162    /// Returns the default tags for statsd metrics.
2163    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2164        &self.values.metrics.default_tags
2165    }
2166
2167    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
2168    pub fn metrics_hostname_tag(&self) -> Option<&str> {
2169        self.values.metrics.hostname_tag.as_deref()
2170    }
2171
2172    /// Returns the global sample rate for all metrics.
2173    pub fn metrics_sample_rate(&self) -> f32 {
2174        self.values.metrics.sample_rate
2175    }
2176
2177    /// Returns whether local metric aggregation should be enabled.
2178    pub fn metrics_aggregate(&self) -> bool {
2179        self.values.metrics.aggregate
2180    }
2181
2182    /// Returns whether high cardinality tags should be removed before sending metrics.
2183    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
2184        self.values.metrics.allow_high_cardinality_tags
2185    }
2186
2187    /// Returns the interval for periodic metrics emitted from Relay.
2188    ///
2189    /// `None` if periodic metrics are disabled.
2190    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2191        match self.values.metrics.periodic_secs {
2192            0 => None,
2193            secs => Some(Duration::from_secs(secs)),
2194        }
2195    }
2196
2197    /// Returns the default timeout for all upstream HTTP requests.
2198    pub fn http_timeout(&self) -> Duration {
2199        Duration::from_secs(self.values.http.timeout.into())
2200    }
2201
2202    /// Returns the connection timeout for all upstream HTTP requests.
2203    pub fn http_connection_timeout(&self) -> Duration {
2204        Duration::from_secs(self.values.http.connection_timeout.into())
2205    }
2206
2207    /// Returns the failed upstream request retry interval.
2208    pub fn http_max_retry_interval(&self) -> Duration {
2209        Duration::from_secs(self.values.http.max_retry_interval.into())
2210    }
2211
2212    /// Returns the expiry timeout for cached projects.
2213    pub fn project_cache_expiry(&self) -> Duration {
2214        Duration::from_secs(self.values.cache.project_expiry.into())
2215    }
2216
2217    /// Returns `true` if the full project state should be requested from upstream.
2218    pub fn request_full_project_config(&self) -> bool {
2219        self.values.cache.project_request_full_config
2220    }
2221
2222    /// Returns the expiry timeout for cached relay infos (public keys).
2223    pub fn relay_cache_expiry(&self) -> Duration {
2224        Duration::from_secs(self.values.cache.relay_expiry.into())
2225    }
2226
2227    /// Returns the maximum number of buffered envelopes
2228    pub fn envelope_buffer_size(&self) -> usize {
2229        self.values
2230            .cache
2231            .envelope_buffer_size
2232            .try_into()
2233            .unwrap_or(usize::MAX)
2234    }
2235
2236    /// Returns the expiry timeout for cached misses before trying to refetch.
2237    pub fn cache_miss_expiry(&self) -> Duration {
2238        Duration::from_secs(self.values.cache.miss_expiry.into())
2239    }
2240
2241    /// Returns the grace period for project caches.
2242    pub fn project_grace_period(&self) -> Duration {
2243        Duration::from_secs(self.values.cache.project_grace_period.into())
2244    }
2245
2246    /// Returns the refresh interval for a project.
2247    ///
2248    /// Validates the refresh time to be between the grace period and expiry.
2249    pub fn project_refresh_interval(&self) -> Option<Duration> {
2250        self.values
2251            .cache
2252            .project_refresh_interval
2253            .map(Into::into)
2254            .map(Duration::from_secs)
2255    }
2256
2257    /// Returns the duration in which batchable project config queries are
2258    /// collected before sending them in a single request.
2259    pub fn query_batch_interval(&self) -> Duration {
2260        Duration::from_millis(self.values.cache.batch_interval.into())
2261    }
2262
2263    /// Returns the duration in which downstream relays are requested from upstream.
2264    pub fn downstream_relays_batch_interval(&self) -> Duration {
2265        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2266    }
2267
2268    /// Returns the interval in seconds in which local project configurations should be reloaded.
2269    pub fn local_cache_interval(&self) -> Duration {
2270        Duration::from_secs(self.values.cache.file_interval.into())
2271    }
2272
2273    /// Returns the interval in seconds in which fresh global configs should be
2274    /// fetched from  upstream.
2275    pub fn global_config_fetch_interval(&self) -> Duration {
2276        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2277    }
2278
2279    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2280    ///
2281    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2282    /// suffixed with `.{partition_id}`.
2283    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2284        let mut path = self
2285            .values
2286            .spool
2287            .envelopes
2288            .path
2289            .as_ref()
2290            .map(|path| path.to_owned())?;
2291
2292        if partition_id == 0 {
2293            return Some(path);
2294        }
2295
2296        let file_name = path.file_name().and_then(|f| f.to_str())?;
2297        let new_file_name = format!("{file_name}.{partition_id}");
2298        path.set_file_name(new_file_name);
2299
2300        Some(path)
2301    }
2302
2303    /// The maximum size of the buffer, in bytes.
2304    pub fn spool_envelopes_max_disk_size(&self) -> usize {
2305        self.values.spool.envelopes.max_disk_size.as_bytes()
2306    }
2307
2308    /// Number of encoded envelope bytes that need to be accumulated before
2309    /// flushing one batch to disk.
2310    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2311        self.values.spool.envelopes.batch_size_bytes.as_bytes()
2312    }
2313
2314    /// Returns the time after which we drop envelopes as a [`Duration`] object.
2315    pub fn spool_envelopes_max_age(&self) -> Duration {
2316        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2317    }
2318
2319    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
2320    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2321        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2322    }
2323
2324    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
2325    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2326        self.values.spool.envelopes.max_backpressure_memory_percent
2327    }
2328
2329    /// Returns the number of partitions for the buffer.
2330    pub fn spool_partitions(&self) -> NonZeroU8 {
2331        self.values.spool.envelopes.partitions
2332    }
2333
2334    /// Returns the maximum size of an event payload in bytes.
2335    pub fn max_event_size(&self) -> usize {
2336        self.values.limits.max_event_size.as_bytes()
2337    }
2338
2339    /// Returns the maximum size of each attachment.
2340    pub fn max_attachment_size(&self) -> usize {
2341        self.values.limits.max_attachment_size.as_bytes()
2342    }
2343
2344    /// Returns the maximum combined size of attachments or payloads containing attachments
2345    /// (minidump, unreal, standalone attachments) in bytes.
2346    pub fn max_attachments_size(&self) -> usize {
2347        self.values.limits.max_attachments_size.as_bytes()
2348    }
2349
2350    /// Returns the maximum combined size of client reports in bytes.
2351    pub fn max_client_reports_size(&self) -> usize {
2352        self.values.limits.max_client_reports_size.as_bytes()
2353    }
2354
2355    /// Returns the maximum payload size of a monitor check-in in bytes.
2356    pub fn max_check_in_size(&self) -> usize {
2357        self.values.limits.max_check_in_size.as_bytes()
2358    }
2359
2360    /// Returns the maximum payload size of a log in bytes.
2361    pub fn max_log_size(&self) -> usize {
2362        self.values.limits.max_log_size.as_bytes()
2363    }
2364
2365    /// Returns the maximum payload size of a span in bytes.
2366    pub fn max_span_size(&self) -> usize {
2367        self.values.limits.max_span_size.as_bytes()
2368    }
2369
2370    /// Returns the maximum payload size of an item container in bytes.
2371    pub fn max_container_size(&self) -> usize {
2372        self.values.limits.max_container_size.as_bytes()
2373    }
2374
2375    /// Returns the maximum payload size for logs integration items in bytes.
2376    pub fn max_logs_integration_size(&self) -> usize {
2377        // Not explicitly configured, inherited from the maximum size of a log container.
2378        self.max_container_size()
2379    }
2380
2381    /// Returns the maximum payload size for spans integration items in bytes.
2382    pub fn max_spans_integration_size(&self) -> usize {
2383        // Not explicitly configured, inherited from the maximum size of a span container.
2384        self.max_container_size()
2385    }
2386
2387    /// Returns the maximum size of an envelope payload in bytes.
2388    ///
2389    /// Individual item size limits still apply.
2390    pub fn max_envelope_size(&self) -> usize {
2391        self.values.limits.max_envelope_size.as_bytes()
2392    }
2393
2394    /// Returns the maximum number of sessions per envelope.
2395    pub fn max_session_count(&self) -> usize {
2396        self.values.limits.max_session_count
2397    }
2398
2399    /// Returns the maximum payload size of a statsd metric in bytes.
2400    pub fn max_statsd_size(&self) -> usize {
2401        self.values.limits.max_statsd_size.as_bytes()
2402    }
2403
2404    /// Returns the maximum payload size of metric buckets in bytes.
2405    pub fn max_metric_buckets_size(&self) -> usize {
2406        self.values.limits.max_metric_buckets_size.as_bytes()
2407    }
2408
2409    /// Returns the maximum payload size for general API requests.
2410    pub fn max_api_payload_size(&self) -> usize {
2411        self.values.limits.max_api_payload_size.as_bytes()
2412    }
2413
2414    /// Returns the maximum payload size for file uploads and chunks.
2415    pub fn max_api_file_upload_size(&self) -> usize {
2416        self.values.limits.max_api_file_upload_size.as_bytes()
2417    }
2418
2419    /// Returns the maximum payload size for chunks
2420    pub fn max_api_chunk_upload_size(&self) -> usize {
2421        self.values.limits.max_api_chunk_upload_size.as_bytes()
2422    }
2423
2424    /// Returns the maximum payload size for a profile
2425    pub fn max_profile_size(&self) -> usize {
2426        self.values.limits.max_profile_size.as_bytes()
2427    }
2428
2429    /// Returns the maximum payload size for a trace metric.
2430    pub fn max_trace_metric_size(&self) -> usize {
2431        self.values.limits.max_trace_metric_size.as_bytes()
2432    }
2433
2434    /// Returns the maximum payload size for a compressed replay.
2435    pub fn max_replay_compressed_size(&self) -> usize {
2436        self.values.limits.max_replay_compressed_size.as_bytes()
2437    }
2438
2439    /// Returns the maximum payload size for an uncompressed replay.
2440    pub fn max_replay_uncompressed_size(&self) -> usize {
2441        self.values.limits.max_replay_uncompressed_size.as_bytes()
2442    }
2443
2444    /// Returns the maximum message size for an uncompressed replay.
2445    ///
2446    /// This is greater than max_replay_compressed_size because
2447    /// it can include additional metadata about the replay in
2448    /// addition to the recording.
2449    pub fn max_replay_message_size(&self) -> usize {
2450        self.values.limits.max_replay_message_size.as_bytes()
2451    }
2452
2453    /// Returns the maximum number of active requests
2454    pub fn max_concurrent_requests(&self) -> usize {
2455        self.values.limits.max_concurrent_requests
2456    }
2457
2458    /// Returns the maximum number of active queries
2459    pub fn max_concurrent_queries(&self) -> usize {
2460        self.values.limits.max_concurrent_queries
2461    }
2462
2463    /// The maximum number of seconds a query is allowed to take across retries.
2464    pub fn query_timeout(&self) -> Duration {
2465        Duration::from_secs(self.values.limits.query_timeout)
2466    }
2467
2468    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
2469    /// signal.
2470    pub fn shutdown_timeout(&self) -> Duration {
2471        Duration::from_secs(self.values.limits.shutdown_timeout)
2472    }
2473
2474    /// Returns the server keep-alive timeout in seconds.
2475    ///
2476    /// By default keep alive is set to a 5 seconds.
2477    pub fn keepalive_timeout(&self) -> Duration {
2478        Duration::from_secs(self.values.limits.keepalive_timeout)
2479    }
2480
2481    /// Returns the server idle timeout in seconds.
2482    pub fn idle_timeout(&self) -> Option<Duration> {
2483        self.values.limits.idle_timeout.map(Duration::from_secs)
2484    }
2485
2486    /// Returns the maximum connections.
2487    pub fn max_connections(&self) -> Option<usize> {
2488        self.values.limits.max_connections
2489    }
2490
2491    /// TCP listen backlog to configure on Relay's listening socket.
2492    pub fn tcp_listen_backlog(&self) -> u32 {
2493        self.values.limits.tcp_listen_backlog
2494    }
2495
2496    /// Returns the number of cores to use for thread pools.
2497    pub fn cpu_concurrency(&self) -> usize {
2498        self.values.limits.max_thread_count
2499    }
2500
2501    /// Returns the number of tasks that can run concurrently in the worker pool.
2502    pub fn pool_concurrency(&self) -> usize {
2503        self.values.limits.max_pool_concurrency
2504    }
2505
2506    /// Returns the maximum size of a project config query.
2507    pub fn query_batch_size(&self) -> usize {
2508        self.values.cache.batch_size
2509    }
2510
2511    /// Get filename for static project config.
2512    pub fn project_configs_path(&self) -> PathBuf {
2513        self.path.join("projects")
2514    }
2515
2516    /// True if the Relay should do processing.
2517    pub fn processing_enabled(&self) -> bool {
2518        self.values.processing.enabled
2519    }
2520
2521    /// Level of normalization for Relay to apply to incoming data.
2522    pub fn normalization_level(&self) -> NormalizationLevel {
2523        self.values.normalization.level
2524    }
2525
2526    /// The path to the GeoIp database required for event processing.
2527    pub fn geoip_path(&self) -> Option<&Path> {
2528        self.values
2529            .geoip
2530            .path
2531            .as_deref()
2532            .or(self.values.processing.geoip_path.as_deref())
2533    }
2534
2535    /// Maximum future timestamp of ingested data.
2536    ///
2537    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
2538    pub fn max_secs_in_future(&self) -> i64 {
2539        self.values.processing.max_secs_in_future.into()
2540    }
2541
2542    /// Maximum age of ingested sessions. Older sessions will be dropped.
2543    pub fn max_session_secs_in_past(&self) -> i64 {
2544        self.values.processing.max_session_secs_in_past.into()
2545    }
2546
2547    /// Configuration name and list of Kafka configuration parameters for a given topic.
2548    pub fn kafka_configs(
2549        &self,
2550        topic: KafkaTopic,
2551    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2552        self.values.processing.topics.get(topic).kafka_configs(
2553            &self.values.processing.kafka_config,
2554            &self.values.processing.secondary_kafka_configs,
2555        )
2556    }
2557
2558    /// Whether to validate the topics against Kafka.
2559    pub fn kafka_validate_topics(&self) -> bool {
2560        self.values.processing.kafka_validate_topics
2561    }
2562
2563    /// All unused but configured topic assignments.
2564    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2565        &self.values.processing.topics.unused
2566    }
2567
2568    /// Configuration of the attachment upload service.
2569    pub fn upload(&self) -> &UploadServiceConfig {
2570        &self.values.processing.upload
2571    }
2572
2573    /// Redis servers to connect to for project configs, cardinality limits,
2574    /// rate limiting, and metrics metadata.
2575    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2576        let redis_configs = self.values.processing.redis.as_ref()?;
2577
2578        Some(build_redis_configs(
2579            redis_configs,
2580            self.cpu_concurrency() as u32,
2581            self.pool_concurrency() as u32,
2582        ))
2583    }
2584
2585    /// Chunk size of attachments in bytes.
2586    pub fn attachment_chunk_size(&self) -> usize {
2587        self.values.processing.attachment_chunk_size.as_bytes()
2588    }
2589
2590    /// Maximum metrics batch size in bytes.
2591    pub fn metrics_max_batch_size_bytes(&self) -> usize {
2592        self.values.aggregator.max_flush_bytes
2593    }
2594
2595    /// Default prefix to use when looking up project configs in Redis. This is only done when
2596    /// Relay is in processing mode.
2597    pub fn projectconfig_cache_prefix(&self) -> &str {
2598        &self.values.processing.projectconfig_cache_prefix
2599    }
2600
2601    /// Maximum rate limit to report to clients in seconds.
2602    pub fn max_rate_limit(&self) -> Option<u64> {
2603        self.values.processing.max_rate_limit.map(u32::into)
2604    }
2605
2606    /// Amount of remaining quota which is cached in memory.
2607    pub fn quota_cache_ratio(&self) -> Option<f32> {
2608        self.values.processing.quota_cache_ratio
2609    }
2610
2611    /// Maximum limit (ratio) for the in memory quota cache.
2612    pub fn quota_cache_max(&self) -> Option<f32> {
2613        self.values.processing.quota_cache_max
2614    }
2615
2616    /// Cache vacuum interval for the cardinality limiter in memory cache.
2617    ///
2618    /// The cache will scan for expired values based on this interval.
2619    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2620        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2621    }
2622
2623    /// Interval to refresh internal health checks.
2624    pub fn health_refresh_interval(&self) -> Duration {
2625        Duration::from_millis(self.values.health.refresh_interval_ms)
2626    }
2627
2628    /// Maximum memory watermark in bytes.
2629    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2630        self.values
2631            .health
2632            .max_memory_bytes
2633            .as_ref()
2634            .map_or(u64::MAX, |b| b.as_bytes() as u64)
2635    }
2636
2637    /// Maximum memory watermark as a percentage of maximum system memory.
2638    pub fn health_max_memory_watermark_percent(&self) -> f32 {
2639        self.values.health.max_memory_percent
2640    }
2641
2642    /// Health check probe timeout.
2643    pub fn health_probe_timeout(&self) -> Duration {
2644        Duration::from_millis(self.values.health.probe_timeout_ms)
2645    }
2646
2647    /// Refresh frequency for polling new memory stats.
2648    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2649        self.values.health.memory_stat_refresh_frequency_ms
2650    }
2651
2652    /// Maximum amount of COGS measurements buffered in memory.
2653    pub fn cogs_max_queue_size(&self) -> u64 {
2654        self.values.cogs.max_queue_size
2655    }
2656
2657    /// Resource ID to use for Relay COGS measurements.
2658    pub fn cogs_relay_resource_id(&self) -> &str {
2659        &self.values.cogs.relay_resource_id
2660    }
2661
2662    /// Returns configuration for the default metrics aggregator.
2663    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2664        &self.values.aggregator
2665    }
2666
2667    /// Returns configuration for non-default metrics aggregator.
2668    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2669        &self.values.secondary_aggregators
2670    }
2671
2672    /// Returns aggregator config for a given metrics namespace.
2673    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2674        for entry in &self.values.secondary_aggregators {
2675            if entry.condition.matches(Some(namespace)) {
2676                return &entry.config;
2677            }
2678        }
2679        &self.values.aggregator
2680    }
2681
2682    /// Return the statically configured Relays.
2683    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2684        &self.values.auth.static_relays
2685    }
2686
2687    /// Returns the max age a signature is considered valid, in seconds.
2688    pub fn signature_max_age(&self) -> Duration {
2689        Duration::from_secs(self.values.auth.signature_max_age)
2690    }
2691
2692    /// Returns `true` if unknown items should be accepted and forwarded.
2693    pub fn accept_unknown_items(&self) -> bool {
2694        let forward = self.values.routing.accept_unknown_items;
2695        forward.unwrap_or_else(|| !self.processing_enabled())
2696    }
2697}
2698
2699impl Default for Config {
2700    fn default() -> Self {
2701        Self {
2702            values: ConfigValues::default(),
2703            credentials: None,
2704            path: PathBuf::new(),
2705        }
2706    }
2707}
2708
2709#[cfg(test)]
2710mod tests {
2711
2712    use super::*;
2713
2714    /// Regression test for renaming the envelope buffer flags.
2715    #[test]
2716    fn test_event_buffer_size() {
2717        let yaml = r###"
2718cache:
2719    event_buffer_size: 1000000
2720    event_expiry: 1800
2721"###;
2722
2723        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
2724        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
2725        assert_eq!(values.cache.envelope_expiry, 1800);
2726    }
2727
2728    #[test]
2729    fn test_emit_outcomes() {
2730        for (serialized, deserialized) in &[
2731            ("true", EmitOutcomes::AsOutcomes),
2732            ("false", EmitOutcomes::None),
2733            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
2734        ] {
2735            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
2736            assert_eq!(value, *deserialized);
2737            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
2738        }
2739    }
2740
2741    #[test]
2742    fn test_emit_outcomes_invalid() {
2743        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
2744    }
2745}