//! Relay server configuration (`relay_config/config.rs`).
1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr};
5use std::num::{NonZeroU8, NonZeroU16};
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Grace period (in seconds) before a network outage is acted upon.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Comment header written at the top of every generated YAML config file.
static CONFIG_YAML_HEADER: &str = r#"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"#;
34
/// Indicates config related errors.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but uses a binary that was
    /// compiled without the processing feature.
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Map each kind to its static message, then write it in one go.
        let message = match self {
            Self::CouldNotOpenFile => "could not open config file",
            Self::CouldNotWriteFile => "could not write config file",
            Self::BadYaml => "could not parse yaml config file",
            Self::BadJson => "could not parse json config file",
            Self::InvalidValue => "invalid config value",
            Self::ProcessingNotAvailable => {
                "was not compiled with processing, cannot enable processing"
            }
        };
        f.write_str(message)
    }
}
69
/// Defines the source of a config error
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// An error occurring independently.
    #[default]
    None,
    /// An error originating from a configuration file.
    File(PathBuf),
    /// An error originating in a field override (an env var, or a CLI parameter).
    FieldOverride(String),
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // No source: contribute nothing to the rendered error.
            Self::None => Ok(()),
            Self::File(path) => write!(f, " (file {})", path.display()),
            Self::FieldOverride(field) => write!(f, " (field {field})"),
        }
    }
}
93
94/// Indicates config related errors.
95#[derive(Debug)]
96pub struct ConfigError {
97    source: ConfigErrorSource,
98    kind: ConfigErrorKind,
99}
100
101impl ConfigError {
102    #[inline]
103    fn new(kind: ConfigErrorKind) -> Self {
104        Self {
105            source: ConfigErrorSource::None,
106            kind,
107        }
108    }
109
110    #[inline]
111    fn field(field: &'static str) -> Self {
112        Self {
113            source: ConfigErrorSource::FieldOverride(field.to_owned()),
114            kind: ConfigErrorKind::InvalidValue,
115        }
116    }
117
118    #[inline]
119    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120        Self {
121            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122            kind,
123        }
124    }
125
126    /// Returns the error kind of the error.
127    pub fn kind(&self) -> ConfigErrorKind {
128        self.kind
129    }
130}
131
132impl fmt::Display for ConfigError {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        write!(f, "{}{}", self.kind(), self.source)
135    }
136}
137
138impl Error for ConfigError {}
139
/// The on-disk serialization format of a config file.
enum ConfigFormat {
    Yaml,
    Json,
}

impl ConfigFormat {
    /// Returns the file extension used for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Yaml => "yml",
            Self::Json => "json",
        }
    }
}
153
154trait ConfigObject: DeserializeOwned + Serialize {
155    /// The format in which to serialize this configuration.
156    fn format() -> ConfigFormat;
157
158    /// The basename of the config file.
159    fn name() -> &'static str;
160
161    /// The full filename of the config file, including the file extension.
162    fn path(base: &Path) -> PathBuf {
163        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
164    }
165
166    /// Loads the config file from a file within the given directory location.
167    fn load(base: &Path) -> anyhow::Result<Self> {
168        let path = Self::path(base);
169
170        let f = fs::File::open(&path)
171            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
172        let f = io::BufReader::new(f);
173
174        let mut source = {
175            let file = serde_vars::FileSource::default()
176                .with_variable_prefix("${file:")
177                .with_variable_suffix("}")
178                .with_base_path(base);
179            let env = serde_vars::EnvSource::default()
180                .with_variable_prefix("${")
181                .with_variable_suffix("}");
182            (file, env)
183        };
184        match Self::format() {
185            ConfigFormat::Yaml => {
186                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
187                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
188            }
189            ConfigFormat::Json => {
190                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
191                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
192            }
193        }
194    }
195
196    /// Writes the configuration to a file within the given directory location.
197    fn save(&self, base: &Path) -> anyhow::Result<()> {
198        let path = Self::path(base);
199        let mut options = fs::OpenOptions::new();
200        options.write(true).truncate(true).create(true);
201
202        // Remove all non-user permissions for the newly created file
203        #[cfg(unix)]
204        {
205            use std::os::unix::fs::OpenOptionsExt;
206            options.mode(0o600);
207        }
208
209        let mut f = options
210            .open(&path)
211            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
212
213        match Self::format() {
214            ConfigFormat::Yaml => {
215                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
216                serde_yaml::to_writer(&mut f, self)
217                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
218            }
219            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
220                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
221        }
222
223        f.write_all(b"\n").ok();
224
225        Ok(())
226    }
227}
228
/// Holds configuration overrides supplied through CLI parameters or
/// environment variables.
///
/// Every field is optional; `None` means "no override for this setting".
/// All values are kept as raw strings and parsed/validated later.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to (network interface).
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// "true" if processing is enabled "false" otherwise
    pub processing: Option<String>,
    /// the kafka bootstrap.servers configuration string
    pub kafka_url: Option<String>,
    /// the redis server url
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay
    pub secret_key: Option<String>,
    /// The public key of the relay
    pub public_key: Option<String>,
    /// Outcome source
    pub outcome_source: Option<String>,
    /// shutdown timeout
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
268
269/// The relay credentials
270#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
271pub struct Credentials {
272    /// The secret key of the relay
273    pub secret_key: SecretKey,
274    /// The public key of the relay
275    pub public_key: PublicKey,
276    /// The globally unique ID of the relay.
277    pub id: RelayId,
278}
279
280impl Credentials {
281    /// Generates new random credentials.
282    pub fn generate() -> Self {
283        relay_log::info!("generating new relay credentials");
284        let (sk, pk) = generate_key_pair();
285        Self {
286            secret_key: sk,
287            public_key: pk,
288            id: generate_relay_id(),
289        }
290    }
291
292    /// Serializes this configuration to JSON.
293    pub fn to_json_string(&self) -> anyhow::Result<String> {
294        serde_json::to_string(self)
295            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
296    }
297}
298
299impl ConfigObject for Credentials {
300    fn format() -> ConfigFormat {
301        ConfigFormat::Json
302    }
303    fn name() -> &'static str {
304        "credentials"
305    }
306}
307
308/// Information on a downstream Relay.
309#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
310#[serde(rename_all = "camelCase")]
311pub struct RelayInfo {
312    /// The public key that this Relay uses to authenticate and sign requests.
313    pub public_key: PublicKey,
314
315    /// Marks an internal relay that has privileged access to more project configuration.
316    #[serde(default)]
317    pub internal: bool,
318}
319
320impl RelayInfo {
321    /// Creates a new RelayInfo
322    pub fn new(public_key: PublicKey) -> Self {
323        Self {
324            public_key,
325            internal: false,
326        }
327    }
328}
329
330/// The operation mode of a relay.
331#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
332#[serde(rename_all = "camelCase")]
333pub enum RelayMode {
334    /// This relay acts as a proxy for all requests and events.
335    ///
336    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
337    /// fetch project configurations from the upstream or perform PII stripping. All events are
338    /// accepted unless overridden on the file system.
339    Proxy,
340
341    /// Project configurations are managed by the upstream.
342    ///
343    /// Project configurations are always fetched from the upstream, unless they are statically
344    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
345    /// only possible, if the upstream is Sentry directly, or another managed Relay.
346    Managed,
347}
348
349impl<'de> Deserialize<'de> for RelayMode {
350    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351    where
352        D: Deserializer<'de>,
353    {
354        let s = String::deserialize(deserializer)?;
355        match s.as_str() {
356            "proxy" => Ok(RelayMode::Proxy),
357            "managed" => Ok(RelayMode::Managed),
358            "static" => Err(serde::de::Error::custom(
359                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
360            )),
361            other => Err(serde::de::Error::unknown_variant(
362                other,
363                &["proxy", "managed"],
364            )),
365        }
366    }
367}
368
369impl fmt::Display for RelayMode {
370    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371        match self {
372            RelayMode::Proxy => write!(f, "proxy"),
373            RelayMode::Managed => write!(f, "managed"),
374        }
375    }
376}
377
378/// The instance type of Relay.
379#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
380#[serde(rename_all = "camelCase")]
381pub enum RelayInstance {
382    /// This Relay is run as a default instance.
383    Default,
384
385    /// This Relay is run as a canary instance where experiments can be run.
386    Canary,
387}
388
389impl RelayInstance {
390    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
391    pub fn is_canary(&self) -> bool {
392        matches!(self, RelayInstance::Canary)
393    }
394}
395
396impl fmt::Display for RelayInstance {
397    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398        match self {
399            RelayInstance::Default => write!(f, "default"),
400            RelayInstance::Canary => write!(f, "canary"),
401        }
402    }
403}
404
405impl FromStr for RelayInstance {
406    type Err = fmt::Error;
407
408    fn from_str(s: &str) -> Result<Self, Self::Err> {
409        match s {
410            "canary" => Ok(RelayInstance::Canary),
411            _ => Ok(RelayInstance::Default),
412        }
413    }
414}
415
/// Error returned when parsing an invalid [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429    type Err = ParseRelayModeError;
430
431    fn from_str(s: &str) -> Result<Self, Self::Err> {
432        match s {
433            "proxy" => Ok(RelayMode::Proxy),
434            "managed" => Ok(RelayMode::Managed),
435            _ => Err(ParseRelayModeError),
436        }
437    }
438}
439
/// Returns `true` if this value is equal to `Default::default()`.
///
/// Used by serde `skip_serializing_if` attributes to omit default values.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
444
/// Checks if we are running in docker.
///
/// Detection is based on the `/.dockerenv` marker file, falling back to
/// scanning `/proc/self/cgroup` for a `/docker` entry.
fn is_docker() -> bool {
    Path::new("/.dockerenv").exists()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
453
454/// Default value for the "bind" configuration.
455fn default_host() -> IpAddr {
456    if is_docker() {
457        // Docker images rely on this service being exposed
458        "0.0.0.0".parse().unwrap()
459    } else {
460        "127.0.0.1".parse().unwrap()
461    }
462}
463
464/// Controls responses from the readiness health check endpoint based on authentication.
465///
466/// Independent of the the readiness condition, shutdown always switches Relay into unready state.
467#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
468#[serde(rename_all = "lowercase")]
469#[derive(Default)]
470pub enum ReadinessCondition {
471    /// (default) Relay is ready when authenticated and connected to the upstream.
472    ///
473    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
474    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
475    /// Relay remains ready until authentication fails.
476    ///
477    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
478    /// network outages.
479    #[default]
480    Authenticated,
481    /// Relay reports readiness regardless of the authentication and networking state.
482    Always,
483}
484
485/// Relay specific configuration values.
486#[derive(Serialize, Deserialize, Debug)]
487#[serde(default)]
488pub struct Relay {
489    /// The operation mode of this Relay.
490    pub mode: RelayMode,
491    /// The instance type of this Relay.
492    pub instance: RelayInstance,
493    /// The upstream Relay or Sentry instance.
494    pub upstream: UpstreamDescriptor,
495    /// The upstream advertised to downstream Relay instances.
496    ///
497    /// This value will be advertised to downstream Relays as the upstream to use when forwarding
498    /// data. It can be used for traffic routing and balancing, it must not redirect to a different
499    /// Sentry instance.
500    ///
501    /// Downstream Relays will treat the advertised upstream as the same logical component as this instance
502    /// and re-use already established authentication keys.
503    pub advertised_upstream: Option<UpstreamDescriptor>,
504    /// The host the relay should bind to (network interface).
505    pub host: IpAddr,
506    /// The port to bind for the unencrypted relay HTTP server.
507    pub port: u16,
508    /// The host the relay should bind to (network interface) for internally exposed APIs, like
509    /// health checks.
510    ///
511    /// If not configured, internal routes are exposed on the main HTTP server.
512    ///
513    /// Note: configuring the internal http server on an address which overlaps with the main
514    /// server (e.g. main on `0.0.0.0:3000` and internal on `127.0.0.1:3000`) is a misconfiguration
515    /// resulting in approximately half of the requests sent to `127.0.0.1:3000` to fail, as the handling
516    /// http server is chosen by the operating system 'at random'.
517    ///
518    /// As a best practice you should always choose different ports to avoid this issue.
519    ///
520    /// Defaults to [`Self::host`].
521    pub internal_host: Option<IpAddr>,
522    /// The port to bind for internally exposed APIs.
523    ///
524    /// Defaults to [`Self::port`].
525    pub internal_port: Option<u16>,
526    /// Optional port to bind for the encrypted relay HTTPS server.
527    #[serde(skip_serializing)]
528    pub tls_port: Option<u16>,
529    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
530    #[serde(skip_serializing)]
531    pub tls_identity_path: Option<PathBuf>,
532    /// Password for the PKCS12 archive.
533    #[serde(skip_serializing)]
534    pub tls_identity_password: Option<String>,
535    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
536    ///
537    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
538    /// Validation of project identifiers can be safely skipped in these cases.
539    #[serde(skip_serializing_if = "is_default")]
540    pub override_project_ids: bool,
541}
542
543impl Default for Relay {
544    fn default() -> Self {
545        Relay {
546            mode: RelayMode::Managed,
547            instance: RelayInstance::Default,
548            upstream: "https://sentry.io/".parse().unwrap(),
549            advertised_upstream: None,
550            host: default_host(),
551            port: 3000,
552            internal_host: None,
553            internal_port: None,
554            tls_port: None,
555            tls_identity_path: None,
556            tls_identity_password: None,
557            override_project_ids: false,
558        }
559    }
560}
561
562/// Control the metrics.
563#[derive(Serialize, Deserialize, Debug)]
564#[serde(default)]
565pub struct Metrics {
566    /// Hostname and port of the statsd server.
567    ///
568    /// Defaults to `None`.
569    pub statsd: Option<String>,
570    /// Buffer size used for metrics sent to the statsd socket.
571    ///
572    /// Defaults to `None`.
573    pub statsd_buffer_size: Option<usize>,
574    /// Common prefix that should be added to all metrics.
575    ///
576    /// Defaults to `"sentry.relay"`.
577    pub prefix: String,
578    /// Default tags to apply to all metrics.
579    pub default_tags: BTreeMap<String, String>,
580    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
581    pub hostname_tag: Option<String>,
582    /// Interval for periodic metrics emitted from Relay.
583    ///
584    /// Setting it to `0` seconds disables the periodic metrics.
585    /// Defaults to 5 seconds.
586    pub periodic_secs: u64,
587}
588
589impl Default for Metrics {
590    fn default() -> Self {
591        Metrics {
592            statsd: None,
593            statsd_buffer_size: None,
594            prefix: "sentry.relay".into(),
595            default_tags: BTreeMap::new(),
596            hostname_tag: None,
597            periodic_secs: 5,
598        }
599    }
600}
601
602/// Controls various limits
603#[derive(Serialize, Deserialize, Debug)]
604#[serde(default)]
605pub struct Limits {
606    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
607    /// buffering.
608    pub max_concurrent_requests: usize,
609    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
610    /// buffering.
611    ///
612    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
613    pub max_concurrent_queries: usize,
614    /// The maximum payload size for events.
615    pub max_event_size: ByteSize,
616    /// The maximum size for each attachment.
617    pub max_attachment_size: ByteSize,
618    /// The maximum size for a TUS upload request body.
619    pub max_upload_size: ByteSize,
620    /// The maximum combined size for all attachments in an envelope or request.
621    pub max_attachments_size: ByteSize,
622    /// The maximum combined size for all client reports in an envelope or request.
623    pub max_client_reports_size: ByteSize,
624    /// The maximum payload size for a monitor check-in.
625    pub max_check_in_size: ByteSize,
626    /// The maximum payload size for an entire envelopes. Individual limits still apply.
627    pub max_envelope_size: ByteSize,
628    /// The maximum number of session items per envelope.
629    pub max_session_count: usize,
630    /// The maximum payload size for general API requests.
631    pub max_api_payload_size: ByteSize,
632    /// The maximum payload size for file uploads and chunks.
633    pub max_api_file_upload_size: ByteSize,
634    /// The maximum payload size for chunks
635    pub max_api_chunk_upload_size: ByteSize,
636    /// The maximum payload size for a profile
637    pub max_profile_size: ByteSize,
638    /// The maximum payload size for a trace metric.
639    pub max_trace_metric_size: ByteSize,
640    /// The maximum payload size for a log.
641    pub max_log_size: ByteSize,
642    /// The maximum payload size for a span.
643    pub max_span_size: ByteSize,
644    /// The maximum payload size for an item container.
645    pub max_container_size: ByteSize,
646    /// The maximum payload size for a statsd metric.
647    pub max_statsd_size: ByteSize,
648    /// The maximum payload size for metric buckets.
649    pub max_metric_buckets_size: ByteSize,
650    /// The maximum payload size for a compressed replay.
651    pub max_replay_compressed_size: ByteSize,
652    /// The maximum payload size for an uncompressed replay.
653    #[serde(alias = "max_replay_size")]
654    max_replay_uncompressed_size: ByteSize,
655    /// The maximum size for a replay recording Kafka message.
656    pub max_replay_message_size: ByteSize,
657    /// The byte size limit up to which Relay will retain
658    /// keys of invalid/removed attributes.
659    ///
660    /// This is only relevant for EAP items (spans, logs, …).
661    /// In principle, we want to record all deletions of attributes,
662    /// but we have to institute some limit to protect our infrastructure
663    /// against excessive metadata sizes.
664    ///
665    /// Defaults to 10KiB.
666    pub max_removed_attribute_key_size: ByteSize,
667    /// The maximum number of threads to spawn for CPU and web work, each.
668    ///
669    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
670    /// the number of logical CPU cores on the host.
671    pub max_thread_count: usize,
672    /// Controls the maximum concurrency of each worker thread.
673    ///
674    /// Increasing the concurrency, can lead to a better utilization of worker threads by
675    /// increasing the amount of I/O done concurrently.
676    //
677    /// Currently has no effect on defaults to `1`.
678    pub max_pool_concurrency: usize,
679    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
680    /// have lower timeouts. Defaults to 30 seconds.
681    pub query_timeout: u64,
682    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
683    /// signal.
684    pub shutdown_timeout: u64,
685    /// Server keep-alive timeout in seconds.
686    ///
687    /// By default, keep-alive is set to 5 seconds.
688    pub keepalive_timeout: u64,
689    /// Server idle timeout in seconds.
690    ///
691    /// The idle timeout limits the amount of time a connection is kept open without activity.
692    /// Setting this too short may abort connections before Relay is able to send a response.
693    ///
694    /// By default there is no idle timeout.
695    pub idle_timeout: Option<u64>,
696    /// Sets the maximum number of concurrent connections.
697    ///
698    /// Upon reaching the limit, the server will stop accepting connections.
699    ///
700    /// By default there is no limit.
701    pub max_connections: Option<usize>,
702    /// The TCP listen backlog.
703    ///
704    /// Configures the TCP listen backlog for the listening socket of Relay.
705    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
706    /// for a more detailed description of the listen backlog.
707    ///
708    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
709    pub tcp_listen_backlog: u32,
710}
711
712impl Default for Limits {
713    fn default() -> Self {
714        Limits {
715            max_concurrent_requests: 100,
716            max_concurrent_queries: 5,
717            max_event_size: ByteSize::mebibytes(1),
718            max_attachment_size: ByteSize::mebibytes(200),
719            max_upload_size: ByteSize::mebibytes(1024),
720            max_attachments_size: ByteSize::mebibytes(200),
721            max_client_reports_size: ByteSize::kibibytes(4),
722            max_check_in_size: ByteSize::kibibytes(100),
723            max_envelope_size: ByteSize::mebibytes(200),
724            max_session_count: 100,
725            max_api_payload_size: ByteSize::mebibytes(20),
726            max_api_file_upload_size: ByteSize::mebibytes(40),
727            max_api_chunk_upload_size: ByteSize::mebibytes(100),
728            max_profile_size: ByteSize::mebibytes(50),
729            max_trace_metric_size: ByteSize::mebibytes(1),
730            max_log_size: ByteSize::mebibytes(1),
731            max_span_size: ByteSize::mebibytes(10),
732            max_container_size: ByteSize::mebibytes(12),
733            max_statsd_size: ByteSize::mebibytes(1),
734            max_metric_buckets_size: ByteSize::mebibytes(1),
735            max_replay_compressed_size: ByteSize::mebibytes(10),
736            max_replay_uncompressed_size: ByteSize::mebibytes(100),
737            max_replay_message_size: ByteSize::mebibytes(15),
738            max_thread_count: num_cpus::get(),
739            max_pool_concurrency: 1,
740            query_timeout: 30,
741            shutdown_timeout: 10,
742            keepalive_timeout: 5,
743            idle_timeout: None,
744            max_connections: None,
745            tcp_listen_backlog: 1024,
746            max_removed_attribute_key_size: ByteSize::kibibytes(10),
747        }
748    }
749}
750
751/// Controls traffic steering.
752#[derive(Debug, Default, Deserialize, Serialize)]
753#[serde(default)]
754pub struct Routing {
755    /// Accept and forward unknown Envelope items to the upstream.
756    ///
757    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
758    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
759    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
760    /// processed as usual.
761    ///
762    /// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
763    /// is disabled by default since the item cannot be handled.
764    pub accept_unknown_items: Option<bool>,
765}
766
767/// Http content encoding for both incoming and outgoing web requests.
768#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
769#[serde(rename_all = "lowercase")]
770pub enum HttpEncoding {
771    /// Identity function without no compression.
772    ///
773    /// This is the default encoding and does not require the presence of the `content-encoding`
774    /// HTTP header.
775    #[default]
776    Identity,
777    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
778    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
779    ///
780    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
781    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
782    Deflate,
783    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
784    /// (LZ77), with a 32-bit CRC.
785    ///
786    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
787    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
788    /// compatibility purposes.
789    Gzip,
790    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
791    Br,
792    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
793    Zstd,
794}
795
796impl HttpEncoding {
797    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
798    pub fn parse(str: &str) -> Self {
799        let str = str.trim();
800        if str.eq_ignore_ascii_case("zstd") {
801            Self::Zstd
802        } else if str.eq_ignore_ascii_case("br") {
803            Self::Br
804        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
805            Self::Gzip
806        } else if str.eq_ignore_ascii_case("deflate") {
807            Self::Deflate
808        } else {
809            Self::Identity
810        }
811    }
812
813    /// Returns the value for the `content-encoding` HTTP header.
814    ///
815    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
816    pub fn name(&self) -> Option<&'static str> {
817        match self {
818            Self::Identity => None,
819            Self::Deflate => Some("deflate"),
820            Self::Gzip => Some("gzip"),
821            Self::Br => Some("br"),
822            Self::Zstd => Some("zstd"),
823        }
824    }
825}
826
/// Controls HTTP communication and authentication with the upstream.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    ///
    /// This timeout covers the time from sending the request until receiving response headers.
    /// Neither the connection process and handshakes, nor reading the response body is covered in
    /// this timeout.
    ///
    /// Defaults to `5`.
    pub timeout: u32,
    /// Timeout for establishing connections with the upstream in seconds.
    ///
    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
    ///
    /// Defaults to `3`.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    ///
    /// Defaults to `60` (1 minute).
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
    ///
    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
    /// back into startup mode and tries to establish a connection. During this time, incoming
    /// envelopes will be buffered.
    ///
    /// Defaults to `600` (10 minutes).
    pub auth_interval: Option<u64>,
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage in seconds.
    ///
    /// During a network outage relay will try to reconnect and will buffer all upstream messages
    /// until it manages to reconnect.
    ///
    /// Defaults to `10`.
    pub outage_grace_period: u64,
    /// The time Relay waits before retrying an upstream request, in seconds.
    ///
    /// This time is only used before going into a network outage mode.
    ///
    /// Defaults to `1`.
    pub retry_delay: u64,
    /// The interval in seconds for continued failed project fetches at which Relay will error.
    ///
    /// A successful fetch resets this interval. Relay does nothing during long
    /// times without emitting requests.
    ///
    /// Defaults to `90`.
    pub project_failure_interval: u64,
    /// Content encoding to apply to upstream store requests.
    ///
    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
    ///
    /// This setting applies to all store requests of SDK data, including events, transactions,
    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
    ///
    /// Available options are:
    ///
    ///  - `identity`: Disables compression.
    ///  - `deflate`: Compression using a zlib header with deflate encoding.
    ///  - `gzip`: Compression using gzip.
    ///  - `br`: Compression using the brotli algorithm.
    ///  - `zstd` (default): Compression using the zstd algorithm.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint.
    ///
    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
    /// DSN, this submits metrics to the global endpoint with Relay authentication.
    ///
    /// This option does not have any effect on processing mode.
    ///
    /// Defaults to `false`.
    pub global_metrics: bool,
    /// Controls whether the forward endpoint is enabled.
    ///
    /// The forward endpoint forwards unknown API requests to the upstream.
    ///
    /// Relay instances with processing enabled are expected to support the latest API and do never
    /// support forwarding requests to Sentry.
    ///
    /// Defaults to `true`.
    pub forward: bool,
    /// Enables an async DNS resolver through the `hickory-dns` crate, which uses an LRU cache for
    /// the resolved entries. This helps to limit the amount of requests made to the upstream DNS
    /// server (important for K8s infrastructure).
    ///
    /// Defaults to `true`.
    pub dns_cache: bool,
}
904
905impl Default for Http {
906    fn default() -> Self {
907        Http {
908            timeout: 5,
909            connection_timeout: 3,
910            max_retry_interval: 60, // 1 minute
911            host_header: None,
912            auth_interval: Some(600), // 10 minutes
913            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
914            retry_delay: default_retry_delay(),
915            project_failure_interval: default_project_failure_interval(),
916            encoding: HttpEncoding::Zstd,
917            global_metrics: false,
918            forward: true,
919            dns_cache: true,
920        }
921    }
922}
923
/// Default delay before retrying an unavailable upstream: 1 second.
fn default_retry_delay() -> u64 {
    1
}
928
/// Default interval of continued project fetch failures before erroring: 90 seconds.
fn default_project_failure_interval() -> u64 {
    90
}
933
/// Default for max disk size, 500 MB.
///
/// Used as the serde default for [`EnvelopeSpool::max_disk_size`].
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}
938
/// Default number of encoded envelope bytes to cache before writing to disk: 10 KiB.
///
/// Used as the serde default for [`EnvelopeSpool::batch_size_bytes`].
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}
943
/// Default maximum time an envelope may spend in the spool: 24 hours, in seconds.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}
947
/// Default refresh interval for the disk usage monitoring: 100 ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}
952
/// Default relative memory usage above which unspooling pauses: 0.8 (80%).
fn spool_max_backpressure_memory_percent() -> f32 {
    0.8
}
957
/// Default number of buffer partitions: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).expect("1 is a valid non-zero value")
}
962
/// Strategy used to assign envelopes to buffer partitions.
///
/// Serialized in snake_case (`project_key_pair`, `round_robin`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum EnvelopeSpoolPartitioning {
    /// Envelopes with the same project key pair land on the same partition.
    ///
    /// Keeps per-project state, disk files, and event ordering co-located on one partition.
    ProjectKeyPair,
    /// Envelopes are distributed across partitions in a round-robin fashion (default).
    ///
    /// This prevents "hot" partitions when a single project pair dominates traffic, but has
    /// trade-offs:
    /// - Per-project LIFO ordering is no longer preserved across partitions.
    /// - Per-partition memory footprint grows since every partition sees every project.
    #[default]
    RoundRobin,
}
980
/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path of the SQLite database file(s) which persist the data.
    ///
    /// Based on the number of partitions, more database files will be created within the same path.
    ///
    /// If not set, the envelopes will be buffered in memory.
    pub path: Option<PathBuf>,
    /// The maximum size of the buffer to keep, in bytes.
    ///
    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
    ///
    /// Defaults to 500MB.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of the batch of compressed envelopes that are spooled to disk at once.
    ///
    /// Note that this is the size after which spooling will be triggered but it does not guarantee
    /// that exactly this size will be spooled, it can be greater or equal.
    ///
    /// Defaults to 10 KiB.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time between receiving the envelope and processing it.
    ///
    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
    /// they are dropped.
    ///
    /// Defaults to 24h.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
    /// internal page stats.
    ///
    /// Defaults to 100ms.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
    ///
    /// Only applies when [`Self::path`] is set.
    ///
    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
    ///
    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
    ///
    /// This scenario arises when the buffer stops spooling due to reaching the
    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
    /// continues to grow, and the system becomes deadlocked.
    ///
    /// ### Example
    /// Suppose the system has 1GB of available memory and is configured to spool only after
    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
    ///
    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
    /// no data will be offloaded. Memory usage keeps increasing until it hits the
    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
    ///
    /// - No more envelopes are accepted.
    /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
    ///   will not happen.
    /// - A deadlock occurs, with the system unable to recover without manual intervention.
    ///
    /// Defaults to `0.8` (80% of available memory).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of partitions of the buffer.
    ///
    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
    /// and other resources.
    ///
    /// Defaults to 1.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
    /// Strategy used to assign envelopes to buffer partitions.
    ///
    /// Defaults to `RoundRobin`, which distributes envelopes evenly across partitions. See
    /// [`EnvelopeSpoolPartitioning`] for alternatives and trade-offs.
    #[serde(default)]
    pub partitioning: EnvelopeSpoolPartitioning,
    /// Whether the database defined in `path` is on an ephemeral storage disk.
    ///
    /// With `ephemeral: true`, Relay does not spool in-flight data to disk
    /// during graceful shutdown. Instead, it attempts to process all data before it terminates.
    ///
    /// Defaults to `false`.
    #[serde(default)]
    pub ephemeral: bool,
}
1074
1075impl Default for EnvelopeSpool {
1076    fn default() -> Self {
1077        Self {
1078            path: None,
1079            max_disk_size: spool_envelopes_max_disk_size(),
1080            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1081            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1082            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1083            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1084            partitions: spool_envelopes_partitions(),
1085            partitioning: EnvelopeSpoolPartitioning::default(),
1086            ephemeral: false,
1087        }
1088    }
1089}
1090
/// Persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    ///
    /// See [`EnvelopeSpool`] for the individual options.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1098
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// The full project state will be requested by this Relay if set to `true`.
    ///
    /// Defaults to `false`.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    ///
    /// Defaults to `300` (5 minutes).
    pub project_expiry: u32,
    /// Continue using project state this many seconds after cache expiry while a new state is
    /// being fetched. This is added on top of `project_expiry`.
    ///
    /// Default is 2 minutes.
    pub project_grace_period: u32,
    /// Refresh a project after the specified seconds.
    ///
    /// The time must be between expiry time and the grace period.
    ///
    /// By default there are no refreshes enabled.
    pub project_refresh_interval: Option<u32>,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    ///
    /// Defaults to `3600` (1 hour).
    pub relay_expiry: u32,
    /// Unused cache timeout for envelopes.
    ///
    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
    /// at a later point.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// The maximum amount of envelopes to queue before dropping them.
    ///
    /// Defaults to `1000`.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries.
    ///
    /// Defaults to `60` (1 minute).
    pub miss_expiry: u32,
    /// The buffer timeout for batched project config queries before sending them upstream in ms.
    ///
    /// Defaults to `100`.
    pub batch_interval: u32,
    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
    pub downstream_relays_batch_interval: u32,
    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
    ///
    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
    pub batch_size: usize,
    /// Interval for watching local cache override files in seconds.
    ///
    /// Defaults to `10`.
    pub file_interval: u32,
    /// Interval for fetching new global configs from the upstream, in seconds.
    ///
    /// Defaults to `10`.
    pub global_config_fetch_interval: u32,
}
1145
1146impl Default for Cache {
1147    fn default() -> Self {
1148        Cache {
1149            project_request_full_config: false,
1150            project_expiry: 300,       // 5 minutes
1151            project_grace_period: 120, // 2 minutes
1152            project_refresh_interval: None,
1153            relay_expiry: 3600,   // 1 hour
1154            envelope_expiry: 600, // 10 minutes
1155            envelope_buffer_size: 1000,
1156            miss_expiry: 60,                       // 1 minute
1157            batch_interval: 100,                   // 100ms
1158            downstream_relays_batch_interval: 100, // 100ms
1159            batch_size: 500,
1160            file_interval: 10,                // 10 seconds
1161            global_config_fetch_interval: 10, // 10 seconds
1162        }
1163    }
1164}
1165
/// Default upper bound on how far in the future event timestamps may lie: 1 minute.
fn default_max_secs_in_future() -> u32 {
    60
}
1169
/// Default maximum age of ingested sessions: 5 days, in seconds.
fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600
}
1173
/// Default maximum chunk size of attachments for Kafka: 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1177
/// Default Redis key prefix for project config lookups.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}
1181
/// Default maximum rate limit reported to clients: 300 seconds (5 minutes).
///
/// Wrapped in `Option` because the limit can be disabled in the configuration.
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1186
/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Defaults to `false`.
    pub enabled: bool,
    /// GeoIp DB file source.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events.
    ///
    /// Defaults to 1 minute.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions. Older sessions will be dropped.
    ///
    /// Defaults to 5 days.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configurations.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional kafka producer configurations.
    ///
    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
    /// kafka config can be referenced in `topics:` like this:
    ///
    /// ```yaml
    /// secondary_kafka_configs:
    ///   mycustomcluster:
    ///     - name: 'bootstrap.servers'
    ///       value: 'sentry_kafka_metrics:9093'
    ///
    /// topics:
    ///   transactions: ingest-transactions
    ///   metrics:
    ///     name: ingest-metrics
    ///     config: mycustomcluster
    /// ```
    ///
    /// Then metrics will be produced to an entirely different Kafka cluster.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic names.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    ///
    /// Defaults to 1 MiB.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients.
    ///
    /// Defaults to 300 seconds (5 minutes).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    /// Configures the quota cache ratio between `0.0` and `1.0`.
    ///
    /// The quota cache, caches the specified ratio of remaining quota in memory to reduce the
    /// amount of synchronizations required with Redis.
    ///
    /// The ratio is applied to the (per second) rate of the quota, not the total limit.
    /// For example a quota with limit 100 with a 10 second window is treated equally to a quota of
    /// 10 with a 1 second window.
    ///
    /// By default quota caching is disabled.
    pub quota_cache_ratio: Option<f32>,
    /// Relative amount of the total quota limit to which quota caching is applied.
    ///
    /// If exceeded, the rate limiter will no longer cache the quota and sync with Redis on every call instead.
    /// Lowering this value reduces the probability of incorrectly over-accepting.
    ///
    /// Must be between `0.0` and `1.0`, by default there is no limit configured.
    pub quota_cache_max: Option<f32>,
    /// Configuration for the objectstore service.
    ///
    /// Also accepted under the `upload` key.
    #[serde(default, alias = "upload")]
    pub objectstore: ObjectstoreServiceConfig,
}
1264
1265impl Default for Processing {
1266    /// Constructs a disabled processing configuration.
1267    fn default() -> Self {
1268        Self {
1269            enabled: false,
1270            geoip_path: None,
1271            max_secs_in_future: default_max_secs_in_future(),
1272            max_session_secs_in_past: default_max_session_secs_in_past(),
1273            kafka_config: Vec::new(),
1274            secondary_kafka_configs: BTreeMap::new(),
1275            topics: TopicAssignments::default(),
1276            kafka_validate_topics: false,
1277            redis: None,
1278            attachment_chunk_size: default_chunk_size(),
1279            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1280            max_rate_limit: default_max_rate_limit(),
1281            quota_cache_ratio: None,
1282            quota_cache_max: None,
1283            objectstore: ObjectstoreServiceConfig::default(),
1284        }
1285    }
1286}
1287
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    ///
    /// Defaults to [`NormalizationLevel::Default`].
    #[serde(default)]
    pub level: NormalizationLevel,
}
1296
/// Configuration for the level of normalization this Relay should do.
///
/// Used by [`Normalization::level`]; serialized in lowercase.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs normalization, excluding steps that break future compatibility.
    ///
    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
    #[default]
    Default,
    /// Run full normalization.
    ///
    /// It includes steps that break future compatibility and should only run in
    /// the last layer of relays.
    Full,
}
1312
/// Configuration values for the outcome aggregator
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
    ///
    /// Defaults to `60`.
    pub bucket_interval: u64,
    /// Defines how often all buckets are flushed, in seconds.
    ///
    /// Defaults to `120`.
    pub flush_interval: u64,
}
1322
1323impl Default for OutcomeAggregatorConfig {
1324    fn default() -> Self {
1325        Self {
1326            bucket_interval: 60,
1327            flush_interval: 120,
1328        }
1329    }
1330}
1331
/// Configuration options for objectstore's auth scheme.
#[derive(Serialize, Deserialize)]
pub struct ObjectstoreAuthConfig {
    /// Identifier for the private key used to sign objectstore's tokens. Must correspond to a
    /// public key configured in objectstore.
    pub key_id: String,

    /// EdDSA private key used to sign Objectstore's tokens, in PEM format.
    ///
    /// The manual [`fmt::Debug`] implementation redacts this value.
    pub signing_key: String,
}
1342
// Manual `Debug` implementation so the private signing key never appears in logs.
impl fmt::Debug for ObjectstoreAuthConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ObjectstoreAuthConfig")
            .field("key_id", &self.key_id)
            // The signing key is sensitive and therefore replaced by a placeholder.
            .field("signing_key", &"[redacted]")
            .finish()
    }
}
1351
/// Configuration values for the objectstore service.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct ObjectstoreServiceConfig {
    /// The base URL for the objectstore service.
    ///
    /// This defaults to [`None`], which means that the service will be disabled,
    /// unless a proper configuration is provided.
    pub objectstore_url: Option<String>,

    /// Maximum concurrency of uploads.
    ///
    /// Defaults to `10`.
    pub max_concurrent_requests: usize,

    /// Maximum size of the service input queue when `max_concurrent_requests` is saturated.
    ///
    /// The service will loadshed if this threshold is reached.
    ///
    /// Defaults to `20`.
    pub max_backlog: usize,

    /// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
    ///
    /// NOTE: This timeout applies to attachments that are already in-memory. Streaming uploads
    /// might take longer and are restricted independently by [`Self::stream_timeout`].
    ///
    /// Defaults to `60`.
    pub timeout: u64,

    /// Maximum duration of an upload stream, in seconds.
    ///
    /// Streams get a larger default timeout because their duration depends on the client
    /// as well as the server.
    ///
    /// Defaults to `300` (5 minutes).
    pub stream_timeout: u64,

    /// Time between upload attempts.
    ///
    /// Defaults to `1.0`.
    pub retry_delay: f64,

    /// Maximum number of attempts made to upload.
    ///
    /// Defaults to `5`.
    pub max_attempts: NonZeroU16,

    /// Configuration values for objectstore's auth scheme.
    ///
    /// Defaults to [`None`], meaning no auth is configured.
    pub auth: Option<ObjectstoreAuthConfig>,
}
1391
1392impl Default for ObjectstoreServiceConfig {
1393    fn default() -> Self {
1394        Self {
1395            objectstore_url: None,
1396            max_concurrent_requests: 10,
1397            max_backlog: 20,
1398            timeout: 60,
1399            stream_timeout: 5 * 60, // synced with `Upload::timeout`
1400            retry_delay: 1.0,
1401            max_attempts: NonZeroU16::new(5).unwrap(),
1402            auth: None,
1403        }
1404    }
1405}
1406
/// Determines how to emit outcomes.
///
/// For compatibility reasons, this can either be true, false or AsClientReports.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes
    None,
    /// Emit outcomes as client reports
    AsClientReports,
    /// Emit outcomes as outcomes
    AsOutcomes,
}
1419
1420impl EmitOutcomes {
1421    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1422    pub fn any(&self) -> bool {
1423        !matches!(self, EmitOutcomes::None)
1424    }
1425}
1426
1427impl Serialize for EmitOutcomes {
1428    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1429    where
1430        S: Serializer,
1431    {
1432        // For compatibility, serialize None and AsOutcomes as booleans.
1433        match self {
1434            Self::None => serializer.serialize_bool(false),
1435            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1436            Self::AsOutcomes => serializer.serialize_bool(true),
1437        }
1438    }
1439}
1440
/// Serde visitor that deserializes [`EmitOutcomes`] from `true`, `false`, or `"as_client_reports"`.
struct EmitOutcomesVisitor;
1442
1443impl Visitor<'_> for EmitOutcomesVisitor {
1444    type Value = EmitOutcomes;
1445
1446    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1447        formatter.write_str("true, false, or 'as_client_reports'")
1448    }
1449
1450    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1451    where
1452        E: serde::de::Error,
1453    {
1454        Ok(if v {
1455            EmitOutcomes::AsOutcomes
1456        } else {
1457            EmitOutcomes::None
1458        })
1459    }
1460
1461    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1462    where
1463        E: serde::de::Error,
1464    {
1465        if v == "as_client_reports" {
1466            Ok(EmitOutcomes::AsClientReports)
1467        } else {
1468            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1469        }
1470    }
1471}
1472
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor accept both booleans and strings,
        // which is required for backwards compatibility.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1481
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    /// Processing relays always emit outcomes (for backwards compatibility).
    /// Can take the following values: false, "as_client_reports", true
    ///
    /// Defaults to `"as_client_reports"`.
    pub emit_outcomes: EmitOutcomes,
    /// The maximum number of outcomes that are batched before being sent
    /// via http to the upstream (only applies to non processing relays).
    ///
    /// Defaults to `1000`.
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched
    /// via http to the upstream (only applies to non processing relays).
    ///
    /// Defaults to `500`.
    pub batch_interval: u64,
    /// Defines the source string registered in the outcomes originating from
    /// this Relay (typically something like the region or the layer).
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1502
1503impl Default for Outcomes {
1504    fn default() -> Self {
1505        Outcomes {
1506            emit_outcomes: EmitOutcomes::AsClientReports,
1507            batch_size: 1000,
1508            batch_interval: 500,
1509            source: None,
1510            aggregator: OutcomeAggregatorConfig::default(),
1511        }
1512    }
1513}
1514
/// Minimal version of a config for dumping out.
///
/// Contains only the `relay` section; see [`MinimalConfig::save_in_folder`].
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1521
1522impl MinimalConfig {
1523    /// Saves the config in the given config folder as config.yml
1524    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1525        let path = p.as_ref();
1526        if fs::metadata(path).is_err() {
1527            fs::create_dir_all(path)
1528                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1529        }
1530        self.save(path)
1531    }
1532}
1533
impl ConfigObject for MinimalConfig {
    /// Minimal configs are persisted as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    /// File stem of the config file on disk.
    fn name() -> &'static str {
        "config"
    }
}
1543
1544/// Alternative serialization of RelayInfo for config file using snake case.
1545mod config_relay_info {
1546    use serde::ser::SerializeMap;
1547
1548    use super::*;
1549
1550    // Uses snake_case as opposed to camelCase.
1551    #[derive(Debug, Serialize, Deserialize, Clone)]
1552    struct RelayInfoConfig {
1553        public_key: PublicKey,
1554        #[serde(default)]
1555        internal: bool,
1556    }
1557
1558    impl From<RelayInfoConfig> for RelayInfo {
1559        fn from(v: RelayInfoConfig) -> Self {
1560            RelayInfo {
1561                public_key: v.public_key,
1562                internal: v.internal,
1563            }
1564        }
1565    }
1566
1567    impl From<RelayInfo> for RelayInfoConfig {
1568        fn from(v: RelayInfo) -> Self {
1569            RelayInfoConfig {
1570                public_key: v.public_key,
1571                internal: v.internal,
1572            }
1573        }
1574    }
1575
1576    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1577    where
1578        D: Deserializer<'de>,
1579    {
1580        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1581        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1582    }
1583
1584    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1585    where
1586        S: Serializer,
1587    {
1588        let mut map = ser.serialize_map(Some(elm.len()))?;
1589
1590        for (k, v) in elm {
1591            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1592        }
1593
1594        map.end()
1595    }
1596}
1597
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls responses from the readiness health check endpoint based on authentication.
    ///
    /// Skipped during serialization when set to the default value.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays.
    ///
    /// Entries are (de)serialized in snake_case via the `config_relay_info` module.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// How old a signature can be before it is considered invalid, in seconds.
    ///
    /// Defaults to 5 minutes.
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}
1615
1616fn default_max_age() -> u64 {
1617    300
1618}
1619
/// GeoIp database configuration options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to GeoIP database.
    ///
    /// Defaults to `None`, in which case no database path is configured.
    pub path: Option<PathBuf>,
}
1626
/// Cardinality Limiter configuration options.
///
/// All fields fall back to their [`Default`] values when missing from the config
/// file (`#[serde(default)]` on the container).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Cache vacuum interval in seconds for the in memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    ///
    /// Defaults to 180 seconds, 3 minutes.
    pub cache_vacuum_interval: u64,
}
1638
1639impl Default for CardinalityLimiter {
1640    fn default() -> Self {
1641        Self {
1642            cache_vacuum_interval: 180,
1643        }
1644    }
1645}
1646
/// Settings to control Relay's health checks.
///
/// After breaching one of the configured thresholds, Relay will
/// return an `unhealthy` status from its health endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval to refresh internal health checks.
    ///
    /// Shorter intervals will decrease the time it takes the health check endpoint to report
    /// issues, but can also increase sporadic unhealthy responses.
    ///
    /// Defaults to `3000` (3 seconds).
    pub refresh_interval_ms: u64,
    /// Maximum memory watermark in bytes.
    ///
    /// By default, there is no absolute limit set and the watermark
    /// is only controlled by setting [`Self::max_memory_percent`].
    pub max_memory_bytes: Option<ByteSize>,
    /// Maximum memory watermark as a percentage of maximum system memory.
    ///
    /// Defaults to `0.95` (95%).
    pub max_memory_percent: f32,
    /// Health check probe timeout in milliseconds.
    ///
    /// Any probe exceeding the timeout will be considered failed.
    /// This limits the max execution time of Relay health checks.
    ///
    /// Defaults to 900 milliseconds.
    pub probe_timeout_ms: u64,
    /// The refresh frequency of memory stats which are used to poll memory
    /// usage of Relay.
    ///
    /// The implementation of memory stats guarantees that the refresh will happen at
    /// least every `x` ms since memory readings are lazy and are updated only if needed.
    pub memory_stat_refresh_frequency_ms: u64,
}
1684
1685impl Default for Health {
1686    fn default() -> Self {
1687        Self {
1688            refresh_interval_ms: 3000,
1689            max_memory_bytes: None,
1690            max_memory_percent: 0.95,
1691            probe_timeout_ms: 900,
1692            memory_stat_refresh_frequency_ms: 100,
1693        }
1694    }
1695}
1696
/// COGS configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum amount of COGS measurements allowed to backlog.
    ///
    /// Any additional COGS measurements recorded will be dropped.
    ///
    /// Defaults to `10_000`.
    pub max_queue_size: u64,
    /// Relay COGS resource id.
    ///
    /// All Relay related COGS measurements are emitted with this resource id.
    ///
    /// Defaults to `relay_service`.
    pub relay_resource_id: String,
}
1714
1715impl Default for Cogs {
1716    fn default() -> Self {
1717        Self {
1718            max_queue_size: 10_000,
1719            relay_resource_id: "relay_service".to_owned(),
1720        }
1721    }
1722}
1723
/// Configuration for the upload service.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Upload {
    /// Maximum number of uploads that the service accepts.
    ///
    /// Additional uploads will be rejected.
    pub max_concurrent_requests: usize,
    /// Maximum time spent trying to upload, in seconds.
    pub timeout: u64,
    /// The maximum time between creating the upload and uploading the data / the attachment placeholder.
    ///
    /// In seconds.
    pub max_age: i64,
}
1739
1740impl Default for Upload {
1741    fn default() -> Self {
1742        Self {
1743            max_concurrent_requests: 100,
1744            timeout: 5 * 60,  // five minutes
1745            max_age: 60 * 60, // 1h
1746        }
1747    }
1748}
1749
/// All configuration values that can be deserialized from `config.yml`.
///
/// Every section is optional in the file; a missing section falls back to that
/// type's `Default` implementation via the per-field `#[serde(default)]`.
///
/// NOTE: field order determines the order of sections emitted by
/// `Config::to_yaml_string`.
#[derive(Serialize, Deserialize, Debug, Default)]
#[allow(missing_docs)]
pub struct ConfigValues {
    #[serde(default)]
    pub relay: Relay,
    #[serde(default)]
    pub http: Http,
    #[serde(default)]
    pub cache: Cache,
    #[serde(default)]
    pub spool: Spool,
    #[serde(default)]
    pub limits: Limits,
    #[serde(default)]
    pub logging: relay_log::LogConfig,
    #[serde(default)]
    pub routing: Routing,
    #[serde(default)]
    pub metrics: Metrics,
    #[serde(default)]
    pub sentry: relay_log::SentryConfig,
    #[serde(default)]
    pub processing: Processing,
    #[serde(default)]
    pub outcomes: Outcomes,
    #[serde(default)]
    pub aggregator: AggregatorServiceConfig,
    #[serde(default)]
    pub secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    pub auth: AuthConfig,
    #[serde(default)]
    pub geoip: GeoIpConfig,
    #[serde(default)]
    pub normalization: Normalization,
    #[serde(default)]
    pub cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    pub health: Health,
    #[serde(default)]
    pub cogs: Cogs,
    #[serde(default)]
    pub upload: Upload,
}
1795
impl ConfigObject for ConfigValues {
    // The main config file is stored as YAML on disk.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    // Logical name under which the config file is looked up.
    fn name() -> &'static str {
        "config"
    }
}
1805
/// Config struct.
pub struct Config {
    /// Values deserialized from the main config file (or JSON, see `from_json_value`).
    values: ConfigValues,
    /// Relay credentials; `None` when no credentials file exists.
    credentials: Option<Credentials>,
    /// The config directory this instance was loaded from (empty for JSON-based configs).
    path: PathBuf,
}
1812
1813impl fmt::Debug for Config {
1814    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1815        f.debug_struct("Config")
1816            .field("path", &self.path)
1817            .field("values", &self.values)
1818            .finish()
1819    }
1820}
1821
1822impl Config {
1823    /// Loads a config from a given config folder.
1824    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1825        let path = env::current_dir()
1826            .map(|x| x.join(path.as_ref()))
1827            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1828
1829        let config = Config {
1830            values: ConfigValues::load(&path)?,
1831            credentials: if Credentials::path(&path).exists() {
1832                Some(Credentials::load(&path)?)
1833            } else {
1834                None
1835            },
1836            path: path.clone(),
1837        };
1838
1839        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1840            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1841        }
1842
1843        Ok(config)
1844    }
1845
1846    /// Creates a config from a JSON value.
1847    ///
1848    /// This is mostly useful for tests.
1849    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1850        Ok(Config {
1851            values: serde_json::from_value(value)
1852                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1853            credentials: None,
1854            path: PathBuf::new(),
1855        })
1856    }
1857
    /// Override configuration with values coming from other sources (e.g. env variables or
    /// command line parameters)
    ///
    /// Overrides are applied in-place; unset override fields leave the current
    /// configuration untouched. Credential overrides (`id`, `public_key`,
    /// `secret_key`) either patch existing credentials individually, or must be
    /// provided as a complete set when no credentials exist yet.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        // Relay mode / instance / logging overrides.
        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // An explicit upstream URL takes precedence over an upstream DSN.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn))
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        // Processing overrides: the flag is parsed from a string, where the
        // empty string counts as "disabled".
        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        // Replace an existing `bootstrap.servers` entry or append a new one.
        if let Some(kafka_url) = overrides.kafka_url {
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        // Credentials overrides: parse each piece up front so errors surface
        // before any credential state is modified.
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Existing credentials: any subset of entries may be overridden.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No existing credentials: only a complete set may create them.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                    // nothing provided, we'll just leave the credentials None, maybe we
                    // don't need them in the current command or we'll override them later
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        // NOTE: a shutdown timeout that fails to parse as u64 is silently
        // ignored here rather than reported as an error.
        let limits = &mut self.values.limits;
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
2011
2012    /// Checks if the config is already initialized.
2013    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
2014        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
2015    }
2016
2017    /// Returns the filename of the config file.
2018    pub fn path(&self) -> &Path {
2019        &self.path
2020    }
2021
2022    /// Dumps out a YAML string of the values.
2023    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
2024        serde_yaml::to_string(&self.values)
2025            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
2026    }
2027
2028    /// Regenerates the relay credentials.
2029    ///
2030    /// This also writes the credentials back to the file.
2031    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
2032        let creds = Credentials::generate();
2033        if save {
2034            creds.save(&self.path)?;
2035        }
2036        self.credentials = Some(creds);
2037        Ok(())
2038    }
2039
    /// Return the current credentials, or `None` if none are configured.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
2044
2045    /// Set new credentials.
2046    ///
2047    /// This also writes the credentials back to the file.
2048    pub fn replace_credentials(
2049        &mut self,
2050        credentials: Option<Credentials>,
2051    ) -> anyhow::Result<bool> {
2052        if self.credentials == credentials {
2053            return Ok(false);
2054        }
2055
2056        match credentials {
2057            Some(ref creds) => {
2058                creds.save(&self.path)?;
2059            }
2060            None => {
2061                let path = Credentials::path(&self.path);
2062                if fs::metadata(&path).is_ok() {
2063                    fs::remove_file(&path).with_context(|| {
2064                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
2065                    })?;
2066                }
2067            }
2068        }
2069
2070        self.credentials = credentials;
2071        Ok(true)
2072    }
2073
    /// Returns `true` if the config is ready to use, i.e. credentials are present.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key if set.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key if set.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID, if credentials are set.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }
2093
    /// Returns the relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the instance type of relay.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the upstream target as descriptor.
    pub fn upstream(&self) -> &UpstreamDescriptor {
        &self.values.relay.upstream
    }

    /// Returns the advertised upstream for downstream instances as descriptor.
    pub fn advertised_upstream(&self) -> Option<&UpstreamDescriptor> {
        self.values.relay.advertised_upstream.as_ref()
    }

    /// Returns the custom HTTP "Host" header, if configured.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the listen address (host and port) for the main endpoint.
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }
2123
2124    /// Returns the listen address for internal APIs.
2125    ///
2126    /// Internal APIs are APIs which do not need to be publicly exposed,
2127    /// like health checks.
2128    ///
2129    /// Returns `None` when there is no explicit address configured for internal APIs,
2130    /// and they should instead be exposed on the main [`Self::listen_addr`].
2131    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
2132        match (
2133            self.values.relay.internal_host,
2134            self.values.relay.internal_port,
2135        ) {
2136            (Some(host), None) => Some((host, self.values.relay.port).into()),
2137            (None, Some(port)) => Some((self.values.relay.host, port).into()),
2138            (Some(host), Some(port)) => Some((host, port).into()),
2139            (None, None) => None,
2140        }
2141    }
2142
2143    /// Returns the TLS listen address.
2144    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
2145        if self.values.relay.tls_identity_path.is_some() {
2146            let port = self.values.relay.tls_port.unwrap_or(3443);
2147            Some((self.values.relay.host, port).into())
2148        } else {
2149            None
2150        }
2151    }
2152
    /// Returns the path to the identity bundle
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the identity bundle
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns `true` when project IDs should be overridden rather than validated.
    ///
    /// Defaults to `false`, which requires project ID validation.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }
2169
2170    /// Returns `true` if Relay requires authentication for readiness.
2171    ///
2172    /// See [`ReadinessCondition`] for more information.
2173    pub fn requires_auth(&self) -> bool {
2174        match self.values.auth.ready {
2175            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
2176            ReadinessCondition::Always => false,
2177        }
2178    }
2179
2180    /// Returns the interval at which Realy should try to re-authenticate with the upstream.
2181    ///
2182    /// Always disabled in processing mode.
2183    pub fn http_auth_interval(&self) -> Option<Duration> {
2184        if self.processing_enabled() {
2185            return None;
2186        }
2187
2188        match self.values.http.auth_interval {
2189            None | Some(0) => None,
2190            Some(secs) => Some(Duration::from_secs(secs)),
2191        }
2192    }
2193
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage.
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Time Relay waits before retrying an upstream request.
    ///
    /// Before going into a network outage, Relay may fail to make upstream
    /// requests. This is the time Relay waits before retrying the same request.
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Time of continued project request failures before Relay emits an error.
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Content encoding of upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns whether metrics should be sent globally through a shared endpoint.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns `true` if Relay supports forwarding unknown API requests.
    ///
    /// Relay instances with processing enabled are expected to support the latest API and never
    /// support forwarding requests to Sentry.
    pub fn http_forward(&self) -> bool {
        self.values.http.forward && !self.processing_enabled()
    }
2230
2231    /// Returns whether this Relay should emit outcomes.
2232    ///
2233    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
2234    /// in processing mode.
2235    pub fn emit_outcomes(&self) -> EmitOutcomes {
2236        if self.processing_enabled() {
2237            return EmitOutcomes::AsOutcomes;
2238        }
2239        self.values.outcomes.emit_outcomes
2240    }
2241
    /// Returns the maximum number of outcomes that are batched before being sent
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the maximum interval that an outcome may be batched
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// The originating source of the outcome
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the configuration of the outcome aggregator.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }
2261
    /// Returns logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry error-reporting configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }

    /// Returns the addresses for statsd metrics.
    pub fn statsd_addr(&self) -> Option<&str> {
        self.values.metrics.statsd.as_deref()
    }

    /// Returns the buffer size for statsd metrics, if configured.
    pub fn statsd_buffer_size(&self) -> Option<usize> {
        self.values.metrics.statsd_buffer_size
    }

    /// Return the prefix for statsd metrics.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns the default tags for statsd metrics.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the interval for periodic metrics emitted from Relay.
    ///
    /// `None` if periodic metrics are disabled (a configured interval of `0`).
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }
2306
    /// Returns the default timeout for all upstream HTTP requests.
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the connection timeout for all upstream HTTP requests.
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the failed upstream request retry interval.
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns `true` if relay should use an in-process cache for DNS lookups.
    pub fn http_dns_cache(&self) -> bool {
        self.values.http.dns_cache
    }
2326
    /// Returns the expiry timeout for cached projects.
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns `true` if the full project state should be requested from upstream.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns the expiry timeout for cached relay infos (public keys).
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    /// Returns the maximum number of buffered envelopes
    pub fn envelope_buffer_size(&self) -> usize {
        // Saturate on platforms where the configured value exceeds `usize`.
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    /// Returns the expiry timeout for cached misses before trying to refetch.
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period for project caches.
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the refresh interval for a project.
    ///
    /// NOTE(review): no validation happens here — the documented constraint that the
    /// refresh time lies between the grace period and expiry is presumably enforced
    /// elsewhere; confirm at the consumer.
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns the duration in which batchable project config queries are
    /// collected before sending them in a single request.
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns the duration in which downstream relays are requested from upstream.
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the interval in seconds in which local project configurations should be reloaded.
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the interval in seconds in which fresh global configs should be
    /// fetched from upstream.
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }
2393
2394    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2395    ///
2396    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2397    /// suffixed with `.{partition_id}`.
2398    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2399        let mut path = self
2400            .values
2401            .spool
2402            .envelopes
2403            .path
2404            .as_ref()
2405            .map(|path| path.to_owned())?;
2406
2407        if partition_id == 0 {
2408            return Some(path);
2409        }
2410
2411        let file_name = path.file_name().and_then(|f| f.to_str())?;
2412        let new_file_name = format!("{file_name}.{partition_id}");
2413        path.set_file_name(new_file_name);
2414
2415        Some(path)
2416    }
2417
    /// The maximum size of the buffer, in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Number of encoded envelope bytes that need to be accumulated before
    /// flushing one batch to disk.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the time after which we drop envelopes as a [`Duration`] object.
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of partitions for the buffer.
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    /// Returns the strategy used to assign envelopes to buffer partitions.
    pub fn spool_partitioning(&self) -> EnvelopeSpoolPartitioning {
        self.values.spool.envelopes.partitioning
    }

    /// Returns `true` if the data is stored on ephemeral disks.
    pub fn spool_ephemeral(&self) -> bool {
        self.values.spool.envelopes.ephemeral
    }
2458
    /// Returns the maximum size of an event payload in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size of each attachment, in bytes.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum size of a TUS upload request body, in bytes.
    pub fn max_upload_size(&self) -> usize {
        self.values.limits.max_upload_size.as_bytes()
    }

    /// Returns the maximum combined size of attachments or payloads containing attachments
    /// (minidump, unreal, standalone attachments) in bytes.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum combined size of client reports in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum payload size of a monitor check-in in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum payload size of a log in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum payload size of a span in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum payload size of an item container in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum payload size for logs integration items in bytes.
    pub fn max_logs_integration_size(&self) -> usize {
        // Not explicitly configured, inherited from the maximum size of a log container.
        self.max_container_size()
    }

    /// Returns the maximum payload size for spans integration items in bytes.
    pub fn max_spans_integration_size(&self) -> usize {
        // Not explicitly configured, inherited from the maximum size of a span container.
        self.max_container_size()
    }

    /// Returns the maximum size of an envelope payload in bytes.
    ///
    /// Individual item size limits still apply on top of this envelope-wide limit.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }
2523
    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum payload size of a statsd metric in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum payload size of metric buckets in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    /// Returns the maximum payload size for general API requests, in bytes.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum payload size for file uploads and chunks, in bytes.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum payload size for chunks, in bytes.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum payload size for a profile, in bytes.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum payload size for a trace metric, in bytes.
    pub fn max_trace_metric_size(&self) -> usize {
        self.values.limits.max_trace_metric_size.as_bytes()
    }

    /// Returns the maximum payload size for a compressed replay, in bytes.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum payload size for an uncompressed replay, in bytes.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum message size for an uncompressed replay, in bytes.
    ///
    /// This is greater than `max_replay_compressed_size` because
    /// it can include additional metadata about the replay in
    /// addition to the recording.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }

    /// Returns the maximum number of active requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of active queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// Returns the maximum combined size of keys of invalid attributes, in bytes.
    pub fn max_removed_attribute_key_size(&self) -> usize {
        self.values.limits.max_removed_attribute_key_size.as_bytes()
    }

    /// The maximum time a query is allowed to take across retries.
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// The maximum time to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }
2608
    /// Returns the server keep-alive timeout.
    ///
    /// By default, keep-alive is set to 5 seconds.
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }
2615
    /// Returns the server idle timeout, if one is configured.
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// TCP listen backlog to configure on Relay's listening socket.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the number of cores to use for thread pools.
    ///
    /// Backed by the `limits.max_thread_count` option.
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the number of tasks that can run concurrently in the worker pool.
    ///
    /// Backed by the `limits.max_pool_concurrency` option.
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }
2640
    /// Returns the maximum size of a project config query batch.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Get the path for static project configs (`<config dir>/projects`).
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    /// True if the Relay should do processing.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Level of normalization for Relay to apply to incoming data.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }
2660
2661    /// The path to the GeoIp database required for event processing.
2662    pub fn geoip_path(&self) -> Option<&Path> {
2663        self.values
2664            .geoip
2665            .path
2666            .as_deref()
2667            .or(self.values.processing.geoip_path.as_deref())
2668    }
2669
    /// Maximum future timestamp of ingested data, in seconds.
    ///
    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Maximum age of ingested sessions, in seconds. Older sessions will be dropped.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }
2681
    /// Configuration name and list of Kafka configuration parameters for a given topic.
    ///
    /// # Errors
    ///
    /// Returns a [`KafkaConfigError`] if the resolved configuration for the
    /// topic is invalid.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Whether to validate the topics against Kafka.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// All unused but configured topic assignments.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Configuration of the objectstore service.
    pub fn objectstore(&self) -> &ObjectstoreServiceConfig {
        &self.values.processing.objectstore
    }

    /// Configuration of the upload service.
    pub fn upload(&self) -> &Upload {
        &self.values.upload
    }
2712
2713    /// Redis servers to connect to for project configs, cardinality limits,
2714    /// rate limiting, and metrics metadata.
2715    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2716        let redis_configs = self.values.processing.redis.as_ref()?;
2717
2718        Some(build_redis_configs(
2719            redis_configs,
2720            self.cpu_concurrency() as u32,
2721            self.pool_concurrency() as u32,
2722        ))
2723    }
2724
    /// Chunk size of attachments in bytes.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Maximum metrics batch size in bytes.
    ///
    /// Backed by the aggregator's `max_flush_bytes` option.
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Default prefix to use when looking up project configs in Redis. This is only done when
    /// Relay is in processing mode.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Maximum rate limit to report to clients in seconds, if configured.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Amount of remaining quota which is cached in memory, if configured.
    pub fn quota_cache_ratio(&self) -> Option<f32> {
        self.values.processing.quota_cache_ratio
    }

    /// Maximum limit (ratio) for the in memory quota cache, if configured.
    pub fn quota_cache_max(&self) -> Option<f32> {
        self.values.processing.quota_cache_max
    }
2755
    /// Cache vacuum interval for the cardinality limiter in memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Interval to refresh internal health checks.
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Maximum memory watermark in bytes.
    ///
    /// Defaults to `u64::MAX` (effectively unlimited) when no byte limit is
    /// configured.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }

    /// Maximum memory watermark as a percentage of maximum system memory.
    pub fn health_max_memory_watermark_percent(&self) -> f32 {
        self.values.health.max_memory_percent
    }

    /// Health check probe timeout.
    pub fn health_probe_timeout(&self) -> Duration {
        Duration::from_millis(self.values.health.probe_timeout_ms)
    }

    /// Refresh frequency for polling new memory stats, in milliseconds.
    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
        self.values.health.memory_stat_refresh_frequency_ms
    }

    /// Maximum amount of COGS measurements buffered in memory.
    pub fn cogs_max_queue_size(&self) -> u64 {
        self.values.cogs.max_queue_size
    }

    /// Resource ID to use for Relay COGS measurements.
    pub fn cogs_relay_resource_id(&self) -> &str {
        &self.values.cogs.relay_resource_id
    }
2801
    /// Returns configuration for the default metrics aggregator.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
    }

    /// Returns configurations for non-default (secondary) metrics aggregators.
    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
        &self.values.secondary_aggregators
    }
2811
2812    /// Returns aggregator config for a given metrics namespace.
2813    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2814        for entry in &self.values.secondary_aggregators {
2815            if entry.condition.matches(Some(namespace)) {
2816                return &entry.config;
2817            }
2818        }
2819        &self.values.aggregator
2820    }
2821
    /// Return the statically configured Relays, keyed by Relay ID.
    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
        &self.values.auth.static_relays
    }

    /// Returns the max age a signature is considered valid.
    ///
    /// Backed by the `auth.signature_max_age` option, in seconds.
    pub fn signature_max_age(&self) -> Duration {
        Duration::from_secs(self.values.auth.signature_max_age)
    }
2831
2832    /// Returns `true` if unknown items should be accepted and forwarded.
2833    pub fn accept_unknown_items(&self) -> bool {
2834        let forward = self.values.routing.accept_unknown_items;
2835        forward.unwrap_or_else(|| !self.processing_enabled())
2836    }
2837}
2838
2839impl Default for Config {
2840    fn default() -> Self {
2841        Self {
2842            values: ConfigValues::default(),
2843            credentials: None,
2844            path: PathBuf::new(),
2845        }
2846    }
2847}
2848
#[cfg(test)]
mod tests {

    use super::*;

    /// Regression test for renaming the envelope buffer flags: the old
    /// `event_*` cache option names must still deserialize into the new
    /// `envelope_*` fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
    event_buffer_size: 1000000
    event_expiry: 1800
"###;

        let values = serde_yaml::from_str::<ConfigValues>(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    /// Round-trips every `EmitOutcomes` variant through its JSON representation.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];

        for (serialized, expected) in cases {
            let parsed: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(parsed, expected);
            assert_eq!(serde_json::to_string(&parsed).unwrap(), serialized);
        }
    }

    /// An arbitrary bare string is rejected when deserializing `EmitOutcomes`.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}