relay_config/
config.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Default grace period before a network outage is acted upon.
///
/// NOTE(review): presumably in seconds, matching the other outage-related
/// `http` settings — confirm at the usage site (not visible in this chunk).
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Comment header prepended to generated YAML config files (see `ConfigObject::save`),
/// pointing operators at the relevant documentation.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// Indicates config related errors.
///
/// A kind is paired with an optional [`ConfigErrorSource`] inside [`ConfigError`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value.
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but uses a binary that was
    /// compiled without the processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self {
57            Self::CouldNotOpenFile => write!(f, "could not open config file"),
58            Self::CouldNotWriteFile => write!(f, "could not write config file"),
59            Self::BadYaml => write!(f, "could not parse yaml config file"),
60            Self::BadJson => write!(f, "could not parse json config file"),
61            Self::InvalidValue => write!(f, "invalid config value"),
62            Self::ProcessingNotAvailable => write!(
63                f,
64                "was not compiled with processing, cannot enable processing"
65            ),
66        }
67    }
68}
69
/// Defines the source of a config error.
///
/// Only affects the suffix appended by the `Display` impl of [`ConfigError`].
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// An error occurring independently.
    #[default]
    None,
    /// An error originating from a configuration file.
    File(PathBuf),
    /// An error originating in a field override (an env var, or a CLI parameter).
    FieldOverride(String),
}
81
82impl fmt::Display for ConfigErrorSource {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            ConfigErrorSource::None => Ok(()),
86            ConfigErrorSource::File(file_name) => {
87                write!(f, " (file {})", file_name.display())
88            }
89            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90        }
91    }
92}
93
/// Indicates config related errors.
///
/// Pairs a [`ConfigErrorKind`] with an optional [`ConfigErrorSource`] that
/// records where the error originated (a file, a field override, or neither).
#[derive(Debug)]
pub struct ConfigError {
    /// Where the error originated; rendered as a suffix in `Display`.
    source: ConfigErrorSource,
    /// What went wrong.
    kind: ConfigErrorKind,
}
100
101impl ConfigError {
102    #[inline]
103    fn new(kind: ConfigErrorKind) -> Self {
104        Self {
105            source: ConfigErrorSource::None,
106            kind,
107        }
108    }
109
110    #[inline]
111    fn field(field: &'static str) -> Self {
112        Self {
113            source: ConfigErrorSource::FieldOverride(field.to_owned()),
114            kind: ConfigErrorKind::InvalidValue,
115        }
116    }
117
118    #[inline]
119    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120        Self {
121            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122            kind,
123        }
124    }
125
126    /// Returns the error kind of the error.
127    pub fn kind(&self) -> ConfigErrorKind {
128        self.kind
129    }
130}
131
132impl fmt::Display for ConfigError {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        write!(f, "{}{}", self.kind(), self.source)
135    }
136}
137
138impl Error for ConfigError {}
139
/// The on-disk serialization format of a config file.
enum ConfigFormat {
    /// YAML, written with the file extension `yml`.
    Yaml,
    /// JSON, written with the file extension `json`.
    Json,
}
144
145impl ConfigFormat {
146    pub fn extension(&self) -> &'static str {
147        match self {
148            ConfigFormat::Yaml => "yml",
149            ConfigFormat::Json => "json",
150        }
151    }
152}
153
/// A configuration object that can be loaded from, and saved to, a file on disk.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The format in which to serialize this configuration.
    fn format() -> ConfigFormat;

    /// The basename of the config file.
    fn name() -> &'static str;

    /// The full filename of the config file, including the file extension.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config file from a file within the given directory location.
    ///
    /// Values in the file may reference environment variables (`${VAR}`) and other
    /// files (`${file:path}`, resolved relative to `base`); both are substituted
    /// during deserialization via `serde_vars`.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        // Substitution sources: file contents and environment variables.
        // NOTE(review): the precedence between the two sources is decided inside
        // `serde_vars::deserialize` — confirm there if it matters.
        let mut source = {
            let file = serde_vars::FileSource::default()
                .with_variable_prefix("${file:")
                .with_variable_suffix("}")
                .with_base_path(base);
            let env = serde_vars::EnvSource::default()
                .with_variable_prefix("${")
                .with_variable_suffix("}");
            (file, env)
        };
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Writes the configuration to a file within the given directory location.
    ///
    /// The file is created if missing and truncated if present.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Remove all non-user permissions for the newly created file.
        // NOTE(review): `mode` only takes effect when the file is created; an already
        // existing file keeps its previous permissions — confirm this is acceptable.
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML configs get a documentation header before the serialized body.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // Trailing newline is cosmetic; failure to write it is deliberately ignored.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
228
/// Structure used to hold information about configuration overrides via
/// CLI parameters or environment variables.
///
/// All fields are raw strings; parsing and validation happen when the
/// overrides are applied to the config.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to (network interface).
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// Whether processing is enabled: `"true"` or `"false"`.
    pub processing: Option<String>,
    /// The kafka `bootstrap.servers` configuration string.
    pub kafka_url: Option<String>,
    /// The redis server url.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// Outcome source.
    pub outcome_source: Option<String>,
    /// Shutdown timeout.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
268
/// The relay credentials.
///
/// Persisted on disk as JSON via the [`ConfigObject`] impl below.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
279
280impl Credentials {
281    /// Generates new random credentials.
282    pub fn generate() -> Self {
283        relay_log::info!("generating new relay credentials");
284        let (sk, pk) = generate_key_pair();
285        Self {
286            secret_key: sk,
287            public_key: pk,
288            id: generate_relay_id(),
289        }
290    }
291
292    /// Serializes this configuration to JSON.
293    pub fn to_json_string(&self) -> anyhow::Result<String> {
294        serde_json::to_string(self)
295            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
296    }
297}
298
// Credentials are stored as `credentials.json` in the config directory
// (basename + extension are joined in `ConfigObject::path`).
impl ConfigObject for Credentials {
    /// Credentials are serialized as JSON.
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    /// Basename of the credentials file.
    fn name() -> &'static str {
        "credentials"
    }
}
307
/// Information on a downstream Relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this Relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project configuration.
    ///
    /// Defaults to `false` when missing from the serialized form.
    #[serde(default)]
    pub internal: bool,
}
319
320impl RelayInfo {
321    /// Creates a new RelayInfo
322    pub fn new(public_key: PublicKey) -> Self {
323        Self {
324            public_key,
325            internal: false,
326        }
327    }
328}
329
/// The operation mode of a relay.
///
/// `Deserialize` is implemented manually (below) to reject the removed
/// `static` mode with a dedicated error message.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    ///
    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
    /// fetch project configurations from the upstream or perform PII stripping. All events are
    /// accepted unless overridden on the file system.
    Proxy,

    /// Project configurations are managed by the upstream.
    ///
    /// Project configurations are always fetched from the upstream, unless they are statically
    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
    /// only possible, if the upstream is Sentry directly, or another managed Relay.
    Managed,
}
348
349impl<'de> Deserialize<'de> for RelayMode {
350    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351    where
352        D: Deserializer<'de>,
353    {
354        let s = String::deserialize(deserializer)?;
355        match s.as_str() {
356            "proxy" => Ok(RelayMode::Proxy),
357            "managed" => Ok(RelayMode::Managed),
358            "static" => Err(serde::de::Error::custom(
359                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
360            )),
361            other => Err(serde::de::Error::unknown_variant(
362                other,
363                &["proxy", "managed"],
364            )),
365        }
366    }
367}
368
369impl fmt::Display for RelayMode {
370    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371        match self {
372            RelayMode::Proxy => write!(f, "proxy"),
373            RelayMode::Managed => write!(f, "managed"),
374        }
375    }
376}
377
/// The instance type of Relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// This Relay is run as a default instance.
    Default,

    /// This Relay is run as a canary instance where experiments can be run.
    Canary,
}
388
389impl RelayInstance {
390    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
391    pub fn is_canary(&self) -> bool {
392        matches!(self, RelayInstance::Canary)
393    }
394}
395
396impl fmt::Display for RelayInstance {
397    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398        match self {
399            RelayInstance::Default => write!(f, "default"),
400            RelayInstance::Canary => write!(f, "canary"),
401        }
402    }
403}
404
405impl FromStr for RelayInstance {
406    type Err = fmt::Error;
407
408    fn from_str(s: &str) -> Result<Self, Self::Err> {
409        match s {
410            "canary" => Ok(RelayInstance::Canary),
411            _ => Ok(RelayInstance::Default),
412        }
413    }
414}
415
/// Error returned when parsing an invalid [`RelayMode`].
///
/// Produced by the `FromStr` impl for [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;
419
420impl fmt::Display for ParseRelayModeError {
421    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422        write!(f, "Relay mode must be one of: managed or proxy")
423    }
424}
425
426impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429    type Err = ParseRelayModeError;
430
431    fn from_str(s: &str) -> Result<Self, Self::Err> {
432        match s {
433            "proxy" => Ok(RelayMode::Proxy),
434            "managed" => Ok(RelayMode::Managed),
435            _ => Err(ParseRelayModeError),
436        }
437    }
438}
439
/// Returns `true` if this value is equal to `Default::default()`.
///
/// Used by serde `skip_serializing_if` attributes in this module.
fn is_default<T>(t: &T) -> bool
where
    T: Default + PartialEq,
{
    t == &T::default()
}
444
/// Checks if we are running in docker.
///
/// Detection relies on the `/.dockerenv` marker file the Docker runtime creates,
/// falling back to scanning this process's cgroup memberships.
fn is_docker() -> bool {
    Path::new("/.dockerenv").exists()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
453
454/// Default value for the "bind" configuration.
455fn default_host() -> IpAddr {
456    if is_docker() {
457        // Docker images rely on this service being exposed
458        "0.0.0.0".parse().unwrap()
459    } else {
460        "127.0.0.1".parse().unwrap()
461    }
462}
463
/// Controls responses from the readiness health check endpoint based on authentication.
///
/// Independent of the readiness condition, shutdown always switches Relay into unready state.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ReadinessCondition {
    /// (default) Relay is ready when authenticated and connected to the upstream.
    ///
    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
    /// Relay remains ready until authentication fails.
    ///
    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
    /// network outages.
    #[default]
    Authenticated,
    /// Relay reports readiness regardless of the authentication and networking state.
    Always,
}
484
/// Relay specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    pub mode: RelayMode,
    /// The instance type of this relay.
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to (network interface).
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: u16,
    /// The host the relay should bind to (network interface) for internally exposed APIs, like
    /// health checks.
    ///
    /// If not configured, internal routes are exposed on the main HTTP server.
    ///
    /// Note: configuring the internal http server on an address which overlaps with the main
    /// server (e.g. main on `0.0.0.0:3000` and internal on `127.0.0.1:3000`) is a misconfiguration
    /// resulting in approximately half of the requests sent to `127.0.0.1:3000` to fail, as the handling
    /// http server is chosen by the operating system 'at random'.
    ///
    /// As a best practice you should always choose different ports to avoid this issue.
    ///
    /// Defaults to [`Self::host`].
    pub internal_host: Option<IpAddr>,
    /// The port to bind for internally exposed APIs.
    ///
    /// Defaults to [`Self::port`].
    pub internal_port: Option<u16>,
    /// Optional port to bind for the encrypted relay HTTPS server.
    ///
    /// Never written back when the config is serialized.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
    ///
    /// Never written back when the config is serialized.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the PKCS12 archive.
    ///
    /// Never written back when the config is serialized.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
    ///
    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
    /// Validation of project identifiers can be safely skipped in these cases.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
533
impl Default for Relay {
    fn default() -> Self {
        Relay {
            mode: RelayMode::Managed,
            instance: RelayInstance::Default,
            // Statically valid URL; the `unwrap` cannot panic.
            upstream: "https://sentry.io/".parse().unwrap(),
            // `0.0.0.0` inside docker, `127.0.0.1` otherwise.
            host: default_host(),
            port: 3000,
            internal_host: None,
            internal_port: None,
            tls_port: None,
            tls_identity_path: None,
            tls_identity_password: None,
            override_project_ids: false,
        }
    }
}
551
/// Control the metrics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Hostname and port of the statsd server.
    ///
    /// Defaults to `None`.
    pub statsd: Option<String>,
    /// Common prefix that should be added to all metrics.
    ///
    /// Defaults to `"sentry.relay"`.
    pub prefix: String,
    /// Default tags to apply to all metrics.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
    pub hostname_tag: Option<String>,
    /// Global sample rate for all emitted metrics between `0.0` and `1.0`.
    ///
    /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
    /// Defaults to `1.0` (100%).
    pub sample_rate: f64,
    /// Interval for periodic metrics emitted from Relay.
    ///
    /// Setting it to `0` seconds disables the periodic metrics.
    /// Defaults to 5 seconds.
    pub periodic_secs: u64,
    /// Whether local metric aggregation using statsdproxy should be enabled.
    ///
    /// Defaults to `true`.
    pub aggregate: bool,
    /// Allows emission of metrics with high cardinality tags.
    ///
    /// High cardinality tags are dynamic values attached to metrics,
    /// such as project IDs. When enabled, these tags will be included
    /// in the emitted metrics. When disabled, the tags will be omitted.
    ///
    /// Defaults to `false`.
    pub allow_high_cardinality_tags: bool,
}
591
impl Default for Metrics {
    fn default() -> Self {
        Metrics {
            statsd: None,
            prefix: "sentry.relay".into(),
            default_tags: BTreeMap::new(),
            hostname_tag: None,
            // Emit 100% of metrics by default.
            sample_rate: 1.0,
            periodic_secs: 5,
            aggregate: true,
            allow_high_cardinality_tags: false,
        }
    }
}
606
/// Controls various limits.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    pub max_concurrent_requests: usize,
    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    ///
    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
    pub max_concurrent_queries: usize,
    /// The maximum payload size for events.
    pub max_event_size: ByteSize,
    /// The maximum size for each attachment.
    pub max_attachment_size: ByteSize,
    /// The maximum size for a TUS upload request body.
    pub max_upload_size: ByteSize,
    /// The maximum combined size for all attachments in an envelope or request.
    pub max_attachments_size: ByteSize,
    /// The maximum combined size for all client reports in an envelope or request.
    pub max_client_reports_size: ByteSize,
    /// The maximum payload size for a monitor check-in.
    pub max_check_in_size: ByteSize,
    /// The maximum payload size for an entire envelope. Individual limits still apply.
    pub max_envelope_size: ByteSize,
    /// The maximum number of session items per envelope.
    pub max_session_count: usize,
    /// The maximum payload size for general API requests.
    pub max_api_payload_size: ByteSize,
    /// The maximum payload size for file uploads and chunks.
    pub max_api_file_upload_size: ByteSize,
    /// The maximum payload size for chunks.
    pub max_api_chunk_upload_size: ByteSize,
    /// The maximum payload size for a profile.
    pub max_profile_size: ByteSize,
    /// The maximum payload size for a trace metric.
    pub max_trace_metric_size: ByteSize,
    /// The maximum payload size for a log.
    pub max_log_size: ByteSize,
    /// The maximum payload size for a span.
    pub max_span_size: ByteSize,
    /// The maximum payload size for an item container.
    pub max_container_size: ByteSize,
    /// The maximum payload size for a statsd metric.
    pub max_statsd_size: ByteSize,
    /// The maximum payload size for metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// The maximum payload size for a compressed replay.
    pub max_replay_compressed_size: ByteSize,
    /// The maximum payload size for an uncompressed replay.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// The maximum size for a replay recording Kafka message.
    pub max_replay_message_size: ByteSize,
    /// The byte size limit up to which Relay will retain
    /// keys of invalid/removed attributes.
    ///
    /// This is only relevant for EAP items (spans, logs, …).
    /// In principle, we want to record all deletions of attributes,
    /// but we have to institute some limit to protect our infrastructure
    /// against excessive metadata sizes.
    ///
    /// Defaults to 10KiB.
    pub max_removed_attribute_key_size: ByteSize,
    /// The maximum number of threads to spawn for CPU and web work, each.
    ///
    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
    /// the number of logical CPU cores on the host.
    pub max_thread_count: usize,
    /// Controls the maximum concurrency of each worker thread.
    ///
    /// Increasing the concurrency can lead to a better utilization of worker threads by
    /// increasing the amount of I/O done concurrently.
    ///
    /// Currently has no effect and defaults to `1`.
    pub max_pool_concurrency: usize,
    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
    /// have lower timeouts. Defaults to 30 seconds.
    pub query_timeout: u64,
    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub shutdown_timeout: u64,
    /// Server keep-alive timeout in seconds.
    ///
    /// By default, keep-alive is set to 5 seconds.
    pub keepalive_timeout: u64,
    /// Server idle timeout in seconds.
    ///
    /// The idle timeout limits the amount of time a connection is kept open without activity.
    /// Setting this too short may abort connections before Relay is able to send a response.
    ///
    /// By default there is no idle timeout.
    pub idle_timeout: Option<u64>,
    /// Sets the maximum number of concurrent connections.
    ///
    /// Upon reaching the limit, the server will stop accepting connections.
    ///
    /// By default there is no limit.
    pub max_connections: Option<usize>,
    /// The TCP listen backlog.
    ///
    /// Configures the TCP listen backlog for the listening socket of Relay.
    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
    /// for a more detailed description of the listen backlog.
    ///
    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
    pub tcp_listen_backlog: u32,
}
716
impl Default for Limits {
    fn default() -> Self {
        // Note: fields appear in a slightly different order than the struct
        // declaration; struct literals match by name, not position.
        Limits {
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(200),
            max_upload_size: ByteSize::mebibytes(1024),
            max_attachments_size: ByteSize::mebibytes(200),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(200),
            max_session_count: 100,
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_trace_metric_size: ByteSize::kibibytes(2),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(1),
            max_container_size: ByteSize::mebibytes(12),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            // One CPU-bound and one web worker pool are spawned, each this size.
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
            max_removed_attribute_key_size: ByteSize::kibibytes(10),
        }
    }
}
755
/// Controls traffic steering.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Accept and forward unknown Envelope items to the upstream.
    ///
    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
    /// processed as usual.
    ///
    /// `None` means "use the mode-dependent default" described below:
    /// defaults to `true` for all Relay modes other than processing mode. In processing mode, this
    /// is disabled by default since the item cannot be handled.
    pub accept_unknown_items: Option<bool>,
}
771
/// Http content encoding for both incoming and outgoing web requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// Identity function with no compression.
    ///
    /// This is the default encoding and does not require the presence of the `content-encoding`
    /// HTTP header.
    #[default]
    Identity,
    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
    ///
    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
    Deflate,
    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
    /// (LZ77), with a 32-bit CRC.
    ///
    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
    /// compatibility purposes.
    Gzip,
    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
    Br,
    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
    Zstd,
}
800
801impl HttpEncoding {
802    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
803    pub fn parse(str: &str) -> Self {
804        let str = str.trim();
805        if str.eq_ignore_ascii_case("zstd") {
806            Self::Zstd
807        } else if str.eq_ignore_ascii_case("br") {
808            Self::Br
809        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
810            Self::Gzip
811        } else if str.eq_ignore_ascii_case("deflate") {
812            Self::Deflate
813        } else {
814            Self::Identity
815        }
816    }
817
818    /// Returns the value for the `content-encoding` HTTP header.
819    ///
820    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
821    pub fn name(&self) -> Option<&'static str> {
822        match self {
823            Self::Identity => None,
824            Self::Deflate => Some("deflate"),
825            Self::Gzip => Some("gzip"),
826            Self::Br => Some("br"),
827            Self::Zstd => Some("zstd"),
828        }
829    }
830}
831
/// Controls the HTTP communication with the upstream, including authentication.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    ///
    /// This timeout covers the time from sending the request until receiving response headers.
    /// Neither the connection process and handshakes, nor reading the response body is covered in
    /// this timeout.
    pub timeout: u32,
    /// Timeout for establishing connections with the upstream in seconds.
    ///
    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
    ///
    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
    /// back into startup mode and tries to establish a connection. During this time, incoming
    /// envelopes will be buffered.
    ///
    /// Defaults to `600` (10 minutes).
    pub auth_interval: Option<u64>,
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage in seconds.
    ///
    /// During a network outage relay will try to reconnect and will buffer all upstream messages
    /// until it manages to reconnect.
    pub outage_grace_period: u64,
    /// The time Relay waits before retrying an upstream request, in seconds.
    ///
    /// This time is only used before going into a network outage mode.
    pub retry_delay: u64,
    /// The interval in seconds for continued failed project fetches at which Relay will error.
    ///
    /// A successful fetch resets this interval. Relay does nothing during long
    /// times without emitting requests.
    pub project_failure_interval: u64,
    /// Content encoding to apply to upstream store requests.
    ///
    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
    ///
    /// This setting applies to all store requests of SDK data, including events, transactions,
    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
    ///
    /// Available options are:
    ///
    ///  - `identity`: Disables compression.
    ///  - `deflate`: Compression using a zlib header with deflate encoding.
    ///  - `gzip`: Compression using gzip.
    ///  - `br`: Compression using the brotli algorithm.
    ///  - `zstd` (default): Compression using the zstd algorithm.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint.
    ///
    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
    /// DSN, this submits metrics to the global endpoint with Relay authentication.
    ///
    /// This option does not have any effect on processing mode.
    pub global_metrics: bool,
    /// Controls whether the forward endpoint is enabled.
    ///
    /// The forward endpoint forwards unknown API requests to the upstream.
    pub forward: bool,
}
902
903impl Default for Http {
904    fn default() -> Self {
905        Http {
906            timeout: 5,
907            connection_timeout: 3,
908            max_retry_interval: 60, // 1 minute
909            host_header: None,
910            auth_interval: Some(600), // 10 minutes
911            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
912            retry_delay: default_retry_delay(),
913            project_failure_interval: default_project_failure_interval(),
914            encoding: HttpEncoding::Zstd,
915            global_metrics: false,
916            forward: true,
917        }
918    }
919}
920
/// Default delay before retrying a request against an unavailable upstream: one second.
fn default_retry_delay() -> u64 {
    1
}
925
/// Default interval of continued failed project fetches before Relay errors: 90 seconds.
fn default_project_failure_interval() -> u64 {
    90
}
930
931/// Default for max disk size, 500 MB.
932fn spool_envelopes_max_disk_size() -> ByteSize {
933    ByteSize::mebibytes(500)
934}
935
936/// Default number of encoded envelope bytes to cache before writing to disk.
937fn spool_envelopes_batch_size_bytes() -> ByteSize {
938    ByteSize::kibibytes(10)
939}
940
/// Default maximum time between receiving an envelope and processing it: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    60 * 60 * 24
}
944
/// Default refresh interval for the disk usage monitoring: 100 ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}
949
/// Default max memory usage for unspooling: 90%.
///
/// This is 5% below the default of [`Health::max_memory_percent`] (95%), as documented on
/// [`EnvelopeSpool::max_backpressure_memory_percent`] ("Defaults to 90% (5% less than max
/// memory)"), so that unspooling pauses before the health check reports unhealthy. The previous
/// value of `0.8` contradicted that documented default.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}
954
/// Default number of buffer partitions: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    // `NonZeroU8::MIN` is 1, avoiding the runtime `unwrap`.
    NonZeroU8::MIN
}
959
/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path of the SQLite database file(s) which persist the data.
    ///
    /// Based on the number of partitions, more database files will be created within the same path.
    ///
    /// If not set, the envelopes will be buffered in memory.
    pub path: Option<PathBuf>,
    /// The maximum size of the buffer to keep, in bytes.
    ///
    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
    ///
    /// Defaults to 500 MiB.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of the batch of compressed envelopes that are spooled to disk at once.
    ///
    /// Note that this is the size after which spooling will be triggered but it does not guarantee
    /// that exactly this size will be spooled, it can be greater or equal.
    ///
    /// Defaults to 10 KiB.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time between receiving the envelope and processing it.
    ///
    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
    /// they are dropped.
    ///
    /// Defaults to 24h.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
    /// internal page stats.
    ///
    /// Defaults to 100ms.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
    ///
    /// Only applies when [`Self::path`] is set.
    ///
    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
    ///
    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
    ///
    /// This scenario arises when the buffer stops spooling due to reaching the
    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
    /// continues to grow, and the system becomes deadlocked.
    ///
    /// ### Example
    /// Suppose the system has 1GB of available memory and is configured to spool only after
    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
    ///
    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
    /// no data will be offloaded. Memory usage keeps increasing until it hits the
    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
    ///
    /// - No more envelopes are accepted.
    /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
    ///   will not happen.
    /// - A deadlock occurs, with the system unable to recover without manual intervention.
    ///
    /// Defaults to 90% (5% less than max memory).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of partitions of the buffer.
    ///
    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
    /// and other resources.
    ///
    /// Defaults to 1.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1038
1039impl Default for EnvelopeSpool {
1040    fn default() -> Self {
1041        Self {
1042            path: None,
1043            max_disk_size: spool_envelopes_max_disk_size(),
1044            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1045            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1046            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1047            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1048            partitions: spool_envelopes_partitions(),
1049        }
1050    }
1051}
1052
/// Persistent buffering configuration.
///
/// Currently this only carries the envelope buffer settings under `spool.envelopes`.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1060
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// The full project state will be requested by this Relay if set to `true`.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    ///
    /// Defaults to 300 seconds (5 minutes).
    pub project_expiry: u32,
    /// Continue using project state this many seconds after cache expiry while a new state is
    /// being fetched. This is added on top of `project_expiry`.
    ///
    /// Default is 2 minutes.
    pub project_grace_period: u32,
    /// Refresh a project after the specified seconds.
    ///
    /// The time must be between expiry time and the grace period.
    ///
    /// By default there are no refreshes enabled.
    pub project_refresh_interval: Option<u32>,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    pub relay_expiry: u32,
    /// Unused cache timeout for envelopes.
    ///
    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
    /// at a later point.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// The maximum amount of envelopes to queue before dropping them.
    ///
    /// Defaults to 1000.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries.
    pub miss_expiry: u32,
    /// The buffer timeout for batched project config queries before sending them upstream in ms.
    ///
    /// Defaults to 100ms.
    pub batch_interval: u32,
    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
    pub downstream_relays_batch_interval: u32,
    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
    ///
    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
    pub batch_size: usize,
    /// Interval for watching local cache override files in seconds.
    pub file_interval: u32,
    /// Interval for fetching new global configs from the upstream, in seconds.
    pub global_config_fetch_interval: u32,
}
1107
1108impl Default for Cache {
1109    fn default() -> Self {
1110        Cache {
1111            project_request_full_config: false,
1112            project_expiry: 300,       // 5 minutes
1113            project_grace_period: 120, // 2 minutes
1114            project_refresh_interval: None,
1115            relay_expiry: 3600,   // 1 hour
1116            envelope_expiry: 600, // 10 minutes
1117            envelope_buffer_size: 1000,
1118            miss_expiry: 60,                       // 1 minute
1119            batch_interval: 100,                   // 100ms
1120            downstream_relays_batch_interval: 100, // 100ms
1121            batch_size: 500,
1122            file_interval: 10,                // 10 seconds
1123            global_config_fetch_interval: 10, // 10 seconds
1124        }
1125    }
1126}
1127
/// Default maximum allowed event timestamp drift into the future: one minute.
fn default_max_secs_in_future() -> u32 {
    60
}
1131
/// Default maximum age of ingested sessions: five days.
fn default_max_session_secs_in_past() -> u32 {
    60 * 60 * 24 * 5
}
1135
1136fn default_chunk_size() -> ByteSize {
1137    ByteSize::mebibytes(1)
1138}
1139
/// Default Redis key prefix used when looking up project configs.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}
1143
/// Default maximum rate limit reported to clients: 300 seconds (5 minutes).
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1148
/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Defaults to `false`.
    pub enabled: bool,
    /// Path to the GeoIP database file, if any.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions. Older sessions will be dropped.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configurations.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional kafka producer configurations.
    ///
    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
    /// kafka config can be referenced in `topics:` like this:
    ///
    /// ```yaml
    /// secondary_kafka_configs:
    ///   mycustomcluster:
    ///     - name: 'bootstrap.servers'
    ///       value: 'sentry_kafka_metrics:9093'
    ///
    /// topics:
    ///   transactions: ingest-transactions
    ///   metrics:
    ///     name: ingest-metrics
    ///     config: mycustomcluster
    /// ```
    ///
    /// Then metrics will be produced to an entirely different Kafka cluster.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic names.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients.
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
    /// Configures the quota cache ratio between `0.0` and `1.0`.
    ///
    /// The quota cache, caches the specified ratio of remaining quota in memory to reduce the
    /// amount of synchronizations required with Redis.
    ///
    /// The ratio is applied to the (per second) rate of the quota, not the total limit.
    /// For example a quota with limit 100 with a 10 second window is treated equally to a quota of
    /// 10 with a 1 second window.
    ///
    /// By default quota caching is disabled.
    pub quota_cache_ratio: Option<f32>,
    /// Relative amount of the total quota limit to which quota caching is applied.
    ///
    /// If exceeded, the rate limiter will no longer cache the quota and sync with Redis on every call instead.
    /// Lowering this value reduces the probability of incorrectly over-accepting.
    ///
    /// Must be between `0.0` and `1.0`, by default there is no limit configured.
    pub quota_cache_max: Option<f32>,
    /// Configuration for attachment uploads.
    #[serde(default)]
    pub upload: UploadServiceConfig,
}
1226
1227impl Default for Processing {
1228    /// Constructs a disabled processing configuration.
1229    fn default() -> Self {
1230        Self {
1231            enabled: false,
1232            geoip_path: None,
1233            max_secs_in_future: default_max_secs_in_future(),
1234            max_session_secs_in_past: default_max_session_secs_in_past(),
1235            kafka_config: Vec::new(),
1236            secondary_kafka_configs: BTreeMap::new(),
1237            topics: TopicAssignments::default(),
1238            kafka_validate_topics: false,
1239            redis: None,
1240            attachment_chunk_size: default_chunk_size(),
1241            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1242            max_rate_limit: default_max_rate_limit(),
1243            quota_cache_ratio: None,
1244            quota_cache_max: None,
1245            upload: UploadServiceConfig::default(),
1246        }
1247    }
1248}
1249
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    ///
    /// Defaults to [`NormalizationLevel::Default`].
    #[serde(default)]
    pub level: NormalizationLevel,
}
1258
/// Configuration for the level of normalization this Relay should do.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs normalization, excluding steps that break future compatibility.
    ///
    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
    #[default]
    Default,
    /// Runs full normalization.
    ///
    /// It includes steps that break future compatibility and should only run in
    /// the last layer of relays.
    Full,
}
1274
/// Configuration values for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
    ///
    /// Defaults to 60.
    pub bucket_interval: u64,
    /// Defines how often all buckets are flushed, in seconds.
    ///
    /// Defaults to 120.
    pub flush_interval: u64,
}
1284
1285impl Default for OutcomeAggregatorConfig {
1286    fn default() -> Self {
1287        Self {
1288            bucket_interval: 60,
1289            flush_interval: 120,
1290        }
1291    }
1292}
1293
/// Configuration values for attachment uploads.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct UploadServiceConfig {
    /// The base URL for the objectstore service.
    ///
    /// This defaults to [`None`], which means that the service will be disabled,
    /// unless a proper configuration is provided.
    pub objectstore_url: Option<String>,

    /// Maximum concurrency of uploads.
    ///
    /// Defaults to 10.
    pub max_concurrent_requests: usize,

    /// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
    ///
    /// Defaults to 60.
    pub timeout: u64,
}
1310
1311impl Default for UploadServiceConfig {
1312    fn default() -> Self {
1313        Self {
1314            objectstore_url: None,
1315            max_concurrent_requests: 10,
1316            timeout: 60,
1317        }
1318    }
1319}
1320
/// Determines how to emit outcomes.
///
/// For compatibility reasons, this deserializes from `true`, `false`, or the
/// string `"as_client_reports"`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit outcomes as outcomes.
    AsOutcomes,
}
1333
1334impl EmitOutcomes {
1335    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1336    pub fn any(&self) -> bool {
1337        !matches!(self, EmitOutcomes::None)
1338    }
1339}
1340
1341impl Serialize for EmitOutcomes {
1342    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1343    where
1344        S: Serializer,
1345    {
1346        // For compatibility, serialize None and AsOutcomes as booleans.
1347        match self {
1348            Self::None => serializer.serialize_bool(false),
1349            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1350            Self::AsOutcomes => serializer.serialize_bool(true),
1351        }
1352    }
1353}
1354
1355struct EmitOutcomesVisitor;
1356
1357impl Visitor<'_> for EmitOutcomesVisitor {
1358    type Value = EmitOutcomes;
1359
1360    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1361        formatter.write_str("true, false, or 'as_client_reports'")
1362    }
1363
1364    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1365    where
1366        E: serde::de::Error,
1367    {
1368        Ok(if v {
1369            EmitOutcomes::AsOutcomes
1370        } else {
1371            EmitOutcomes::None
1372        })
1373    }
1374
1375    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1376    where
1377        E: serde::de::Error,
1378    {
1379        if v == "as_client_reports" {
1380            Ok(EmitOutcomes::AsClientReports)
1381        } else {
1382            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1383        }
1384    }
1385}
1386
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor accept both booleans and strings,
        // which is required for backwards compatibility.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1395
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    /// Processing relays always emit outcomes (for backwards compatibility).
    /// Can take the following values: false, "as_client_reports", true
    pub emit_outcomes: EmitOutcomes,
    /// Controls whether client reported outcomes should be emitted.
    pub emit_client_outcomes: bool,
    /// The maximum number of outcomes that are batched before being sent
    /// via http to the upstream (only applies to non processing relays).
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched
    /// via http to the upstream (only applies to non processing relays).
    pub batch_interval: u64,
    /// Defines the source string registered in the outcomes originating from
    /// this Relay (typically something like the region or the layer).
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1418
1419impl Default for Outcomes {
1420    fn default() -> Self {
1421        Outcomes {
1422            emit_outcomes: EmitOutcomes::AsClientReports,
1423            emit_client_outcomes: true,
1424            batch_size: 1000,
1425            batch_interval: 500,
1426            source: None,
1427            aggregator: OutcomeAggregatorConfig::default(),
1428        }
1429    }
1430}
1431
/// Minimal version of a config for dumping out.
///
/// Contains only the `relay` section of the full configuration.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1438
1439impl MinimalConfig {
1440    /// Saves the config in the given config folder as config.yml
1441    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1442        let path = p.as_ref();
1443        if fs::metadata(path).is_err() {
1444            fs::create_dir_all(path)
1445                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1446        }
1447        self.save(path)
1448    }
1449}
1450
impl ConfigObject for MinimalConfig {
    // The minimal config is persisted as YAML under the name "config" (config.yml).
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        "config"
    }
}
1460
1461/// Alternative serialization of RelayInfo for config file using snake case.
1462mod config_relay_info {
1463    use serde::ser::SerializeMap;
1464
1465    use super::*;
1466
1467    // Uses snake_case as opposed to camelCase.
1468    #[derive(Debug, Serialize, Deserialize, Clone)]
1469    struct RelayInfoConfig {
1470        public_key: PublicKey,
1471        #[serde(default)]
1472        internal: bool,
1473    }
1474
1475    impl From<RelayInfoConfig> for RelayInfo {
1476        fn from(v: RelayInfoConfig) -> Self {
1477            RelayInfo {
1478                public_key: v.public_key,
1479                internal: v.internal,
1480            }
1481        }
1482    }
1483
1484    impl From<RelayInfo> for RelayInfoConfig {
1485        fn from(v: RelayInfo) -> Self {
1486            RelayInfoConfig {
1487                public_key: v.public_key,
1488                internal: v.internal,
1489            }
1490        }
1491    }
1492
1493    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1494    where
1495        D: Deserializer<'de>,
1496    {
1497        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1498        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1499    }
1500
1501    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1502    where
1503        S: Serializer,
1504    {
1505        let mut map = ser.serialize_map(Some(elm.len()))?;
1506
1507        for (k, v) in elm {
1508            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1509        }
1510
1511        map.end()
1512    }
1513}
1514
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls responses from the readiness health check endpoint based on authentication.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays.
    ///
    /// Maps relay IDs to their public key information.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// How old a signature can be before it is considered invalid, in seconds.
    ///
    /// Defaults to 5 minutes.
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}
1532
/// Default maximum signature age: 300 seconds (5 minutes).
fn default_max_age() -> u64 {
    300
}
1536
/// GeoIp database configuration options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to the GeoIP database.
    pub path: Option<PathBuf>,
}
1543
/// Cardinality Limiter configuration options.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Cache vacuum interval in seconds for the in-memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    ///
    /// Defaults to 180 seconds, 3 minutes.
    pub cache_vacuum_interval: u64,
}
1555
1556impl Default for CardinalityLimiter {
1557    fn default() -> Self {
1558        Self {
1559            cache_vacuum_interval: 180,
1560        }
1561    }
1562}
1563
/// Settings to control Relay's health checks.
///
/// After breaching one of the configured thresholds, Relay will
/// return an `unhealthy` status from its health endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval to refresh internal health checks.
    ///
    /// Shorter intervals will decrease the time it takes the health check endpoint to report
    /// issues, but can also increase sporadic unhealthy responses.
    ///
    /// Defaults to `3000` (3 seconds).
    pub refresh_interval_ms: u64,
    /// Maximum memory watermark in bytes.
    ///
    /// By default, there is no absolute limit set and the watermark
    /// is only controlled by setting [`Self::max_memory_percent`].
    pub max_memory_bytes: Option<ByteSize>,
    /// Maximum memory watermark as a percentage of maximum system memory.
    ///
    /// Defaults to `0.95` (95%).
    pub max_memory_percent: f32,
    /// Health check probe timeout in milliseconds.
    ///
    /// Any probe exceeding the timeout will be considered failed.
    /// This limits the max execution time of Relay health checks.
    ///
    /// Defaults to 900 milliseconds.
    pub probe_timeout_ms: u64,
    /// The refresh frequency of memory stats which are used to poll memory
    /// usage of Relay.
    ///
    /// The implementation of memory stats guarantees that the refresh will happen at
    /// least every `x` ms since memory readings are lazy and are updated only if needed.
    pub memory_stat_refresh_frequency_ms: u64,
}
1601
1602impl Default for Health {
1603    fn default() -> Self {
1604        Self {
1605            refresh_interval_ms: 3000,
1606            max_memory_bytes: None,
1607            max_memory_percent: 0.95,
1608            probe_timeout_ms: 900,
1609            memory_stat_refresh_frequency_ms: 100,
1610        }
1611    }
1612}
1613
/// COGS configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum amount of COGS measurements allowed to backlog.
    ///
    /// Any additional COGS measurements recorded will be dropped.
    ///
    /// Defaults to `10_000`.
    pub max_queue_size: u64,
    /// Relay COGS resource id.
    ///
    /// All Relay related COGS measurements are emitted with this resource id.
    ///
    /// Defaults to `relay_service`.
    pub relay_resource_id: String,
}
1631
1632impl Default for Cogs {
1633    fn default() -> Self {
1634        Self {
1635            max_queue_size: 10_000,
1636            relay_resource_id: "relay_service".to_owned(),
1637        }
1638    }
1639}
1640
/// Aggregation of all configuration sections persisted in the config file.
///
/// Each section falls back to its `Default` implementation when it is absent
/// from the serialized document (per-field `#[serde(default)]`).
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1682
impl ConfigObject for ConfigValues {
    /// The main configuration is stored as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    /// File stem of the configuration file — presumably resolved by the
    /// loader to `config.<ext>` inside the config folder.
    fn name() -> &'static str {
        "config"
    }
}
1692
/// Config struct.
pub struct Config {
    /// Parsed configuration values (defaults for sections missing from the file).
    values: ConfigValues,
    /// Relay credentials; `None` until loaded from disk or set explicitly.
    credentials: Option<Credentials>,
    /// The config folder this configuration was loaded from.
    path: PathBuf,
}
1699
impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): `credentials` is deliberately not included here,
        // presumably to keep the secret key out of debug/log output.
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
1708
1709impl Config {
1710    /// Loads a config from a given config folder.
1711    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1712        let path = env::current_dir()
1713            .map(|x| x.join(path.as_ref()))
1714            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1715
1716        let config = Config {
1717            values: ConfigValues::load(&path)?,
1718            credentials: if Credentials::path(&path).exists() {
1719                Some(Credentials::load(&path)?)
1720            } else {
1721                None
1722            },
1723            path: path.clone(),
1724        };
1725
1726        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1727            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1728        }
1729
1730        Ok(config)
1731    }
1732
1733    /// Creates a config from a JSON value.
1734    ///
1735    /// This is mostly useful for tests.
1736    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1737        Ok(Config {
1738            values: serde_json::from_value(value)
1739                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1740            credentials: None,
1741            path: PathBuf::new(),
1742        })
1743    }
1744
1745    /// Override configuration with values coming from other sources (e.g. env variables or
1746    /// command line parameters)
1747    pub fn apply_override(
1748        &mut self,
1749        mut overrides: OverridableConfig,
1750    ) -> anyhow::Result<&mut Self> {
1751        let relay = &mut self.values.relay;
1752
1753        if let Some(mode) = overrides.mode {
1754            relay.mode = mode
1755                .parse::<RelayMode>()
1756                .with_context(|| ConfigError::field("mode"))?;
1757        }
1758
1759        if let Some(deployment) = overrides.instance {
1760            relay.instance = deployment
1761                .parse::<RelayInstance>()
1762                .with_context(|| ConfigError::field("deployment"))?;
1763        }
1764
1765        if let Some(log_level) = overrides.log_level {
1766            self.values.logging.level = log_level.parse()?;
1767        }
1768
1769        if let Some(log_format) = overrides.log_format {
1770            self.values.logging.format = log_format.parse()?;
1771        }
1772
1773        if let Some(upstream) = overrides.upstream {
1774            relay.upstream = upstream
1775                .parse::<UpstreamDescriptor>()
1776                .with_context(|| ConfigError::field("upstream"))?;
1777        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1778            relay.upstream = upstream_dsn
1779                .parse::<Dsn>()
1780                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1781                .with_context(|| ConfigError::field("upstream_dsn"))?;
1782        }
1783
1784        if let Some(host) = overrides.host {
1785            relay.host = host
1786                .parse::<IpAddr>()
1787                .with_context(|| ConfigError::field("host"))?;
1788        }
1789
1790        if let Some(port) = overrides.port {
1791            relay.port = port
1792                .as_str()
1793                .parse()
1794                .with_context(|| ConfigError::field("port"))?;
1795        }
1796
1797        let processing = &mut self.values.processing;
1798        if let Some(enabled) = overrides.processing {
1799            match enabled.to_lowercase().as_str() {
1800                "true" | "1" => processing.enabled = true,
1801                "false" | "0" | "" => processing.enabled = false,
1802                _ => return Err(ConfigError::field("processing").into()),
1803            }
1804        }
1805
1806        if let Some(redis) = overrides.redis_url {
1807            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1808        }
1809
1810        if let Some(kafka_url) = overrides.kafka_url {
1811            let existing = processing
1812                .kafka_config
1813                .iter_mut()
1814                .find(|e| e.name == "bootstrap.servers");
1815
1816            if let Some(config_param) = existing {
1817                config_param.value = kafka_url;
1818            } else {
1819                processing.kafka_config.push(KafkaConfigParam {
1820                    name: "bootstrap.servers".to_owned(),
1821                    value: kafka_url,
1822                })
1823            }
1824        }
1825        // credentials overrides
1826        let id = if let Some(id) = overrides.id {
1827            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1828            Some(id)
1829        } else {
1830            None
1831        };
1832        let public_key = if let Some(public_key) = overrides.public_key {
1833            let public_key = public_key
1834                .parse::<PublicKey>()
1835                .with_context(|| ConfigError::field("public_key"))?;
1836            Some(public_key)
1837        } else {
1838            None
1839        };
1840
1841        let secret_key = if let Some(secret_key) = overrides.secret_key {
1842            let secret_key = secret_key
1843                .parse::<SecretKey>()
1844                .with_context(|| ConfigError::field("secret_key"))?;
1845            Some(secret_key)
1846        } else {
1847            None
1848        };
1849        let outcomes = &mut self.values.outcomes;
1850        if overrides.outcome_source.is_some() {
1851            outcomes.source = overrides.outcome_source.take();
1852        }
1853
1854        if let Some(credentials) = &mut self.credentials {
1855            //we have existing credentials we may override some entries
1856            if let Some(id) = id {
1857                credentials.id = id;
1858            }
1859            if let Some(public_key) = public_key {
1860                credentials.public_key = public_key;
1861            }
1862            if let Some(secret_key) = secret_key {
1863                credentials.secret_key = secret_key
1864            }
1865        } else {
1866            //no existing credentials we may only create the full credentials
1867            match (id, public_key, secret_key) {
1868                (Some(id), Some(public_key), Some(secret_key)) => {
1869                    self.credentials = Some(Credentials {
1870                        secret_key,
1871                        public_key,
1872                        id,
1873                    })
1874                }
1875                (None, None, None) => {
1876                    // nothing provided, we'll just leave the credentials None, maybe we
1877                    // don't need them in the current command or we'll override them later
1878                }
1879                _ => {
1880                    return Err(ConfigError::field("incomplete credentials").into());
1881                }
1882            }
1883        }
1884
1885        let limits = &mut self.values.limits;
1886        if let Some(shutdown_timeout) = overrides.shutdown_timeout
1887            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
1888        {
1889            limits.shutdown_timeout = shutdown_timeout;
1890        }
1891
1892        if let Some(server_name) = overrides.server_name {
1893            self.values.sentry.server_name = Some(server_name.into());
1894        }
1895
1896        Ok(self)
1897    }
1898
1899    /// Checks if the config is already initialized.
1900    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1901        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1902    }
1903
1904    /// Returns the filename of the config file.
1905    pub fn path(&self) -> &Path {
1906        &self.path
1907    }
1908
1909    /// Dumps out a YAML string of the values.
1910    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1911        serde_yaml::to_string(&self.values)
1912            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1913    }
1914
1915    /// Regenerates the relay credentials.
1916    ///
1917    /// This also writes the credentials back to the file.
1918    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1919        let creds = Credentials::generate();
1920        if save {
1921            creds.save(&self.path)?;
1922        }
1923        self.credentials = Some(creds);
1924        Ok(())
1925    }
1926
1927    /// Return the current credentials
1928    pub fn credentials(&self) -> Option<&Credentials> {
1929        self.credentials.as_ref()
1930    }
1931
1932    /// Set new credentials.
1933    ///
1934    /// This also writes the credentials back to the file.
1935    pub fn replace_credentials(
1936        &mut self,
1937        credentials: Option<Credentials>,
1938    ) -> anyhow::Result<bool> {
1939        if self.credentials == credentials {
1940            return Ok(false);
1941        }
1942
1943        match credentials {
1944            Some(ref creds) => {
1945                creds.save(&self.path)?;
1946            }
1947            None => {
1948                let path = Credentials::path(&self.path);
1949                if fs::metadata(&path).is_ok() {
1950                    fs::remove_file(&path).with_context(|| {
1951                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1952                    })?;
1953                }
1954            }
1955        }
1956
1957        self.credentials = credentials;
1958        Ok(true)
1959    }
1960
1961    /// Returns `true` if the config is ready to use.
1962    pub fn has_credentials(&self) -> bool {
1963        self.credentials.is_some()
1964    }
1965
1966    /// Returns the secret key if set.
1967    pub fn secret_key(&self) -> Option<&SecretKey> {
1968        self.credentials.as_ref().map(|x| &x.secret_key)
1969    }
1970
1971    /// Returns the public key if set.
1972    pub fn public_key(&self) -> Option<&PublicKey> {
1973        self.credentials.as_ref().map(|x| &x.public_key)
1974    }
1975
1976    /// Returns the relay ID.
1977    pub fn relay_id(&self) -> Option<&RelayId> {
1978        self.credentials.as_ref().map(|x| &x.id)
1979    }
1980
1981    /// Returns the relay mode.
1982    pub fn relay_mode(&self) -> RelayMode {
1983        self.values.relay.mode
1984    }
1985
1986    /// Returns the instance type of relay.
1987    pub fn relay_instance(&self) -> RelayInstance {
1988        self.values.relay.instance
1989    }
1990
1991    /// Returns the upstream target as descriptor.
1992    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1993        &self.values.relay.upstream
1994    }
1995
1996    /// Returns the custom HTTP "Host" header.
1997    pub fn http_host_header(&self) -> Option<&str> {
1998        self.values.http.host_header.as_deref()
1999    }
2000
2001    /// Returns the listen address.
2002    pub fn listen_addr(&self) -> SocketAddr {
2003        (self.values.relay.host, self.values.relay.port).into()
2004    }
2005
2006    /// Returns the listen address for internal APIs.
2007    ///
2008    /// Internal APIs are APIs which do not need to be publicly exposed,
2009    /// like health checks.
2010    ///
2011    /// Returns `None` when there is no explicit address configured for internal APIs,
2012    /// and they should instead be exposed on the main [`Self::listen_addr`].
2013    pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
2014        match (
2015            self.values.relay.internal_host,
2016            self.values.relay.internal_port,
2017        ) {
2018            (Some(host), None) => Some((host, self.values.relay.port).into()),
2019            (None, Some(port)) => Some((self.values.relay.host, port).into()),
2020            (Some(host), Some(port)) => Some((host, port).into()),
2021            (None, None) => None,
2022        }
2023    }
2024
2025    /// Returns the TLS listen address.
2026    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
2027        if self.values.relay.tls_identity_path.is_some() {
2028            let port = self.values.relay.tls_port.unwrap_or(3443);
2029            Some((self.values.relay.host, port).into())
2030        } else {
2031            None
2032        }
2033    }
2034
2035    /// Returns the path to the identity bundle
2036    pub fn tls_identity_path(&self) -> Option<&Path> {
2037        self.values.relay.tls_identity_path.as_deref()
2038    }
2039
2040    /// Returns the password for the identity bundle
2041    pub fn tls_identity_password(&self) -> Option<&str> {
2042        self.values.relay.tls_identity_password.as_deref()
2043    }
2044
2045    /// Returns `true` when project IDs should be overriden rather than validated.
2046    ///
2047    /// Defaults to `false`, which requires project ID validation.
2048    pub fn override_project_ids(&self) -> bool {
2049        self.values.relay.override_project_ids
2050    }
2051
2052    /// Returns `true` if Relay requires authentication for readiness.
2053    ///
2054    /// See [`ReadinessCondition`] for more information.
2055    pub fn requires_auth(&self) -> bool {
2056        match self.values.auth.ready {
2057            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
2058            ReadinessCondition::Always => false,
2059        }
2060    }
2061
2062    /// Returns the interval at which Realy should try to re-authenticate with the upstream.
2063    ///
2064    /// Always disabled in processing mode.
2065    pub fn http_auth_interval(&self) -> Option<Duration> {
2066        if self.processing_enabled() {
2067            return None;
2068        }
2069
2070        match self.values.http.auth_interval {
2071            None | Some(0) => None,
2072            Some(secs) => Some(Duration::from_secs(secs)),
2073        }
2074    }
2075
2076    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
2077    /// it has encountered a network outage.
2078    pub fn http_outage_grace_period(&self) -> Duration {
2079        Duration::from_secs(self.values.http.outage_grace_period)
2080    }
2081
2082    /// Time Relay waits before retrying an upstream request.
2083    ///
2084    /// Before going into a network outage, Relay may fail to make upstream
2085    /// requests. This is the time Relay waits before retrying the same request.
2086    pub fn http_retry_delay(&self) -> Duration {
2087        Duration::from_secs(self.values.http.retry_delay)
2088    }
2089
2090    /// Time of continued project request failures before Relay emits an error.
2091    pub fn http_project_failure_interval(&self) -> Duration {
2092        Duration::from_secs(self.values.http.project_failure_interval)
2093    }
2094
2095    /// Content encoding of upstream requests.
2096    pub fn http_encoding(&self) -> HttpEncoding {
2097        self.values.http.encoding
2098    }
2099
2100    /// Returns whether metrics should be sent globally through a shared endpoint.
2101    pub fn http_global_metrics(&self) -> bool {
2102        self.values.http.global_metrics
2103    }
2104
2105    /// Returns `true` if Relay supports forwarding unknown API requests.
2106    pub fn http_forward(&self) -> bool {
2107        self.values.http.forward
2108    }
2109
2110    /// Returns whether this Relay should emit outcomes.
2111    ///
2112    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
2113    /// in processing mode.
2114    pub fn emit_outcomes(&self) -> EmitOutcomes {
2115        if self.processing_enabled() {
2116            return EmitOutcomes::AsOutcomes;
2117        }
2118        self.values.outcomes.emit_outcomes
2119    }
2120
2121    /// Returns whether this Relay should emit client outcomes
2122    ///
2123    /// Relays that do not emit client outcomes will forward client recieved outcomes
2124    /// directly to the next relay in the chain as client report envelope.  This is only done
2125    /// if this relay emits outcomes at all. A relay that will not emit outcomes
2126    /// will forward the envelope unchanged.
2127    ///
2128    /// This flag can be explicitly disabled on processing relays as well to prevent the
2129    /// emitting of client outcomes to the kafka topic.
2130    pub fn emit_client_outcomes(&self) -> bool {
2131        self.values.outcomes.emit_client_outcomes
2132    }
2133
2134    /// Returns the maximum number of outcomes that are batched before being sent
2135    pub fn outcome_batch_size(&self) -> usize {
2136        self.values.outcomes.batch_size
2137    }
2138
2139    /// Returns the maximum interval that an outcome may be batched
2140    pub fn outcome_batch_interval(&self) -> Duration {
2141        Duration::from_millis(self.values.outcomes.batch_interval)
2142    }
2143
2144    /// The originating source of the outcome
2145    pub fn outcome_source(&self) -> Option<&str> {
2146        self.values.outcomes.source.as_deref()
2147    }
2148
2149    /// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2150    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2151        &self.values.outcomes.aggregator
2152    }
2153
    /// Returns logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry (error reporting) configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }
2163
2164    /// Returns the socket addresses for statsd.
2165    ///
2166    /// If stats is disabled an empty vector is returned.
2167    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2168        if let Some(ref addr) = self.values.metrics.statsd {
2169            let addrs = addr
2170                .as_str()
2171                .to_socket_addrs()
2172                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2173                .collect();
2174            Ok(addrs)
2175        } else {
2176            Ok(vec![])
2177        }
2178    }
2179
2180    /// Return the prefix for statsd metrics.
2181    pub fn metrics_prefix(&self) -> &str {
2182        &self.values.metrics.prefix
2183    }
2184
2185    /// Returns the default tags for statsd metrics.
2186    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2187        &self.values.metrics.default_tags
2188    }
2189
2190    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
2191    pub fn metrics_hostname_tag(&self) -> Option<&str> {
2192        self.values.metrics.hostname_tag.as_deref()
2193    }
2194
2195    /// Returns the global sample rate for all metrics.
2196    pub fn metrics_sample_rate(&self) -> f64 {
2197        self.values.metrics.sample_rate
2198    }
2199
2200    /// Returns whether local metric aggregation should be enabled.
2201    pub fn metrics_aggregate(&self) -> bool {
2202        self.values.metrics.aggregate
2203    }
2204
2205    /// Returns whether high cardinality tags should be removed before sending metrics.
2206    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
2207        self.values.metrics.allow_high_cardinality_tags
2208    }
2209
2210    /// Returns the interval for periodic metrics emitted from Relay.
2211    ///
2212    /// `None` if periodic metrics are disabled.
2213    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2214        match self.values.metrics.periodic_secs {
2215            0 => None,
2216            secs => Some(Duration::from_secs(secs)),
2217        }
2218    }
2219
2220    /// Returns the default timeout for all upstream HTTP requests.
2221    pub fn http_timeout(&self) -> Duration {
2222        Duration::from_secs(self.values.http.timeout.into())
2223    }
2224
2225    /// Returns the connection timeout for all upstream HTTP requests.
2226    pub fn http_connection_timeout(&self) -> Duration {
2227        Duration::from_secs(self.values.http.connection_timeout.into())
2228    }
2229
2230    /// Returns the failed upstream request retry interval.
2231    pub fn http_max_retry_interval(&self) -> Duration {
2232        Duration::from_secs(self.values.http.max_retry_interval.into())
2233    }
2234
2235    /// Returns the expiry timeout for cached projects.
2236    pub fn project_cache_expiry(&self) -> Duration {
2237        Duration::from_secs(self.values.cache.project_expiry.into())
2238    }
2239
2240    /// Returns `true` if the full project state should be requested from upstream.
2241    pub fn request_full_project_config(&self) -> bool {
2242        self.values.cache.project_request_full_config
2243    }
2244
2245    /// Returns the expiry timeout for cached relay infos (public keys).
2246    pub fn relay_cache_expiry(&self) -> Duration {
2247        Duration::from_secs(self.values.cache.relay_expiry.into())
2248    }
2249
2250    /// Returns the maximum number of buffered envelopes
2251    pub fn envelope_buffer_size(&self) -> usize {
2252        self.values
2253            .cache
2254            .envelope_buffer_size
2255            .try_into()
2256            .unwrap_or(usize::MAX)
2257    }
2258
2259    /// Returns the expiry timeout for cached misses before trying to refetch.
2260    pub fn cache_miss_expiry(&self) -> Duration {
2261        Duration::from_secs(self.values.cache.miss_expiry.into())
2262    }
2263
2264    /// Returns the grace period for project caches.
2265    pub fn project_grace_period(&self) -> Duration {
2266        Duration::from_secs(self.values.cache.project_grace_period.into())
2267    }
2268
2269    /// Returns the refresh interval for a project.
2270    ///
2271    /// Validates the refresh time to be between the grace period and expiry.
2272    pub fn project_refresh_interval(&self) -> Option<Duration> {
2273        self.values
2274            .cache
2275            .project_refresh_interval
2276            .map(Into::into)
2277            .map(Duration::from_secs)
2278    }
2279
2280    /// Returns the duration in which batchable project config queries are
2281    /// collected before sending them in a single request.
2282    pub fn query_batch_interval(&self) -> Duration {
2283        Duration::from_millis(self.values.cache.batch_interval.into())
2284    }
2285
2286    /// Returns the duration in which downstream relays are requested from upstream.
2287    pub fn downstream_relays_batch_interval(&self) -> Duration {
2288        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2289    }
2290
2291    /// Returns the interval in seconds in which local project configurations should be reloaded.
2292    pub fn local_cache_interval(&self) -> Duration {
2293        Duration::from_secs(self.values.cache.file_interval.into())
2294    }
2295
2296    /// Returns the interval in seconds in which fresh global configs should be
2297    /// fetched from  upstream.
2298    pub fn global_config_fetch_interval(&self) -> Duration {
2299        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2300    }
2301
2302    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2303    ///
2304    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2305    /// suffixed with `.{partition_id}`.
2306    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2307        let mut path = self
2308            .values
2309            .spool
2310            .envelopes
2311            .path
2312            .as_ref()
2313            .map(|path| path.to_owned())?;
2314
2315        if partition_id == 0 {
2316            return Some(path);
2317        }
2318
2319        let file_name = path.file_name().and_then(|f| f.to_str())?;
2320        let new_file_name = format!("{file_name}.{partition_id}");
2321        path.set_file_name(new_file_name);
2322
2323        Some(path)
2324    }
2325
2326    /// The maximum size of the buffer, in bytes.
2327    pub fn spool_envelopes_max_disk_size(&self) -> usize {
2328        self.values.spool.envelopes.max_disk_size.as_bytes()
2329    }
2330
2331    /// Number of encoded envelope bytes that need to be accumulated before
2332    /// flushing one batch to disk.
2333    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2334        self.values.spool.envelopes.batch_size_bytes.as_bytes()
2335    }
2336
2337    /// Returns the time after which we drop envelopes as a [`Duration`] object.
2338    pub fn spool_envelopes_max_age(&self) -> Duration {
2339        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2340    }
2341
2342    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
2343    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2344        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2345    }
2346
2347    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
2348    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2349        self.values.spool.envelopes.max_backpressure_memory_percent
2350    }
2351
2352    /// Returns the number of partitions for the buffer.
2353    pub fn spool_partitions(&self) -> NonZeroU8 {
2354        self.values.spool.envelopes.partitions
2355    }
2356
2357    /// Returns the maximum size of an event payload in bytes.
2358    pub fn max_event_size(&self) -> usize {
2359        self.values.limits.max_event_size.as_bytes()
2360    }
2361
2362    /// Returns the maximum size of each attachment.
2363    pub fn max_attachment_size(&self) -> usize {
2364        self.values.limits.max_attachment_size.as_bytes()
2365    }
2366
2367    /// Returns the maximum size of a TUS upload request body.
2368    pub fn max_upload_size(&self) -> usize {
2369        self.values.limits.max_upload_size.as_bytes()
2370    }
2371
2372    /// Returns the maximum combined size of attachments or payloads containing attachments
2373    /// (minidump, unreal, standalone attachments) in bytes.
2374    pub fn max_attachments_size(&self) -> usize {
2375        self.values.limits.max_attachments_size.as_bytes()
2376    }
2377
2378    /// Returns the maximum combined size of client reports in bytes.
2379    pub fn max_client_reports_size(&self) -> usize {
2380        self.values.limits.max_client_reports_size.as_bytes()
2381    }
2382
2383    /// Returns the maximum payload size of a monitor check-in in bytes.
2384    pub fn max_check_in_size(&self) -> usize {
2385        self.values.limits.max_check_in_size.as_bytes()
2386    }
2387
2388    /// Returns the maximum payload size of a log in bytes.
2389    pub fn max_log_size(&self) -> usize {
2390        self.values.limits.max_log_size.as_bytes()
2391    }
2392
2393    /// Returns the maximum payload size of a span in bytes.
2394    pub fn max_span_size(&self) -> usize {
2395        self.values.limits.max_span_size.as_bytes()
2396    }
2397
2398    /// Returns the maximum payload size of an item container in bytes.
2399    pub fn max_container_size(&self) -> usize {
2400        self.values.limits.max_container_size.as_bytes()
2401    }
2402
2403    /// Returns the maximum payload size for logs integration items in bytes.
2404    pub fn max_logs_integration_size(&self) -> usize {
2405        // Not explicitly configured, inherited from the maximum size of a log container.
2406        self.max_container_size()
2407    }
2408
2409    /// Returns the maximum payload size for spans integration items in bytes.
2410    pub fn max_spans_integration_size(&self) -> usize {
2411        // Not explicitly configured, inherited from the maximum size of a span container.
2412        self.max_container_size()
2413    }
2414
2415    /// Returns the maximum size of an envelope payload in bytes.
2416    ///
2417    /// Individual item size limits still apply.
2418    pub fn max_envelope_size(&self) -> usize {
2419        self.values.limits.max_envelope_size.as_bytes()
2420    }
2421
2422    /// Returns the maximum number of sessions per envelope.
2423    pub fn max_session_count(&self) -> usize {
2424        self.values.limits.max_session_count
2425    }
2426
2427    /// Returns the maximum payload size of a statsd metric in bytes.
2428    pub fn max_statsd_size(&self) -> usize {
2429        self.values.limits.max_statsd_size.as_bytes()
2430    }
2431
2432    /// Returns the maximum payload size of metric buckets in bytes.
2433    pub fn max_metric_buckets_size(&self) -> usize {
2434        self.values.limits.max_metric_buckets_size.as_bytes()
2435    }
2436
2437    /// Returns the maximum payload size for general API requests.
2438    pub fn max_api_payload_size(&self) -> usize {
2439        self.values.limits.max_api_payload_size.as_bytes()
2440    }
2441
2442    /// Returns the maximum payload size for file uploads and chunks.
2443    pub fn max_api_file_upload_size(&self) -> usize {
2444        self.values.limits.max_api_file_upload_size.as_bytes()
2445    }
2446
2447    /// Returns the maximum payload size for chunks
2448    pub fn max_api_chunk_upload_size(&self) -> usize {
2449        self.values.limits.max_api_chunk_upload_size.as_bytes()
2450    }
2451
2452    /// Returns the maximum payload size for a profile
2453    pub fn max_profile_size(&self) -> usize {
2454        self.values.limits.max_profile_size.as_bytes()
2455    }
2456
2457    /// Returns the maximum payload size for a trace metric.
2458    pub fn max_trace_metric_size(&self) -> usize {
2459        self.values.limits.max_trace_metric_size.as_bytes()
2460    }
2461
2462    /// Returns the maximum payload size for a compressed replay.
2463    pub fn max_replay_compressed_size(&self) -> usize {
2464        self.values.limits.max_replay_compressed_size.as_bytes()
2465    }
2466
2467    /// Returns the maximum payload size for an uncompressed replay.
2468    pub fn max_replay_uncompressed_size(&self) -> usize {
2469        self.values.limits.max_replay_uncompressed_size.as_bytes()
2470    }
2471
2472    /// Returns the maximum message size for an uncompressed replay.
2473    ///
2474    /// This is greater than max_replay_compressed_size because
2475    /// it can include additional metadata about the replay in
2476    /// addition to the recording.
2477    pub fn max_replay_message_size(&self) -> usize {
2478        self.values.limits.max_replay_message_size.as_bytes()
2479    }
2480
2481    /// Returns the maximum number of active requests
2482    pub fn max_concurrent_requests(&self) -> usize {
2483        self.values.limits.max_concurrent_requests
2484    }
2485
2486    /// Returns the maximum number of active queries
2487    pub fn max_concurrent_queries(&self) -> usize {
2488        self.values.limits.max_concurrent_queries
2489    }
2490
2491    /// Returns the maximum combined size of keys of invalid attributes.
2492    pub fn max_removed_attribute_key_size(&self) -> usize {
2493        self.values.limits.max_removed_attribute_key_size.as_bytes()
2494    }
2495
2496    /// The maximum number of seconds a query is allowed to take across retries.
2497    pub fn query_timeout(&self) -> Duration {
2498        Duration::from_secs(self.values.limits.query_timeout)
2499    }
2500
2501    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
2502    /// signal.
2503    pub fn shutdown_timeout(&self) -> Duration {
2504        Duration::from_secs(self.values.limits.shutdown_timeout)
2505    }
2506
2507    /// Returns the server keep-alive timeout in seconds.
2508    ///
2509    /// By default keep alive is set to a 5 seconds.
2510    pub fn keepalive_timeout(&self) -> Duration {
2511        Duration::from_secs(self.values.limits.keepalive_timeout)
2512    }
2513
2514    /// Returns the server idle timeout in seconds.
2515    pub fn idle_timeout(&self) -> Option<Duration> {
2516        self.values.limits.idle_timeout.map(Duration::from_secs)
2517    }
2518
2519    /// Returns the maximum connections.
2520    pub fn max_connections(&self) -> Option<usize> {
2521        self.values.limits.max_connections
2522    }
2523
2524    /// TCP listen backlog to configure on Relay's listening socket.
2525    pub fn tcp_listen_backlog(&self) -> u32 {
2526        self.values.limits.tcp_listen_backlog
2527    }
2528
2529    /// Returns the number of cores to use for thread pools.
2530    pub fn cpu_concurrency(&self) -> usize {
2531        self.values.limits.max_thread_count
2532    }
2533
2534    /// Returns the number of tasks that can run concurrently in the worker pool.
2535    pub fn pool_concurrency(&self) -> usize {
2536        self.values.limits.max_pool_concurrency
2537    }
2538
2539    /// Returns the maximum size of a project config query.
2540    pub fn query_batch_size(&self) -> usize {
2541        self.values.cache.batch_size
2542    }
2543
2544    /// Get filename for static project config.
2545    pub fn project_configs_path(&self) -> PathBuf {
2546        self.path.join("projects")
2547    }
2548
2549    /// True if the Relay should do processing.
2550    pub fn processing_enabled(&self) -> bool {
2551        self.values.processing.enabled
2552    }
2553
2554    /// Level of normalization for Relay to apply to incoming data.
2555    pub fn normalization_level(&self) -> NormalizationLevel {
2556        self.values.normalization.level
2557    }
2558
2559    /// The path to the GeoIp database required for event processing.
2560    pub fn geoip_path(&self) -> Option<&Path> {
2561        self.values
2562            .geoip
2563            .path
2564            .as_deref()
2565            .or(self.values.processing.geoip_path.as_deref())
2566    }
2567
2568    /// Maximum future timestamp of ingested data.
2569    ///
2570    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
2571    pub fn max_secs_in_future(&self) -> i64 {
2572        self.values.processing.max_secs_in_future.into()
2573    }
2574
2575    /// Maximum age of ingested sessions. Older sessions will be dropped.
2576    pub fn max_session_secs_in_past(&self) -> i64 {
2577        self.values.processing.max_session_secs_in_past.into()
2578    }
2579
2580    /// Configuration name and list of Kafka configuration parameters for a given topic.
2581    pub fn kafka_configs(
2582        &self,
2583        topic: KafkaTopic,
2584    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2585        self.values.processing.topics.get(topic).kafka_configs(
2586            &self.values.processing.kafka_config,
2587            &self.values.processing.secondary_kafka_configs,
2588        )
2589    }
2590
2591    /// Whether to validate the topics against Kafka.
2592    pub fn kafka_validate_topics(&self) -> bool {
2593        self.values.processing.kafka_validate_topics
2594    }
2595
2596    /// All unused but configured topic assignments.
2597    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2598        &self.values.processing.topics.unused
2599    }
2600
2601    /// Configuration of the attachment upload service.
2602    pub fn upload(&self) -> &UploadServiceConfig {
2603        &self.values.processing.upload
2604    }
2605
2606    /// Redis servers to connect to for project configs, cardinality limits,
2607    /// rate limiting, and metrics metadata.
2608    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2609        let redis_configs = self.values.processing.redis.as_ref()?;
2610
2611        Some(build_redis_configs(
2612            redis_configs,
2613            self.cpu_concurrency() as u32,
2614            self.pool_concurrency() as u32,
2615        ))
2616    }
2617
2618    /// Chunk size of attachments in bytes.
2619    pub fn attachment_chunk_size(&self) -> usize {
2620        self.values.processing.attachment_chunk_size.as_bytes()
2621    }
2622
2623    /// Maximum metrics batch size in bytes.
2624    pub fn metrics_max_batch_size_bytes(&self) -> usize {
2625        self.values.aggregator.max_flush_bytes
2626    }
2627
2628    /// Default prefix to use when looking up project configs in Redis. This is only done when
2629    /// Relay is in processing mode.
2630    pub fn projectconfig_cache_prefix(&self) -> &str {
2631        &self.values.processing.projectconfig_cache_prefix
2632    }
2633
2634    /// Maximum rate limit to report to clients in seconds.
2635    pub fn max_rate_limit(&self) -> Option<u64> {
2636        self.values.processing.max_rate_limit.map(u32::into)
2637    }
2638
2639    /// Amount of remaining quota which is cached in memory.
2640    pub fn quota_cache_ratio(&self) -> Option<f32> {
2641        self.values.processing.quota_cache_ratio
2642    }
2643
2644    /// Maximum limit (ratio) for the in memory quota cache.
2645    pub fn quota_cache_max(&self) -> Option<f32> {
2646        self.values.processing.quota_cache_max
2647    }
2648
2649    /// Cache vacuum interval for the cardinality limiter in memory cache.
2650    ///
2651    /// The cache will scan for expired values based on this interval.
2652    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2653        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2654    }
2655
2656    /// Interval to refresh internal health checks.
2657    pub fn health_refresh_interval(&self) -> Duration {
2658        Duration::from_millis(self.values.health.refresh_interval_ms)
2659    }
2660
2661    /// Maximum memory watermark in bytes.
2662    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2663        self.values
2664            .health
2665            .max_memory_bytes
2666            .as_ref()
2667            .map_or(u64::MAX, |b| b.as_bytes() as u64)
2668    }
2669
2670    /// Maximum memory watermark as a percentage of maximum system memory.
2671    pub fn health_max_memory_watermark_percent(&self) -> f32 {
2672        self.values.health.max_memory_percent
2673    }
2674
2675    /// Health check probe timeout.
2676    pub fn health_probe_timeout(&self) -> Duration {
2677        Duration::from_millis(self.values.health.probe_timeout_ms)
2678    }
2679
2680    /// Refresh frequency for polling new memory stats.
2681    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2682        self.values.health.memory_stat_refresh_frequency_ms
2683    }
2684
2685    /// Maximum amount of COGS measurements buffered in memory.
2686    pub fn cogs_max_queue_size(&self) -> u64 {
2687        self.values.cogs.max_queue_size
2688    }
2689
2690    /// Resource ID to use for Relay COGS measurements.
2691    pub fn cogs_relay_resource_id(&self) -> &str {
2692        &self.values.cogs.relay_resource_id
2693    }
2694
2695    /// Returns configuration for the default metrics aggregator.
2696    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2697        &self.values.aggregator
2698    }
2699
2700    /// Returns configuration for non-default metrics aggregator.
2701    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2702        &self.values.secondary_aggregators
2703    }
2704
2705    /// Returns aggregator config for a given metrics namespace.
2706    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2707        for entry in &self.values.secondary_aggregators {
2708            if entry.condition.matches(Some(namespace)) {
2709                return &entry.config;
2710            }
2711        }
2712        &self.values.aggregator
2713    }
2714
2715    /// Return the statically configured Relays.
2716    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2717        &self.values.auth.static_relays
2718    }
2719
2720    /// Returns the max age a signature is considered valid, in seconds.
2721    pub fn signature_max_age(&self) -> Duration {
2722        Duration::from_secs(self.values.auth.signature_max_age)
2723    }
2724
2725    /// Returns `true` if unknown items should be accepted and forwarded.
2726    pub fn accept_unknown_items(&self) -> bool {
2727        let forward = self.values.routing.accept_unknown_items;
2728        forward.unwrap_or_else(|| !self.processing_enabled())
2729    }
2730}
2731
2732impl Default for Config {
2733    fn default() -> Self {
2734        Self {
2735            values: ConfigValues::default(),
2736            credentials: None,
2737            path: PathBuf::new(),
2738        }
2739    }
2740}
2741
#[cfg(test)]
mod tests {
    use super::*;

    /// Regression test for renaming the envelope buffer flags: the legacy
    /// `event_*` keys must still deserialize into the `envelope_*` fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
    event_buffer_size: 1000000
    event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    /// `EmitOutcomes` round-trips through its JSON representation.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];

        for (serialized, expected) in &cases {
            let parsed: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(parsed, *expected);
            assert_eq!(serde_json::to_string(&parsed).unwrap(), *serialized);
        }
    }

    /// Unquoted garbage is not a valid `EmitOutcomes` value.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}