// relay_config/config.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Grace period in seconds before a network outage is declared.
// NOTE(review): presumably the default for `Http::outage_grace_period` — confirm at the use site.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Documentation header written at the top of generated YAML config files
/// (see `ConfigObject::save`).
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// Indicates config related errors.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but uses a binary that was
    /// compiled without the processing feature.
    ProcessingNotAvailable,
}

impl fmt::Display for ConfigErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Map each kind to its static message and emit it in one write.
        let message = match self {
            Self::CouldNotOpenFile => "could not open config file",
            Self::CouldNotWriteFile => "could not write config file",
            Self::BadYaml => "could not parse yaml config file",
            Self::BadJson => "could not parse json config file",
            Self::InvalidValue => "invalid config value",
            Self::ProcessingNotAvailable => {
                "was not compiled with processing, cannot enable processing"
            }
        };
        f.write_str(message)
    }
}
69
/// Defines the source of a config error.
///
/// `Default` is derived with [`ConfigErrorSource::None`] as the default variant,
/// replacing the previous hand-written `impl Default`.
#[derive(Debug, Default)]
enum ConfigErrorSource {
    /// An error occurring independently.
    #[default]
    None,
    /// An error originating from a configuration file.
    File(PathBuf),
    /// An error originating in a field override (an env var, or a CLI parameter).
    FieldOverride(String),
}

impl fmt::Display for ConfigErrorSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `None` renders as the empty string so it composes cleanly into
        // `ConfigError`'s `"{kind}{source}"` output.
        match self {
            ConfigErrorSource::None => Ok(()),
            ConfigErrorSource::File(file_name) => {
                write!(f, " (file {})", file_name.display())
            }
            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
        }
    }
}
98
/// Indicates config related errors.
///
/// Pairs an error kind with an optional origin (file or field override); both
/// are rendered together by the `Display` implementation.
#[derive(Debug)]
pub struct ConfigError {
    /// Where the error originated (none, a config file, or a field override).
    source: ConfigErrorSource,
    /// The category of the error.
    kind: ConfigErrorKind,
}
105
106impl ConfigError {
107    #[inline]
108    fn new(kind: ConfigErrorKind) -> Self {
109        Self {
110            source: ConfigErrorSource::None,
111            kind,
112        }
113    }
114
115    #[inline]
116    fn field(field: &'static str) -> Self {
117        Self {
118            source: ConfigErrorSource::FieldOverride(field.to_owned()),
119            kind: ConfigErrorKind::InvalidValue,
120        }
121    }
122
123    #[inline]
124    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125        Self {
126            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127            kind,
128        }
129    }
130
131    /// Returns the error kind of the error.
132    pub fn kind(&self) -> ConfigErrorKind {
133        self.kind
134    }
135}
136
137impl fmt::Display for ConfigError {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        write!(f, "{}{}", self.kind(), self.source)
140    }
141}
142
143impl Error for ConfigError {}
144
/// On-disk serialization formats supported for config files.
enum ConfigFormat {
    /// YAML, stored with a `.yml` extension.
    Yaml,
    /// JSON, stored with a `.json` extension.
    Json,
}

impl ConfigFormat {
    /// Returns the file extension used for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Yaml => "yml",
            Self::Json => "json",
        }
    }
}
158
/// A configuration object that can be loaded from and saved to a file on disk.
///
/// Implementors declare their serialization format and file basename; path
/// resolution, loading, and saving are provided as default methods.
trait ConfigObject: DeserializeOwned + Serialize {
    /// The format in which to serialize this configuration.
    fn format() -> ConfigFormat;

    /// The basename of the config file.
    fn name() -> &'static str;

    /// The full filename of the config file, including the file extension.
    fn path(base: &Path) -> PathBuf {
        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
    }

    /// Loads the config file from a file within the given directory location.
    fn load(base: &Path) -> anyhow::Result<Self> {
        let path = Self::path(base);

        let f = fs::File::open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
        let f = io::BufReader::new(f);

        // `serde_vars` resolves environment-variable references in the config
        // while deserializing.
        let mut source = serde_vars::EnvSource::default();
        match Self::format() {
            ConfigFormat::Yaml => {
                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
            }
            ConfigFormat::Json => {
                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
            }
        }
    }

    /// Writes the configuration to a file within the given directory location.
    ///
    /// The file is created if missing and truncated otherwise. On unix, the
    /// file is created with `0600` permissions so only the owner can read it.
    fn save(&self, base: &Path) -> anyhow::Result<()> {
        let path = Self::path(base);
        let mut options = fs::OpenOptions::new();
        options.write(true).truncate(true).create(true);

        // Remove all non-user permissions for the newly created file
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            options.mode(0o600);
        }

        let mut f = options
            .open(&path)
            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;

        match Self::format() {
            ConfigFormat::Yaml => {
                // YAML files start with a comment header pointing at the docs.
                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
                serde_yaml::to_writer(&mut f, self)
                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
            }
            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
        }

        // The trailing newline is best-effort; a failure here is ignored.
        f.write_all(b"\n").ok();

        Ok(())
    }
}
224
/// Structure used to hold information about configuration overrides via
/// CLI parameters or environment variables.
///
/// All fields are optional strings; `None` means "no override supplied".
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to (network interface).
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// `"true"` if processing is enabled, `"false"` otherwise.
    pub processing: Option<String>,
    /// The kafka `bootstrap.servers` configuration string.
    pub kafka_url: Option<String>,
    /// The redis server URL.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// Outcome source.
    pub outcome_source: Option<String>,
    /// Shutdown timeout.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
264
/// The relay credentials.
///
/// Persisted as `credentials.json` via the [`ConfigObject`] implementation.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
275
276impl Credentials {
277    /// Generates new random credentials.
278    pub fn generate() -> Self {
279        relay_log::info!("generating new relay credentials");
280        let (sk, pk) = generate_key_pair();
281        Self {
282            secret_key: sk,
283            public_key: pk,
284            id: generate_relay_id(),
285        }
286    }
287
288    /// Serializes this configuration to JSON.
289    pub fn to_json_string(&self) -> anyhow::Result<String> {
290        serde_json::to_string(self)
291            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
292    }
293}
294
impl ConfigObject for Credentials {
    /// Credentials are persisted as JSON.
    fn format() -> ConfigFormat {
        ConfigFormat::Json
    }
    /// Stored as `credentials.json` inside the config directory.
    fn name() -> &'static str {
        "credentials"
    }
}
303
/// Information on a downstream Relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this Relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project configuration.
    ///
    /// Defaults to `false` when omitted during deserialization.
    #[serde(default)]
    pub internal: bool,
}
315
316impl RelayInfo {
317    /// Creates a new RelayInfo
318    pub fn new(public_key: PublicKey) -> Self {
319        Self {
320            public_key,
321            internal: false,
322        }
323    }
324}
325
/// The operation mode of a relay.
///
/// The legacy `static` mode has been removed; deserialization rejects it with
/// a dedicated error message.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    ///
    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
    /// fetch project configurations from the upstream or perform PII stripping. All events are
    /// accepted unless overridden on the file system.
    Proxy,

    /// Project configurations are managed by the upstream.
    ///
    /// Project configurations are always fetched from the upstream, unless they are statically
    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
    /// only possible, if the upstream is Sentry directly, or another managed Relay.
    Managed,
}
344
345impl<'de> Deserialize<'de> for RelayMode {
346    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
347    where
348        D: Deserializer<'de>,
349    {
350        let s = String::deserialize(deserializer)?;
351        match s.as_str() {
352            "proxy" => Ok(RelayMode::Proxy),
353            "managed" => Ok(RelayMode::Managed),
354            "static" => Err(serde::de::Error::custom(
355                "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
356            )),
357            other => Err(serde::de::Error::unknown_variant(
358                other,
359                &["proxy", "managed"],
360            )),
361        }
362    }
363}
364
365impl fmt::Display for RelayMode {
366    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
367        match self {
368            RelayMode::Proxy => write!(f, "proxy"),
369            RelayMode::Managed => write!(f, "managed"),
370        }
371    }
372}
373
/// The instance type of Relay.
///
/// Parsing via [`FromStr`] maps any unrecognized value to
/// [`RelayInstance::Default`] and never fails.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// This Relay is run as a default instance.
    Default,

    /// This Relay is run as a canary instance where experiments can be run.
    Canary,
}
384
385impl RelayInstance {
386    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
387    pub fn is_canary(&self) -> bool {
388        matches!(self, RelayInstance::Canary)
389    }
390}
391
392impl fmt::Display for RelayInstance {
393    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394        match self {
395            RelayInstance::Default => write!(f, "default"),
396            RelayInstance::Canary => write!(f, "canary"),
397        }
398    }
399}
400
401impl FromStr for RelayInstance {
402    type Err = fmt::Error;
403
404    fn from_str(s: &str) -> Result<Self, Self::Err> {
405        match s {
406            "canary" => Ok(RelayInstance::Canary),
407            _ => Ok(RelayInstance::Default),
408        }
409    }
410}
411
/// Error returned when parsing an invalid [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    /// Emits the fixed help message listing the accepted modes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Relay mode must be one of: managed or proxy")
    }
}

impl Error for ParseRelayModeError {}
423
424impl FromStr for RelayMode {
425    type Err = ParseRelayModeError;
426
427    fn from_str(s: &str) -> Result<Self, Self::Err> {
428        match s {
429            "proxy" => Ok(RelayMode::Proxy),
430            "managed" => Ok(RelayMode::Managed),
431            _ => Err(ParseRelayModeError),
432        }
433    }
434}
435
/// Returns `true` if this value is equal to `Default::default()`.
///
/// Used by serde `skip_serializing_if` attributes to omit default values.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
440
/// Checks if we are running in docker.
///
/// Detection is best-effort: either the `/.dockerenv` marker file exists, or
/// the process cgroup mentions `/docker`.
fn is_docker() -> bool {
    let has_dockerenv = fs::metadata("/.dockerenv").is_ok();
    has_dockerenv || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
449
450/// Default value for the "bind" configuration.
451fn default_host() -> IpAddr {
452    if is_docker() {
453        // Docker images rely on this service being exposed
454        "0.0.0.0".parse().unwrap()
455    } else {
456        "127.0.0.1".parse().unwrap()
457    }
458}
459
460/// Controls responses from the readiness health check endpoint based on authentication.
461///
462/// Independent of the the readiness condition, shutdown always switches Relay into unready state.
463#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
464#[serde(rename_all = "lowercase")]
465pub enum ReadinessCondition {
466    /// (default) Relay is ready when authenticated and connected to the upstream.
467    ///
468    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
469    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
470    /// Relay remains ready until authentication fails.
471    ///
472    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
473    /// network outages.
474    Authenticated,
475    /// Relay reports readiness regardless of the authentication and networking state.
476    Always,
477}
478
479impl Default for ReadinessCondition {
480    fn default() -> Self {
481        Self::Authenticated
482    }
483}
484
/// Relay specific configuration values.
///
/// All fields fall back to their `Default` values when omitted (`#[serde(default)]`).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    ///
    /// Defaults to [`RelayMode::Managed`].
    pub mode: RelayMode,
    /// The instance type of this relay.
    ///
    /// Defaults to [`RelayInstance::Default`].
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    ///
    /// Defaults to `https://sentry.io/`.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to (network interface).
    ///
    /// Defaults to `0.0.0.0` when running inside Docker, `127.0.0.1` otherwise.
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server.
    ///
    /// Defaults to `3000`.
    pub port: u16,
    /// Optional port to bind for the encrypted relay HTTPS server.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the PKCS12 archive.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
    ///
    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
    /// Validation of project identifiers can be safely skipped in these cases.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
515
516impl Default for Relay {
517    fn default() -> Self {
518        Relay {
519            mode: RelayMode::Managed,
520            instance: RelayInstance::Default,
521            upstream: "https://sentry.io/".parse().unwrap(),
522            host: default_host(),
523            port: 3000,
524            tls_port: None,
525            tls_identity_path: None,
526            tls_identity_password: None,
527            override_project_ids: false,
528        }
529    }
530}
531
/// Control the metrics.
///
/// All fields fall back to their `Default` values when omitted (`#[serde(default)]`).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Hostname and port of the statsd server.
    ///
    /// Defaults to `None`.
    pub statsd: Option<String>,
    /// Common prefix that should be added to all metrics.
    ///
    /// Defaults to `"sentry.relay"`.
    pub prefix: String,
    /// Default tags to apply to all metrics.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
    pub hostname_tag: Option<String>,
    /// Global sample rate for all emitted metrics between `0.0` and `1.0`.
    ///
    /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
    /// Defaults to `1.0` (100%).
    pub sample_rate: f32,
    /// Interval for periodic metrics emitted from Relay.
    ///
    /// Setting it to `0` seconds disables the periodic metrics.
    /// Defaults to 5 seconds.
    pub periodic_secs: u64,
    /// Whether local metric aggregation using statdsproxy should be enabled.
    ///
    /// Defaults to `true`.
    pub aggregate: bool,
    /// Allows emission of metrics with high cardinality tags.
    ///
    /// High cardinality tags are dynamic values attached to metrics,
    /// such as project IDs. When enabled, these tags will be included
    /// in the emitted metrics. When disabled, the tags will be omitted.
    ///
    /// Defaults to `false`.
    pub allow_high_cardinality_tags: bool,
}
571
572impl Default for Metrics {
573    fn default() -> Self {
574        Metrics {
575            statsd: None,
576            prefix: "sentry.relay".into(),
577            default_tags: BTreeMap::new(),
578            hostname_tag: None,
579            sample_rate: 1.0,
580            periodic_secs: 5,
581            aggregate: true,
582            allow_high_cardinality_tags: false,
583        }
584    }
585}
586
/// Controls various limits.
///
/// All fields fall back to their `Default` values when omitted (`#[serde(default)]`).
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    pub max_concurrent_requests: usize,
    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    ///
    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
    pub max_concurrent_queries: usize,
    /// The maximum payload size for events.
    pub max_event_size: ByteSize,
    /// The maximum size for each attachment.
    pub max_attachment_size: ByteSize,
    /// The maximum combined size for all attachments in an envelope or request.
    pub max_attachments_size: ByteSize,
    /// The maximum combined size for all client reports in an envelope or request.
    pub max_client_reports_size: ByteSize,
    /// The maximum payload size for a monitor check-in.
    pub max_check_in_size: ByteSize,
    /// The maximum payload size for an entire envelope. Individual limits still apply.
    pub max_envelope_size: ByteSize,
    /// The maximum number of session items per envelope.
    pub max_session_count: usize,
    /// The maximum number of standalone span items per envelope.
    pub max_span_count: usize,
    /// The maximum number of log items per envelope.
    pub max_log_count: usize,
    /// The maximum payload size for general API requests.
    pub max_api_payload_size: ByteSize,
    /// The maximum payload size for file uploads and chunks.
    pub max_api_file_upload_size: ByteSize,
    /// The maximum payload size for chunks.
    pub max_api_chunk_upload_size: ByteSize,
    /// The maximum payload size for a profile.
    pub max_profile_size: ByteSize,
    /// The maximum payload size for a log.
    // NOTE(review): the doc previously said "span", but the field name and the
    // adjacent `max_span_size` suggest this limit applies to logs — confirm.
    pub max_log_size: ByteSize,
    /// The maximum payload size for a span.
    pub max_span_size: ByteSize,
    /// The maximum payload size for an item container.
    pub max_container_size: ByteSize,
    /// The maximum payload size for a statsd metric.
    pub max_statsd_size: ByteSize,
    /// The maximum payload size for metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// The maximum payload size for a compressed replay.
    pub max_replay_compressed_size: ByteSize,
    /// The maximum payload size for an uncompressed replay.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// The maximum size for a replay recording Kafka message.
    pub max_replay_message_size: ByteSize,
    /// The maximum number of threads to spawn for CPU and web work, each.
    ///
    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
    /// the number of logical CPU cores on the host.
    pub max_thread_count: usize,
    /// Controls the maximum concurrency of each worker thread.
    ///
    /// Increasing the concurrency can lead to a better utilization of worker threads by
    /// increasing the amount of I/O done concurrently.
    ///
    /// Currently has no effect; defaults to `1`.
    pub max_pool_concurrency: usize,
    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
    /// have lower timeouts. Defaults to 30 seconds.
    pub query_timeout: u64,
    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub shutdown_timeout: u64,
    /// Server keep-alive timeout in seconds.
    ///
    /// By default, keep-alive is set to 5 seconds.
    pub keepalive_timeout: u64,
    /// Server idle timeout in seconds.
    ///
    /// The idle timeout limits the amount of time a connection is kept open without activity.
    /// Setting this too short may abort connections before Relay is able to send a response.
    ///
    /// By default there is no idle timeout.
    pub idle_timeout: Option<u64>,
    /// Sets the maximum number of concurrent connections.
    ///
    /// Upon reaching the limit, the server will stop accepting connections.
    ///
    /// By default there is no limit.
    pub max_connections: Option<usize>,
    /// The TCP listen backlog.
    ///
    /// Configures the TCP listen backlog for the listening socket of Relay.
    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
    /// for a more detailed description of the listen backlog.
    ///
    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
    pub tcp_listen_backlog: u32,
}
686
impl Default for Limits {
    /// Default limits; semantics and units are documented on the `Limits` fields.
    fn default() -> Self {
        Limits {
            max_concurrent_requests: 100,
            max_concurrent_queries: 5,
            max_event_size: ByteSize::mebibytes(1),
            max_attachment_size: ByteSize::mebibytes(100),
            max_attachments_size: ByteSize::mebibytes(100),
            max_client_reports_size: ByteSize::kibibytes(4),
            max_check_in_size: ByteSize::kibibytes(100),
            max_envelope_size: ByteSize::mebibytes(100),
            max_session_count: 100,
            max_span_count: 1000,
            max_log_count: 1000,
            max_api_payload_size: ByteSize::mebibytes(20),
            max_api_file_upload_size: ByteSize::mebibytes(40),
            max_api_chunk_upload_size: ByteSize::mebibytes(100),
            max_profile_size: ByteSize::mebibytes(50),
            max_log_size: ByteSize::mebibytes(1),
            max_span_size: ByteSize::mebibytes(1),
            max_container_size: ByteSize::mebibytes(3),
            max_statsd_size: ByteSize::mebibytes(1),
            max_metric_buckets_size: ByteSize::mebibytes(1),
            max_replay_compressed_size: ByteSize::mebibytes(10),
            max_replay_uncompressed_size: ByteSize::mebibytes(100),
            max_replay_message_size: ByteSize::mebibytes(15),
            // One worker per logical CPU core, per pool.
            max_thread_count: num_cpus::get(),
            max_pool_concurrency: 1,
            // Timeouts below are in seconds.
            query_timeout: 30,
            shutdown_timeout: 10,
            keepalive_timeout: 5,
            idle_timeout: None,
            max_connections: None,
            tcp_listen_backlog: 1024,
        }
    }
}
724
/// Controls traffic steering.
///
/// All fields fall back to their `Default` values when omitted (`#[serde(default)]`).
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Accept and forward unknown Envelope items to the upstream.
    ///
    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
    /// processed as usual.
    ///
    /// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
    /// is disabled by default since the item cannot be handled.
    pub accept_unknown_items: Option<bool>,
}
740
/// Http content encoding for both incoming and outgoing web requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// Identity function with no compression.
    ///
    /// This is the default encoding and does not require the presence of the `content-encoding`
    /// HTTP header.
    #[default]
    Identity,
    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
    ///
    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
    Deflate,
    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
    /// (LZ77), with a 32-bit CRC.
    ///
    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
    /// compatibility purposes.
    Gzip,
    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
    Br,
    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
    Zstd,
}
769
770impl HttpEncoding {
771    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
772    pub fn parse(str: &str) -> Self {
773        let str = str.trim();
774        if str.eq_ignore_ascii_case("zstd") {
775            Self::Zstd
776        } else if str.eq_ignore_ascii_case("br") {
777            Self::Br
778        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
779            Self::Gzip
780        } else if str.eq_ignore_ascii_case("deflate") {
781            Self::Deflate
782        } else {
783            Self::Identity
784        }
785    }
786
787    /// Returns the value for the `content-encoding` HTTP header.
788    ///
789    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
790    pub fn name(&self) -> Option<&'static str> {
791        match self {
792            Self::Identity => None,
793            Self::Deflate => Some("deflate"),
794            Self::Gzip => Some("gzip"),
795            Self::Br => Some("br"),
796            Self::Zstd => Some("zstd"),
797        }
798    }
799}
800
/// Controls upstream communication and authentication over HTTP.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    ///
    /// This timeout covers the time from sending the request until receiving response headers.
    /// Neither the connection process and handshakes, nor reading the response body is covered in
    /// this timeout.
    pub timeout: u32,
    /// Timeout for establishing connections with the upstream in seconds.
    ///
    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
    ///
    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
    /// back into startup mode and tries to establish a connection. During this time, incoming
    /// envelopes will be buffered.
    ///
    /// Defaults to `600` (10 minutes).
    pub auth_interval: Option<u64>,
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage in seconds.
    ///
    /// During a network outage relay will try to reconnect and will buffer all upstream messages
    /// until it manages to reconnect.
    pub outage_grace_period: u64,
    /// The time Relay waits before retrying an upstream request, in seconds.
    ///
    /// This time is only used before going into a network outage mode.
    pub retry_delay: u64,
    /// The interval in seconds for continued failed project fetches at which Relay will error.
    ///
    /// A successful fetch resets this interval. Relay does nothing during long
    /// times without emitting requests.
    pub project_failure_interval: u64,
    /// Content encoding to apply to upstream store requests.
    ///
    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
    ///
    /// This setting applies to all store requests of SDK data, including events, transactions,
    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
    ///
    /// Available options are:
    ///
    ///  - `identity`: Disables compression.
    ///  - `deflate`: Compression using a zlib header with deflate encoding.
    ///  - `gzip`: Compression using gzip.
    ///  - `br`: Compression using the brotli algorithm.
    ///  - `zstd` (default): Compression using the zstd algorithm.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint.
    ///
    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
    /// DSN, this submits metrics to the global endpoint with Relay authentication.
    ///
    /// This option does not have any effect on processing mode.
    pub global_metrics: bool,
}
867
868impl Default for Http {
869    fn default() -> Self {
870        Http {
871            timeout: 5,
872            connection_timeout: 3,
873            max_retry_interval: 60, // 1 minute
874            host_header: None,
875            auth_interval: Some(600), // 10 minutes
876            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
877            retry_delay: default_retry_delay(),
878            project_failure_interval: default_project_failure_interval(),
879            encoding: HttpEncoding::Zstd,
880            global_metrics: false,
881        }
882    }
883}
884
/// Default delay before retrying an unavailable upstream request, in seconds (1s).
fn default_retry_delay() -> u64 {
    1
}
889
/// Default interval after which continued project fetch failures are reported, in seconds (90s).
fn default_project_failure_interval() -> u64 {
    90
}
894
/// Default for the maximum on-disk spool size, 500 MiB.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}
899
/// Default number of encoded envelope bytes to cache before writing to disk, 10 KiB.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}
904
/// Default maximum time an envelope may spend in the spool before it is dropped, 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    24 * 60 * 60
}
908
/// Default refresh frequency for disk usage monitoring, in milliseconds (100ms).
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}
913
/// Default bound on the buffer's output queue used for backpressure handling (500 envelopes).
fn spool_max_backpressure_envelopes() -> usize {
    500
}
918
/// Default relative memory threshold above which unspooling stops (90%).
fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}
923
/// Default number of partitions for the envelope buffer (1).
fn spool_envelopes_partitions() -> NonZeroU8 {
    // `NonZeroU8::MIN` is 1; avoids the runtime `unwrap` of `NonZeroU8::new(1)`.
    NonZeroU8::MIN
}
928
/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path of the SQLite database file(s) which persist the data.
    ///
    /// Based on the number of partitions, more database files will be created within the same path.
    ///
    /// If not set, the envelopes will be buffered in memory.
    pub path: Option<PathBuf>,
    /// The maximum size of the buffer to keep, in bytes.
    ///
    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
    ///
    /// Defaults to 500MB.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of the batch of compressed envelopes that are spooled to disk at once.
    ///
    /// Note that this is the size after which spooling will be triggered but it does not guarantee
    /// that exactly this size will be spooled, it can be greater or equal.
    ///
    /// Defaults to 10 KiB.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time between receiving the envelope and processing it.
    ///
    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
    /// they are dropped.
    ///
    /// Defaults to 24h.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
    /// internal page stats.
    ///
    /// Defaults to 100ms.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// The maximum number of envelopes that the envelope buffer can push to its output queue.
    ///
    /// Defaults to 500.
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
    ///
    /// Only applies when [`Self::path`] is set.
    ///
    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
    ///
    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
    ///
    /// This scenario arises when the buffer stops spooling due to reaching the
    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
    /// continues to grow, and the system becomes deadlocked.
    ///
    /// ### Example
    /// Suppose the system has 1GB of available memory and is configured to spool only after
    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
    ///
    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
    /// no data will be offloaded. Memory usage keeps increasing until it hits the
    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
    ///
    /// - No more envelopes are accepted.
    /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
    ///   will not happen.
    /// - A deadlock occurs, with the system unable to recover without manual intervention.
    ///
    /// Defaults to 90% (5% less than max memory).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of partitions of the buffer.
    ///
    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
    /// and other resources.
    ///
    /// Defaults to 1.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1012
1013impl Default for EnvelopeSpool {
1014    fn default() -> Self {
1015        Self {
1016            path: None,
1017            max_disk_size: spool_envelopes_max_disk_size(),
1018            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1019            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1020            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1021            max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1022            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1023            partitions: spool_envelopes_partitions(),
1024        }
1025    }
1026}
1027
/// Top-level persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1035
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// The full project state will be requested by this Relay if set to `true`.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    pub project_expiry: u32,
    /// Continue using project state this many seconds after cache expiry while a new state is
    /// being fetched. This is added on top of `project_expiry`.
    ///
    /// Default is 2 minutes.
    pub project_grace_period: u32,
    /// Refresh a cached project state after the specified number of seconds.
    ///
    /// The time must be between expiry time and the grace period.
    ///
    /// By default there are no refreshes enabled.
    pub project_refresh_interval: Option<u32>,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    pub relay_expiry: u32,
    /// Unused cache timeout for envelopes.
    ///
    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
    /// at a later point.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// The maximum amount of envelopes to queue before dropping them.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries.
    pub miss_expiry: u32,
    /// The buffer timeout for batched project config queries before sending them upstream in ms.
    pub batch_interval: u32,
    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
    pub downstream_relays_batch_interval: u32,
    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
    ///
    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
    pub batch_size: usize,
    /// Interval for watching local cache override files in seconds.
    pub file_interval: u32,
    /// Interval for fetching new global configs from the upstream, in seconds.
    pub global_config_fetch_interval: u32,
}
1082
1083impl Default for Cache {
1084    fn default() -> Self {
1085        Cache {
1086            project_request_full_config: false,
1087            project_expiry: 300,       // 5 minutes
1088            project_grace_period: 120, // 2 minutes
1089            project_refresh_interval: None,
1090            relay_expiry: 3600,   // 1 hour
1091            envelope_expiry: 600, // 10 minutes
1092            envelope_buffer_size: 1000,
1093            miss_expiry: 60,                       // 1 minute
1094            batch_interval: 100,                   // 100ms
1095            downstream_relays_batch_interval: 100, // 100ms
1096            batch_size: 500,
1097            file_interval: 10,                // 10 seconds
1098            global_config_fetch_interval: 10, // 10 seconds
1099        }
1100    }
1101}
1102
/// Default maximum future timestamp of ingested events, 1 minute (in seconds).
fn default_max_secs_in_future() -> u32 {
    60
}
1106
/// Default maximum age of ingested sessions, 5 days (in seconds).
fn default_max_session_secs_in_past() -> u32 {
    5 * 24 * 3600
}
1110
/// Default maximum chunk size of attachments for Kafka, 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1114
/// Default Redis key prefix used when looking up project configs.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}
1118
/// Default maximum rate limit reported to clients, 5 minutes (in seconds).
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1123
/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Defaults to `false`.
    pub enabled: bool,
    /// Path to the GeoIP database file.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions. Older sessions will be dropped.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configurations.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional kafka producer configurations.
    ///
    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
    /// kafka config can be referenced in `topics:` like this:
    ///
    /// ```yaml
    /// secondary_kafka_configs:
    ///   mycustomcluster:
    ///     - name: 'bootstrap.servers'
    ///       value: 'sentry_kafka_metrics:9093'
    ///
    /// topics:
    ///   transactions: ingest-transactions
    ///   metrics:
    ///     name: ingest-metrics
    ///     config: mycustomcluster
    /// ```
    ///
    /// Then metrics will be produced to an entirely different Kafka cluster.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic names.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients in seconds.
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}
1180
1181impl Default for Processing {
1182    /// Constructs a disabled processing configuration.
1183    fn default() -> Self {
1184        Self {
1185            enabled: false,
1186            geoip_path: None,
1187            max_secs_in_future: default_max_secs_in_future(),
1188            max_session_secs_in_past: default_max_session_secs_in_past(),
1189            kafka_config: Vec::new(),
1190            secondary_kafka_configs: BTreeMap::new(),
1191            topics: TopicAssignments::default(),
1192            kafka_validate_topics: false,
1193            redis: None,
1194            attachment_chunk_size: default_chunk_size(),
1195            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1196            max_rate_limit: default_max_rate_limit(),
1197        }
1198    }
1199}
1200
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    ///
    /// See [`NormalizationLevel`] for the available levels.
    #[serde(default)]
    pub level: NormalizationLevel,
}
1209
/// Configures the level of normalization that this Relay applies to incoming data.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs normalization, excluding steps that break future compatibility.
    ///
    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
    #[default]
    Default,
    /// Run full normalization.
    ///
    /// It includes steps that break future compatibility and should only run in
    /// the last layer of relays.
    Full,
}
1225
/// Configuration values for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
    pub bucket_interval: u64,
    /// Defines how often all buckets are flushed, in seconds.
    pub flush_interval: u64,
}
1235
1236impl Default for OutcomeAggregatorConfig {
1237    fn default() -> Self {
1238        Self {
1239            bucket_interval: 60,
1240            flush_interval: 120,
1241        }
1242    }
1243}
1244
/// Determines how to emit outcomes.
///
/// For compatibility reasons, this deserializes from `true`, `false`, or the string
/// `"as_client_reports"` (see the `Serialize`/`Deserialize` implementations below).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit outcomes as regular outcomes.
    AsOutcomes,
}
1257
1258impl EmitOutcomes {
1259    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1260    pub fn any(&self) -> bool {
1261        !matches!(self, EmitOutcomes::None)
1262    }
1263}
1264
1265impl Serialize for EmitOutcomes {
1266    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1267    where
1268        S: Serializer,
1269    {
1270        // For compatibility, serialize None and AsOutcomes as booleans.
1271        match self {
1272            Self::None => serializer.serialize_bool(false),
1273            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1274            Self::AsOutcomes => serializer.serialize_bool(true),
1275        }
1276    }
1277}
1278
1279struct EmitOutcomesVisitor;
1280
1281impl Visitor<'_> for EmitOutcomesVisitor {
1282    type Value = EmitOutcomes;
1283
1284    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1285        formatter.write_str("true, false, or 'as_client_reports'")
1286    }
1287
1288    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1289    where
1290        E: serde::de::Error,
1291    {
1292        Ok(if v {
1293            EmitOutcomes::AsOutcomes
1294        } else {
1295            EmitOutcomes::None
1296        })
1297    }
1298
1299    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1300    where
1301        E: serde::de::Error,
1302    {
1303        if v == "as_client_reports" {
1304            Ok(EmitOutcomes::AsClientReports)
1305        } else {
1306            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1307        }
1308    }
1309}
1310
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor dispatch on the self-describing input,
        // accepting both the boolean and the string representation.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1319
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    /// Processing relays always emit outcomes (for backwards compatibility).
    /// Can take the following values: false, "as_client_reports", true
    pub emit_outcomes: EmitOutcomes,
    /// Controls whether client reported outcomes should be emitted.
    pub emit_client_outcomes: bool,
    /// The maximum number of outcomes that are batched before being sent
    /// via http to the upstream (only applies to non processing relays).
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched
    /// via http to the upstream (only applies to non processing relays).
    pub batch_interval: u64,
    /// Defines the source string registered in the outcomes originating from
    /// this Relay (typically something like the region or the layer).
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1342
1343impl Default for Outcomes {
1344    fn default() -> Self {
1345        Outcomes {
1346            emit_outcomes: EmitOutcomes::AsClientReports,
1347            emit_client_outcomes: true,
1348            batch_size: 1000,
1349            batch_interval: 500,
1350            source: None,
1351            aggregator: OutcomeAggregatorConfig::default(),
1352        }
1353    }
1354}
1355
/// Minimal version of a config, containing only the `relay` section, for dumping out.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1362
1363impl MinimalConfig {
1364    /// Saves the config in the given config folder as config.yml
1365    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1366        let path = p.as_ref();
1367        if fs::metadata(path).is_err() {
1368            fs::create_dir_all(path)
1369                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1370        }
1371        self.save(path)
1372    }
1373}
1374
impl ConfigObject for MinimalConfig {
    fn format() -> ConfigFormat {
        // Minimal configs are always written as YAML.
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        // File stem used for the config file in the config folder.
        "config"
    }
}
1384
/// Alternative serialization of RelayInfo for config file using snake case.
mod config_relay_info {
    use serde::ser::SerializeMap;

    use super::*;

    // Uses snake_case as opposed to camelCase.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct RelayInfoConfig {
        public_key: PublicKey,
        #[serde(default)]
        internal: bool,
    }

    impl From<RelayInfoConfig> for RelayInfo {
        fn from(v: RelayInfoConfig) -> Self {
            RelayInfo {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    impl From<RelayInfo> for RelayInfoConfig {
        fn from(v: RelayInfo) -> Self {
            RelayInfoConfig {
                public_key: v.public_key,
                internal: v.internal,
            }
        }
    }

    /// Deserializes the static relay map from its snake_case config representation.
    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
    }

    /// Serializes the static relay map into its snake_case config representation.
    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = ser.serialize_map(Some(elm.len()))?;

        // Convert each entry to the snake_case config shape before writing.
        for (k, v) in elm {
            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
        }

        map.end()
    }
}
1438
/// Authentication options for this Relay and statically configured downstream relays.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls responses from the readiness health check endpoint based on authentication.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,

    /// How old a signature can be before it is considered invalid, in seconds.
    ///
    /// Defaults to 5 minutes.
    #[serde(default = "default_max_age")]
    pub signature_max_age: u64,
}
1456
/// Default maximum signature age, 5 minutes (in seconds).
fn default_max_age() -> u64 {
    300
}
1460
/// GeoIp database configuration options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to the GeoIP database file.
    pub path: Option<PathBuf>,
}
1467
/// Cardinality Limiter configuration options.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Cache vacuum interval in seconds for the in-memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    ///
    /// Defaults to 180 seconds, 3 minutes.
    pub cache_vacuum_interval: u64,
}
1479
1480impl Default for CardinalityLimiter {
1481    fn default() -> Self {
1482        Self {
1483            cache_vacuum_interval: 180,
1484        }
1485    }
1486}
1487
/// Settings to control Relay's health checks.
///
/// After breaching one of the configured thresholds, Relay will
/// return an `unhealthy` status from its health endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval to refresh internal health checks.
    ///
    /// Shorter intervals will decrease the time it takes the health check endpoint to report
    /// issues, but can also increase sporadic unhealthy responses.
    ///
    /// Defaults to `3000` (3 seconds).
    pub refresh_interval_ms: u64,
    /// Maximum memory watermark in bytes.
    ///
    /// By default, there is no absolute limit set and the watermark
    /// is only controlled by setting [`Self::max_memory_percent`].
    pub max_memory_bytes: Option<ByteSize>,
    /// Maximum memory watermark as a percentage of maximum system memory.
    ///
    /// Defaults to `0.95` (95%).
    pub max_memory_percent: f32,
    /// Health check probe timeout in milliseconds.
    ///
    /// Any probe exceeding the timeout will be considered failed.
    /// This limits the max execution time of Relay health checks.
    ///
    /// Defaults to 900 milliseconds.
    pub probe_timeout_ms: u64,
    /// The refresh frequency of memory stats which are used to poll memory
    /// usage of Relay.
    ///
    /// The implementation of memory stats guarantees that the refresh will happen at
    /// least every `x` ms since memory readings are lazy and are updated only if needed.
    pub memory_stat_refresh_frequency_ms: u64,
}
1525
1526impl Default for Health {
1527    fn default() -> Self {
1528        Self {
1529            refresh_interval_ms: 3000,
1530            max_memory_bytes: None,
1531            max_memory_percent: 0.95,
1532            probe_timeout_ms: 900,
1533            memory_stat_refresh_frequency_ms: 100,
1534        }
1535    }
1536}
1537
/// COGS configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum amount of COGS measurements allowed to backlog.
    ///
    /// Any additional COGS measurements recorded will be dropped.
    ///
    /// Defaults to `10_000`.
    pub max_queue_size: u64,
    /// Relay COGS resource id.
    ///
    /// All Relay related COGS measurements are emitted with this resource id.
    ///
    /// Defaults to `relay_service`.
    pub relay_resource_id: String,
}
1555
1556impl Default for Cogs {
1557    fn default() -> Self {
1558        Self {
1559            max_queue_size: 10_000,
1560            relay_resource_id: "relay_service".to_owned(),
1561        }
1562    }
1563}
1564
/// All configuration sections of a Relay config file.
///
/// Every section is optional in the file and falls back to its `Default`
/// implementation via `#[serde(default)]`.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1606
impl ConfigObject for ConfigValues {
    fn format() -> ConfigFormat {
        // The main config file is stored as YAML.
        ConfigFormat::Yaml
    }

    fn name() -> &'static str {
        // File stem used for the config file in the config folder.
        "config"
    }
}
1616
/// The central Relay configuration.
///
/// Combines the parsed configuration values, the optional credentials loaded
/// alongside them, and the path of the config folder they came from.
pub struct Config {
    // Parsed configuration values from the config file.
    values: ConfigValues,
    // Relay credentials, present only if a credentials file exists.
    credentials: Option<Credentials>,
    // Absolute-if-possible path of the config folder.
    path: PathBuf,
}
1623
impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): `credentials` is deliberately omitted from the output,
        // presumably to keep the secret key out of logs — confirm before changing.
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
1632
1633impl Config {
1634    /// Loads a config from a given config folder.
1635    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1636        let path = env::current_dir()
1637            .map(|x| x.join(path.as_ref()))
1638            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1639
1640        let config = Config {
1641            values: ConfigValues::load(&path)?,
1642            credentials: if Credentials::path(&path).exists() {
1643                Some(Credentials::load(&path)?)
1644            } else {
1645                None
1646            },
1647            path: path.clone(),
1648        };
1649
1650        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1651            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1652        }
1653
1654        Ok(config)
1655    }
1656
    /// Creates a config from a JSON value.
    ///
    /// This is mostly useful for tests.
    ///
    /// The resulting config carries no credentials and an empty path.
    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
        Ok(Config {
            values: serde_json::from_value(value)
                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
            credentials: None,
            path: PathBuf::new(),
        })
    }
1668
    /// Override configuration with values coming from other sources (e.g. env variables or
    /// command line parameters)
    ///
    /// All override values arrive as strings and are parsed into their typed
    /// counterparts here; a parse failure surfaces as a [`ConfigError`] naming
    /// the offending field. Fields that are `None` in `overrides` leave the
    /// existing configuration untouched.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        // Relay section overrides (mode, instance, upstream, listen address).
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            // NOTE(review): the error reports field "deployment" while the
            // override is named `instance` — confirm which name the CLI/env
            // exposes to users.
            relay.instance = deployment
                .parse::<RelayInstance>()
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // An explicit upstream URL takes precedence over an upstream DSN.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        // Processing section overrides (enabled flag, Redis, Kafka).
        let processing = &mut self.values.processing;
        if let Some(enabled) = overrides.processing {
            // Accepts boolean-ish strings; the empty string counts as false.
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        if let Some(redis) = overrides.redis_url {
            // A URL override always maps to a single unified Redis instance.
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        if let Some(kafka_url) = overrides.kafka_url {
            // Update the existing `bootstrap.servers` entry in place, or
            // append one if it was never configured.
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        // Credentials overrides: parse all three pieces up front so an
        // invalid value fails before any state is mutated.
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // Existing credentials: each provided piece overrides its entry
            // individually.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No existing credentials: either all three pieces are provided
            // (create full credentials) or none (leave unset); a partial set
            // is an error.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                    // nothing provided, we'll just leave the credentials None, maybe we
                    // don't need them in the current command or we'll override them later
                }
                _ => {
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // NOTE(review): unlike the fields above, a non-numeric
        // `shutdown_timeout` is silently ignored here rather than reported —
        // confirm this is intended.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout
            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
        {
            limits.shutdown_timeout = shutdown_timeout;
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
1822
1823    /// Checks if the config is already initialized.
1824    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1825        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1826    }
1827
    /// Returns the path of the config directory this config was loaded from.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Dumps out a YAML string of the values.
    ///
    /// Only the config values are serialized; credentials are not included.
    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
        serde_yaml::to_string(&self.values)
            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
    }
1838
    /// Regenerates the relay credentials.
    ///
    /// Newly generated credentials replace any existing ones on this config.
    /// If `save` is `true`, the new credentials are written to the
    /// credentials file in the config directory before being applied.
    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
        let creds = Credentials::generate();
        if save {
            creds.save(&self.path)?;
        }
        self.credentials = Some(creds);
        Ok(())
    }
1850
    /// Returns the current credentials, if any are set.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
1855
1856    /// Set new credentials.
1857    ///
1858    /// This also writes the credentials back to the file.
1859    pub fn replace_credentials(
1860        &mut self,
1861        credentials: Option<Credentials>,
1862    ) -> anyhow::Result<bool> {
1863        if self.credentials == credentials {
1864            return Ok(false);
1865        }
1866
1867        match credentials {
1868            Some(ref creds) => {
1869                creds.save(&self.path)?;
1870            }
1871            None => {
1872                let path = Credentials::path(&self.path);
1873                if fs::metadata(&path).is_ok() {
1874                    fs::remove_file(&path).with_context(|| {
1875                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1876                    })?;
1877                }
1878            }
1879        }
1880
1881        self.credentials = credentials;
1882        Ok(true)
1883    }
1884
    /// Returns `true` if the config is ready to use, i.e. credentials are present.
    pub fn has_credentials(&self) -> bool {
        self.credentials.is_some()
    }

    /// Returns the secret key if credentials are set.
    pub fn secret_key(&self) -> Option<&SecretKey> {
        self.credentials.as_ref().map(|x| &x.secret_key)
    }

    /// Returns the public key if credentials are set.
    pub fn public_key(&self) -> Option<&PublicKey> {
        self.credentials.as_ref().map(|x| &x.public_key)
    }

    /// Returns the relay ID if credentials are set.
    pub fn relay_id(&self) -> Option<&RelayId> {
        self.credentials.as_ref().map(|x| &x.id)
    }

    /// Returns the relay mode.
    pub fn relay_mode(&self) -> RelayMode {
        self.values.relay.mode
    }

    /// Returns the instance type of relay.
    pub fn relay_instance(&self) -> RelayInstance {
        self.values.relay.instance
    }

    /// Returns the upstream target as descriptor.
    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
        &self.values.relay.upstream
    }

    /// Returns the custom HTTP "Host" header, if configured.
    pub fn http_host_header(&self) -> Option<&str> {
        self.values.http.host_header.as_deref()
    }

    /// Returns the plain-HTTP listen address (host and port).
    pub fn listen_addr(&self) -> SocketAddr {
        (self.values.relay.host, self.values.relay.port).into()
    }
1929
1930    /// Returns the TLS listen address.
1931    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1932        if self.values.relay.tls_identity_path.is_some() {
1933            let port = self.values.relay.tls_port.unwrap_or(3443);
1934            Some((self.values.relay.host, port).into())
1935        } else {
1936            None
1937        }
1938    }
1939
    /// Returns the path to the TLS identity bundle.
    pub fn tls_identity_path(&self) -> Option<&Path> {
        self.values.relay.tls_identity_path.as_deref()
    }

    /// Returns the password for the TLS identity bundle.
    pub fn tls_identity_password(&self) -> Option<&str> {
        self.values.relay.tls_identity_password.as_deref()
    }

    /// Returns `true` when project IDs should be overridden rather than validated.
    ///
    /// Defaults to `false`, which requires project ID validation.
    pub fn override_project_ids(&self) -> bool {
        self.values.relay.override_project_ids
    }

    /// Returns `true` if Relay requires authentication for readiness.
    ///
    /// Only managed Relays with the `Authenticated` readiness condition
    /// require authentication. See [`ReadinessCondition`] for more information.
    pub fn requires_auth(&self) -> bool {
        match self.values.auth.ready {
            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
            ReadinessCondition::Always => false,
        }
    }

    /// Returns the interval at which Relay should try to re-authenticate with the upstream.
    ///
    /// Always disabled in processing mode. An unset interval or an explicit
    /// `0` also disables re-authentication.
    pub fn http_auth_interval(&self) -> Option<Duration> {
        if self.processing_enabled() {
            return None;
        }

        match self.values.http.auth_interval {
            None | Some(0) => None,
            Some(secs) => Some(Duration::from_secs(secs)),
        }
    }
1980
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage.
    pub fn http_outage_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.http.outage_grace_period)
    }

    /// Time Relay waits before retrying an upstream request.
    ///
    /// Before going into a network outage, Relay may fail to make upstream
    /// requests. This is the time Relay waits before retrying the same request.
    pub fn http_retry_delay(&self) -> Duration {
        Duration::from_secs(self.values.http.retry_delay)
    }

    /// Time of continued project request failures before Relay emits an error.
    pub fn http_project_failure_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.project_failure_interval)
    }

    /// Content encoding of upstream requests.
    pub fn http_encoding(&self) -> HttpEncoding {
        self.values.http.encoding
    }

    /// Returns whether metrics should be sent globally through a shared endpoint.
    pub fn http_global_metrics(&self) -> bool {
        self.values.http.global_metrics
    }

    /// Returns whether this Relay should emit outcomes.
    ///
    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
    /// in processing mode (processing mode always emits outcomes).
    pub fn emit_outcomes(&self) -> EmitOutcomes {
        if self.processing_enabled() {
            return EmitOutcomes::AsOutcomes;
        }
        self.values.outcomes.emit_outcomes
    }

    /// Returns whether this Relay should emit client outcomes.
    ///
    /// Relays that do not emit client outcomes will forward client-received outcomes
    /// directly to the next relay in the chain as a client report envelope. This is only done
    /// if this relay emits outcomes at all. A relay that will not emit outcomes
    /// will forward the envelope unchanged.
    ///
    /// This flag can be explicitly disabled on processing relays as well to prevent the
    /// emitting of client outcomes to the kafka topic.
    pub fn emit_client_outcomes(&self) -> bool {
        self.values.outcomes.emit_client_outcomes
    }

    /// Returns the maximum number of outcomes that are batched before being sent.
    pub fn outcome_batch_size(&self) -> usize {
        self.values.outcomes.batch_size
    }

    /// Returns the maximum interval (in milliseconds of config value) that an outcome may be batched.
    pub fn outcome_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.outcomes.batch_interval)
    }

    /// The originating source of the outcome.
    pub fn outcome_source(&self) -> Option<&str> {
        self.values.outcomes.source.as_deref()
    }

    /// Returns the outcome aggregator configuration, including the width of the
    /// buckets into which outcomes are aggregated.
    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
        &self.values.outcomes.aggregator
    }

    /// Returns logging configuration.
    pub fn logging(&self) -> &relay_log::LogConfig {
        &self.values.logging
    }

    /// Returns the Sentry (internal error reporting) configuration.
    pub fn sentry(&self) -> &relay_log::SentryConfig {
        &self.values.sentry
    }
2063
2064    /// Returns the socket addresses for statsd.
2065    ///
2066    /// If stats is disabled an empty vector is returned.
2067    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2068        if let Some(ref addr) = self.values.metrics.statsd {
2069            let addrs = addr
2070                .as_str()
2071                .to_socket_addrs()
2072                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2073                .collect();
2074            Ok(addrs)
2075        } else {
2076            Ok(vec![])
2077        }
2078    }
2079
    /// Return the prefix for statsd metrics.
    pub fn metrics_prefix(&self) -> &str {
        &self.values.metrics.prefix
    }

    /// Returns the default tags for statsd metrics.
    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
        &self.values.metrics.default_tags
    }

    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
    pub fn metrics_hostname_tag(&self) -> Option<&str> {
        self.values.metrics.hostname_tag.as_deref()
    }

    /// Returns the global sample rate for all metrics.
    pub fn metrics_sample_rate(&self) -> f32 {
        self.values.metrics.sample_rate
    }

    /// Returns whether local metric aggregation should be enabled.
    pub fn metrics_aggregate(&self) -> bool {
        self.values.metrics.aggregate
    }

    /// Returns whether high cardinality tags should be removed before sending metrics.
    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
        self.values.metrics.allow_high_cardinality_tags
    }

    /// Returns the interval for periodic metrics emitted from Relay.
    ///
    /// `None` if periodic metrics are disabled (interval of `0`).
    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
        match self.values.metrics.periodic_secs {
            0 => None,
            secs => Some(Duration::from_secs(secs)),
        }
    }

    /// Returns the default timeout for all upstream HTTP requests.
    pub fn http_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.timeout.into())
    }

    /// Returns the connection timeout for all upstream HTTP requests.
    pub fn http_connection_timeout(&self) -> Duration {
        Duration::from_secs(self.values.http.connection_timeout.into())
    }

    /// Returns the failed upstream request retry interval.
    pub fn http_max_retry_interval(&self) -> Duration {
        Duration::from_secs(self.values.http.max_retry_interval.into())
    }

    /// Returns the expiry timeout for cached projects.
    pub fn project_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_expiry.into())
    }

    /// Returns `true` if the full project state should be requested from upstream.
    pub fn request_full_project_config(&self) -> bool {
        self.values.cache.project_request_full_config
    }

    /// Returns the expiry timeout for cached relay infos (public keys).
    pub fn relay_cache_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.relay_expiry.into())
    }

    /// Returns the maximum number of buffered envelopes.
    ///
    /// Saturates at `usize::MAX` if the configured value does not fit.
    pub fn envelope_buffer_size(&self) -> usize {
        self.values
            .cache
            .envelope_buffer_size
            .try_into()
            .unwrap_or(usize::MAX)
    }

    /// Returns the expiry timeout for cached misses before trying to refetch.
    pub fn cache_miss_expiry(&self) -> Duration {
        Duration::from_secs(self.values.cache.miss_expiry.into())
    }

    /// Returns the grace period for project caches.
    pub fn project_grace_period(&self) -> Duration {
        Duration::from_secs(self.values.cache.project_grace_period.into())
    }

    /// Returns the refresh interval for a project, if configured.
    ///
    /// NOTE(review): despite the original doc claim, no validation against
    /// the grace period or expiry happens here — presumably enforced at
    /// deserialization; confirm.
    pub fn project_refresh_interval(&self) -> Option<Duration> {
        self.values
            .cache
            .project_refresh_interval
            .map(Into::into)
            .map(Duration::from_secs)
    }

    /// Returns the duration in which batchable project config queries are
    /// collected before sending them in a single request.
    pub fn query_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.batch_interval.into())
    }

    /// Returns the duration in which downstream relays are requested from upstream.
    pub fn downstream_relays_batch_interval(&self) -> Duration {
        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
    }

    /// Returns the interval in seconds in which local project configurations should be reloaded.
    pub fn local_cache_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.file_interval.into())
    }

    /// Returns the interval in seconds in which fresh global configs should be
    /// fetched from upstream.
    pub fn global_config_fetch_interval(&self) -> Duration {
        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
    }
2201
2202    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2203    ///
2204    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2205    /// suffixed with `.{partition_id}`.
2206    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2207        let mut path = self
2208            .values
2209            .spool
2210            .envelopes
2211            .path
2212            .as_ref()
2213            .map(|path| path.to_owned())?;
2214
2215        if partition_id == 0 {
2216            return Some(path);
2217        }
2218
2219        let file_name = path.file_name().and_then(|f| f.to_str())?;
2220        let new_file_name = format!("{file_name}.{partition_id}");
2221        path.set_file_name(new_file_name);
2222
2223        Some(path)
2224    }
2225
    /// The maximum size of the buffer, in bytes.
    pub fn spool_envelopes_max_disk_size(&self) -> usize {
        self.values.spool.envelopes.max_disk_size.as_bytes()
    }

    /// Number of encoded envelope bytes that need to be accumulated before
    /// flushing one batch to disk.
    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
        self.values.spool.envelopes.batch_size_bytes.as_bytes()
    }

    /// Returns the time after which we drop envelopes as a [`Duration`] object.
    pub fn spool_envelopes_max_age(&self) -> Duration {
        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
    }

    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
    }

    /// Returns the maximum number of envelopes that can be put in the bounded buffer.
    pub fn spool_max_backpressure_envelopes(&self) -> usize {
        self.values.spool.envelopes.max_backpressure_envelopes
    }

    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
        self.values.spool.envelopes.max_backpressure_memory_percent
    }

    /// Returns the number of partitions for the buffer.
    pub fn spool_partitions(&self) -> NonZeroU8 {
        self.values.spool.envelopes.partitions
    }

    /// Returns the maximum size of an event payload in bytes.
    pub fn max_event_size(&self) -> usize {
        self.values.limits.max_event_size.as_bytes()
    }

    /// Returns the maximum size of each attachment in bytes.
    pub fn max_attachment_size(&self) -> usize {
        self.values.limits.max_attachment_size.as_bytes()
    }

    /// Returns the maximum combined size of attachments or payloads containing attachments
    /// (minidump, unreal, standalone attachments) in bytes.
    pub fn max_attachments_size(&self) -> usize {
        self.values.limits.max_attachments_size.as_bytes()
    }

    /// Returns the maximum combined size of client reports in bytes.
    pub fn max_client_reports_size(&self) -> usize {
        self.values.limits.max_client_reports_size.as_bytes()
    }

    /// Returns the maximum payload size of a monitor check-in in bytes.
    pub fn max_check_in_size(&self) -> usize {
        self.values.limits.max_check_in_size.as_bytes()
    }

    /// Returns the maximum payload size of a log in bytes.
    pub fn max_log_size(&self) -> usize {
        self.values.limits.max_log_size.as_bytes()
    }

    /// Returns the maximum payload size of a span in bytes.
    pub fn max_span_size(&self) -> usize {
        self.values.limits.max_span_size.as_bytes()
    }

    /// Returns the maximum payload size of an item container in bytes.
    pub fn max_container_size(&self) -> usize {
        self.values.limits.max_container_size.as_bytes()
    }

    /// Returns the maximum size of an envelope payload in bytes.
    ///
    /// Individual item size limits still apply.
    pub fn max_envelope_size(&self) -> usize {
        self.values.limits.max_envelope_size.as_bytes()
    }

    /// Returns the maximum number of sessions per envelope.
    pub fn max_session_count(&self) -> usize {
        self.values.limits.max_session_count
    }

    /// Returns the maximum number of standalone spans per envelope.
    pub fn max_span_count(&self) -> usize {
        self.values.limits.max_span_count
    }

    /// Returns the maximum number of logs per envelope.
    pub fn max_log_count(&self) -> usize {
        self.values.limits.max_log_count
    }

    /// Returns the maximum payload size of a statsd metric in bytes.
    pub fn max_statsd_size(&self) -> usize {
        self.values.limits.max_statsd_size.as_bytes()
    }

    /// Returns the maximum payload size of metric buckets in bytes.
    pub fn max_metric_buckets_size(&self) -> usize {
        self.values.limits.max_metric_buckets_size.as_bytes()
    }

    /// Returns the maximum payload size for general API requests.
    pub fn max_api_payload_size(&self) -> usize {
        self.values.limits.max_api_payload_size.as_bytes()
    }

    /// Returns the maximum payload size for file uploads and chunks.
    pub fn max_api_file_upload_size(&self) -> usize {
        self.values.limits.max_api_file_upload_size.as_bytes()
    }

    /// Returns the maximum payload size for chunks.
    pub fn max_api_chunk_upload_size(&self) -> usize {
        self.values.limits.max_api_chunk_upload_size.as_bytes()
    }

    /// Returns the maximum payload size for a profile.
    pub fn max_profile_size(&self) -> usize {
        self.values.limits.max_profile_size.as_bytes()
    }

    /// Returns the maximum payload size for a compressed replay.
    pub fn max_replay_compressed_size(&self) -> usize {
        self.values.limits.max_replay_compressed_size.as_bytes()
    }

    /// Returns the maximum payload size for an uncompressed replay.
    pub fn max_replay_uncompressed_size(&self) -> usize {
        self.values.limits.max_replay_uncompressed_size.as_bytes()
    }

    /// Returns the maximum message size for an uncompressed replay.
    ///
    /// This is greater than `max_replay_compressed_size` because
    /// it can include additional metadata about the replay in
    /// addition to the recording.
    pub fn max_replay_message_size(&self) -> usize {
        self.values.limits.max_replay_message_size.as_bytes()
    }
2373
    /// Returns the maximum number of active requests.
    pub fn max_concurrent_requests(&self) -> usize {
        self.values.limits.max_concurrent_requests
    }

    /// Returns the maximum number of active queries.
    pub fn max_concurrent_queries(&self) -> usize {
        self.values.limits.max_concurrent_queries
    }

    /// The maximum number of seconds a query is allowed to take across retries.
    pub fn query_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.query_timeout)
    }

    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub fn shutdown_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.shutdown_timeout)
    }

    /// Returns the server keep-alive timeout in seconds.
    ///
    /// By default keep-alive is set to 5 seconds.
    pub fn keepalive_timeout(&self) -> Duration {
        Duration::from_secs(self.values.limits.keepalive_timeout)
    }

    /// Returns the server idle timeout in seconds, if configured.
    pub fn idle_timeout(&self) -> Option<Duration> {
        self.values.limits.idle_timeout.map(Duration::from_secs)
    }

    /// Returns the maximum number of connections, if limited.
    pub fn max_connections(&self) -> Option<usize> {
        self.values.limits.max_connections
    }

    /// TCP listen backlog to configure on Relay's listening socket.
    pub fn tcp_listen_backlog(&self) -> u32 {
        self.values.limits.tcp_listen_backlog
    }

    /// Returns the number of cores to use for thread pools.
    pub fn cpu_concurrency(&self) -> usize {
        self.values.limits.max_thread_count
    }

    /// Returns the number of tasks that can run concurrently in the worker pool.
    pub fn pool_concurrency(&self) -> usize {
        self.values.limits.max_pool_concurrency
    }

    /// Returns the maximum size of a project config query.
    pub fn query_batch_size(&self) -> usize {
        self.values.cache.batch_size
    }

    /// Get the directory for static project configs (`<config dir>/projects`).
    pub fn project_configs_path(&self) -> PathBuf {
        self.path.join("projects")
    }

    /// True if the Relay should do processing.
    pub fn processing_enabled(&self) -> bool {
        self.values.processing.enabled
    }

    /// Level of normalization for Relay to apply to incoming data.
    pub fn normalization_level(&self) -> NormalizationLevel {
        self.values.normalization.level
    }

    /// The path to the GeoIp database required for event processing.
    ///
    /// The `geoip.path` value takes precedence over the legacy
    /// `processing.geoip_path` value.
    pub fn geoip_path(&self) -> Option<&Path> {
        self.values
            .geoip
            .path
            .as_deref()
            .or(self.values.processing.geoip_path.as_deref())
    }

    /// Maximum future timestamp of ingested data.
    ///
    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
    pub fn max_secs_in_future(&self) -> i64 {
        self.values.processing.max_secs_in_future.into()
    }

    /// Maximum age of ingested sessions. Older sessions will be dropped.
    pub fn max_session_secs_in_past(&self) -> i64 {
        self.values.processing.max_session_secs_in_past.into()
    }
2467
    /// Configuration name and list of Kafka configuration parameters for a given topic.
    ///
    /// Resolves the topic assignment against both the primary and the
    /// secondary Kafka configs.
    pub fn kafka_configs(
        &self,
        topic: KafkaTopic,
    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
        self.values.processing.topics.get(topic).kafka_configs(
            &self.values.processing.kafka_config,
            &self.values.processing.secondary_kafka_configs,
        )
    }

    /// Whether to validate the topics against Kafka.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

    /// All unused but configured topic assignments.
    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
        &self.values.processing.topics.unused
    }

    /// Redis servers to connect to for project configs, cardinality limits,
    /// rate limiting, and metrics metadata.
    ///
    /// Returns `None` when no Redis configuration is set (non-processing
    /// setups). Pool sizing is derived from the CPU concurrency.
    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
        let redis_configs = self.values.processing.redis.as_ref()?;

        Some(build_redis_configs(
            redis_configs,
            self.cpu_concurrency() as u32,
        ))
    }

    /// Chunk size of attachments in bytes.
    pub fn attachment_chunk_size(&self) -> usize {
        self.values.processing.attachment_chunk_size.as_bytes()
    }

    /// Maximum metrics batch size in bytes.
    pub fn metrics_max_batch_size_bytes(&self) -> usize {
        self.values.aggregator.max_flush_bytes
    }

    /// Default prefix to use when looking up project configs in Redis. This is only done when
    /// Relay is in processing mode.
    pub fn projectconfig_cache_prefix(&self) -> &str {
        &self.values.processing.projectconfig_cache_prefix
    }

    /// Maximum rate limit to report to clients in seconds.
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Cache vacuum interval for the cardinality limiter in memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
    }

    /// Interval to refresh internal health checks.
    pub fn health_refresh_interval(&self) -> Duration {
        Duration::from_millis(self.values.health.refresh_interval_ms)
    }

    /// Maximum memory watermark in bytes.
    ///
    /// Defaults to `u64::MAX` (effectively unlimited) when unset.
    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
        self.values
            .health
            .max_memory_bytes
            .as_ref()
            .map_or(u64::MAX, |b| b.as_bytes() as u64)
    }
2541
2542    /// Maximum memory watermark as a percentage of maximum system memory.
2543    pub fn health_max_memory_watermark_percent(&self) -> f32 {
2544        self.values.health.max_memory_percent
2545    }
2546
2547    /// Health check probe timeout.
2548    pub fn health_probe_timeout(&self) -> Duration {
2549        Duration::from_millis(self.values.health.probe_timeout_ms)
2550    }
2551
2552    /// Refresh frequency for polling new memory stats.
2553    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2554        self.values.health.memory_stat_refresh_frequency_ms
2555    }
2556
2557    /// Maximum amount of COGS measurements buffered in memory.
2558    pub fn cogs_max_queue_size(&self) -> u64 {
2559        self.values.cogs.max_queue_size
2560    }
2561
2562    /// Resource ID to use for Relay COGS measurements.
2563    pub fn cogs_relay_resource_id(&self) -> &str {
2564        &self.values.cogs.relay_resource_id
2565    }
2566
2567    /// Returns configuration for the default metrics aggregator.
2568    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2569        &self.values.aggregator
2570    }
2571
2572    /// Returns configuration for non-default metrics aggregator.
2573    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2574        &self.values.secondary_aggregators
2575    }
2576
2577    /// Returns aggregator config for a given metrics namespace.
2578    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2579        for entry in &self.values.secondary_aggregators {
2580            if entry.condition.matches(Some(namespace)) {
2581                return &entry.config;
2582            }
2583        }
2584        &self.values.aggregator
2585    }
2586
2587    /// Return the statically configured Relays.
2588    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2589        &self.values.auth.static_relays
2590    }
2591
2592    /// Returns the max age a signature is considered valid, in seconds.
2593    pub fn signature_max_age(&self) -> Duration {
2594        Duration::from_secs(self.values.auth.signature_max_age)
2595    }
2596
2597    /// Returns `true` if unknown items should be accepted and forwarded.
2598    pub fn accept_unknown_items(&self) -> bool {
2599        let forward = self.values.routing.accept_unknown_items;
2600        forward.unwrap_or_else(|| !self.processing_enabled())
2601    }
2602}
2603
2604impl Default for Config {
2605    fn default() -> Self {
2606        Self {
2607            values: ConfigValues::default(),
2608            credentials: None,
2609            path: PathBuf::new(),
2610        }
2611    }
2612}
2613
#[cfg(test)]
mod tests {

    use super::*;

    /// Regression test for renaming the envelope buffer flags.
    ///
    /// The legacy `event_*` keys must still deserialize into the renamed
    /// `envelope_*` fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
    event_buffer_size: 1000000
    event_expiry: 1800
"###;

        let config: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(config.cache.envelope_expiry, 1800);
    }

    /// Round-trips every `EmitOutcomes` variant through its JSON representation.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];

        for (json, expected) in &cases {
            let parsed: EmitOutcomes = serde_json::from_str(json).unwrap();
            assert_eq!(parsed, *expected);
            assert_eq!(serde_json::to_string(&parsed).unwrap(), *json);
        }
    }

    /// Invalid input must not deserialize into `EmitOutcomes`.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}