relay_config/
config.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{build_redis_configs, RedisConfig, RedisConfigs, RedisConfigsRef};
27
/// Default grace period, in seconds, before uninterrupted network failures are treated as a
/// network outage (used as the default for `Http::outage_grace_period`).
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

/// Header comment prepended to generated YAML config files, pointing users at the docs.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// Indicates config related errors.
///
/// This enum is `#[non_exhaustive]`: new kinds may be added without a breaking change.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// Invalid config value.
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but uses a binary that was
    /// compiled without the processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self {
57            Self::CouldNotOpenFile => write!(f, "could not open config file"),
58            Self::CouldNotWriteFile => write!(f, "could not write config file"),
59            Self::BadYaml => write!(f, "could not parse yaml config file"),
60            Self::BadJson => write!(f, "could not parse json config file"),
61            Self::InvalidValue => write!(f, "invalid config value"),
62            Self::ProcessingNotAvailable => write!(
63                f,
64                "was not compiled with processing, cannot enable processing"
65            ),
66        }
67    }
68}
69
/// Defines the source of a config error.
#[derive(Debug)]
enum ConfigErrorSource {
    /// An error occurring independently.
    None,
    /// An error originating from a configuration file.
    File(PathBuf),
    /// An error originating in a field override (an env var, or a CLI parameter).
    FieldOverride(String),
}
80
81impl Default for ConfigErrorSource {
82    fn default() -> Self {
83        Self::None
84    }
85}
86
87impl fmt::Display for ConfigErrorSource {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        match self {
90            ConfigErrorSource::None => Ok(()),
91            ConfigErrorSource::File(file_name) => {
92                write!(f, " (file {})", file_name.display())
93            }
94            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
95        }
96    }
97}
98
/// Indicates config related errors.
///
/// Pairs a [`ConfigErrorKind`] with the [`ConfigErrorSource`] it originated from, if known.
#[derive(Debug)]
pub struct ConfigError {
    // Where the error originated (file, field override, or nowhere in particular).
    source: ConfigErrorSource,
    // What went wrong.
    kind: ConfigErrorKind,
}
105
106impl ConfigError {
107    #[inline]
108    fn new(kind: ConfigErrorKind) -> Self {
109        Self {
110            source: ConfigErrorSource::None,
111            kind,
112        }
113    }
114
115    #[inline]
116    fn field(field: &'static str) -> Self {
117        Self {
118            source: ConfigErrorSource::FieldOverride(field.to_owned()),
119            kind: ConfigErrorKind::InvalidValue,
120        }
121    }
122
123    #[inline]
124    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125        Self {
126            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127            kind,
128        }
129    }
130
131    /// Returns the error kind of the error.
132    pub fn kind(&self) -> ConfigErrorKind {
133        self.kind
134    }
135}
136
137impl fmt::Display for ConfigError {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        write!(f, "{}{}", self.kind(), self.source)
140    }
141}
142
143impl Error for ConfigError {}
144
/// The on-disk serialization format of a configuration file.
enum ConfigFormat {
    /// YAML format.
    Yaml,
    /// JSON format.
    Json,
}

impl ConfigFormat {
    /// Returns the file extension used for this format.
    pub fn extension(&self) -> &'static str {
        if matches!(self, Self::Yaml) {
            "yml"
        } else {
            "json"
        }
    }
}
158
159trait ConfigObject: DeserializeOwned + Serialize {
160    /// The format in which to serialize this configuration.
161    fn format() -> ConfigFormat;
162
163    /// The basename of the config file.
164    fn name() -> &'static str;
165
166    /// The full filename of the config file, including the file extension.
167    fn path(base: &Path) -> PathBuf {
168        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
169    }
170
171    /// Loads the config file from a file within the given directory location.
172    fn load(base: &Path) -> anyhow::Result<Self> {
173        let path = Self::path(base);
174
175        let f = fs::File::open(&path)
176            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
177
178        match Self::format() {
179            ConfigFormat::Yaml => serde_yaml::from_reader(io::BufReader::new(f))
180                .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path)),
181            ConfigFormat::Json => serde_json::from_reader(io::BufReader::new(f))
182                .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path)),
183        }
184    }
185
186    /// Writes the configuration to a file within the given directory location.
187    fn save(&self, base: &Path) -> anyhow::Result<()> {
188        let path = Self::path(base);
189        let mut options = fs::OpenOptions::new();
190        options.write(true).truncate(true).create(true);
191
192        // Remove all non-user permissions for the newly created file
193        #[cfg(unix)]
194        {
195            use std::os::unix::fs::OpenOptionsExt;
196            options.mode(0o600);
197        }
198
199        let mut f = options
200            .open(&path)
201            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
202
203        match Self::format() {
204            ConfigFormat::Yaml => {
205                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
206                serde_yaml::to_writer(&mut f, self)
207                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
208            }
209            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
210                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
211        }
212
213        f.write_all(b"\n").ok();
214
215        Ok(())
216    }
217}
218
/// Structure used to hold information about configuration overrides via
/// CLI parameters or environment variables.
///
/// All fields are raw strings; `None` means the corresponding option is not overridden.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to (network interface).
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// `"true"` if processing is enabled, `"false"` otherwise.
    pub processing: Option<String>,
    /// The kafka `bootstrap.servers` configuration string.
    pub kafka_url: Option<String>,
    /// The redis server url.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// Outcome source.
    pub outcome_source: Option<String>,
    /// Shutdown timeout; presumably maps to `limits.shutdown_timeout` (seconds) — TODO confirm.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
258
/// The relay credentials.
///
/// Persisted as JSON under the `credentials` basename (see the [`ConfigObject`] impl).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
269
270impl Credentials {
271    /// Generates new random credentials.
272    pub fn generate() -> Self {
273        relay_log::info!("generating new relay credentials");
274        let (sk, pk) = generate_key_pair();
275        Self {
276            secret_key: sk,
277            public_key: pk,
278            id: generate_relay_id(),
279        }
280    }
281
282    /// Serializes this configuration to JSON.
283    pub fn to_json_string(&self) -> anyhow::Result<String> {
284        serde_json::to_string(self)
285            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
286    }
287}
288
289impl ConfigObject for Credentials {
290    fn format() -> ConfigFormat {
291        ConfigFormat::Json
292    }
293    fn name() -> &'static str {
294        "credentials"
295    }
296}
297
/// Information on a downstream Relay.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this Relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project configuration.
    ///
    /// Defaults to `false` when omitted from the serialized form.
    #[serde(default)]
    pub internal: bool,
}
309
310impl RelayInfo {
311    /// Creates a new RelayInfo
312    pub fn new(public_key: PublicKey) -> Self {
313        Self {
314            public_key,
315            internal: false,
316        }
317    }
318}
319
/// The operation mode of a relay.
///
/// Parses from and displays as its lowercase name (see the [`FromStr`] and
/// [`fmt::Display`] impls).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    ///
    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
    /// fetch project configurations from the upstream or perform PII stripping. All events are
    /// accepted unless overridden on the file system.
    Proxy,

    /// This relay is configured statically in the file system.
    ///
    /// Events are only accepted for projects configured statically in the file system. All other
    /// events are rejected. If configured, PII stripping is also performed on those events.
    Static,

    /// Project configurations are managed by the upstream.
    ///
    /// Project configurations are always fetched from the upstream, unless they are statically
    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
    /// only possible, if the upstream is Sentry directly, or another managed Relay.
    Managed,

    /// Events are held in memory for inspection only.
    ///
    /// This mode is used for testing sentry SDKs.
    Capture,
}
349
350impl fmt::Display for RelayMode {
351    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352        match self {
353            RelayMode::Proxy => write!(f, "proxy"),
354            RelayMode::Static => write!(f, "static"),
355            RelayMode::Managed => write!(f, "managed"),
356            RelayMode::Capture => write!(f, "capture"),
357        }
358    }
359}
360
/// The instance type of Relay.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// This Relay is run as a default instance.
    Default,

    /// This Relay is run as a canary instance where experiments can be run.
    Canary,
}
371
372impl RelayInstance {
373    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
374    pub fn is_canary(&self) -> bool {
375        matches!(self, RelayInstance::Canary)
376    }
377}
378
379impl fmt::Display for RelayInstance {
380    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381        match self {
382            RelayInstance::Default => write!(f, "default"),
383            RelayInstance::Canary => write!(f, "canary"),
384        }
385    }
386}
387
388impl FromStr for RelayInstance {
389    type Err = fmt::Error;
390
391    fn from_str(s: &str) -> Result<Self, Self::Err> {
392        match s {
393            "canary" => Ok(RelayInstance::Canary),
394            _ => Ok(RelayInstance::Default),
395        }
396    }
397}
398
/// Error returned when parsing an invalid [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    /// Lists the accepted mode names in the error message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Relay mode must be one of: managed, static, proxy, capture")
    }
}

impl Error for ParseRelayModeError {}
413
414impl FromStr for RelayMode {
415    type Err = ParseRelayModeError;
416
417    fn from_str(s: &str) -> Result<Self, Self::Err> {
418        match s {
419            "proxy" => Ok(RelayMode::Proxy),
420            "static" => Ok(RelayMode::Static),
421            "managed" => Ok(RelayMode::Managed),
422            "capture" => Ok(RelayMode::Capture),
423            _ => Err(ParseRelayModeError),
424        }
425    }
426}
427
/// Returns `true` if this value is equal to `Default::default()`.
///
/// Used as a `#[serde(skip_serializing_if = "is_default")]` predicate to omit default values.
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()
}
432
/// Checks if we are running in docker.
fn is_docker() -> bool {
    // The `/.dockerenv` marker file is the cheap check; fall back to inspecting
    // the cgroup hierarchy for a `/docker` entry.
    fs::metadata("/.dockerenv").is_ok()
        || fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
}
441
442/// Default value for the "bind" configuration.
443fn default_host() -> IpAddr {
444    if is_docker() {
445        // Docker images rely on this service being exposed
446        "0.0.0.0".parse().unwrap()
447    } else {
448        "127.0.0.1".parse().unwrap()
449    }
450}
451
/// Controls responses from the readiness health check endpoint based on authentication.
///
/// Independent of the readiness condition, shutdown always switches Relay into unready state.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReadinessCondition {
    /// (default) Relay is ready when authenticated and connected to the upstream.
    ///
    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
    /// Relay remains ready until authentication fails.
    ///
    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
    /// network outages.
    Authenticated,
    /// Relay reports readiness regardless of the authentication and networking state.
    Always,
}
470
471impl Default for ReadinessCondition {
472    fn default() -> Self {
473        Self::Authenticated
474    }
475}
476
/// Relay specific configuration values.
///
/// See the `Default` impl for the concrete default values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay. Defaults to [`RelayMode::Managed`].
    pub mode: RelayMode,
    /// The instance type of this relay. Defaults to [`RelayInstance::Default`].
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance. Defaults to `https://sentry.io/`.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to (network interface).
    ///
    /// Defaults to `0.0.0.0` inside Docker and `127.0.0.1` otherwise.
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server. Defaults to `3000`.
    pub port: u16,
    /// Optional port to bind for the encrypted relay HTTPS server.
    // Never written back out when serializing the config.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the PKCS12 archive.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
    ///
    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
    /// Validation of project identifiers can be safely skipped in these cases.
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
507
508impl Default for Relay {
509    fn default() -> Self {
510        Relay {
511            mode: RelayMode::Managed,
512            instance: RelayInstance::Default,
513            upstream: "https://sentry.io/".parse().unwrap(),
514            host: default_host(),
515            port: 3000,
516            tls_port: None,
517            tls_identity_path: None,
518            tls_identity_password: None,
519            override_project_ids: false,
520        }
521    }
522}
523
/// Control the metrics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Hostname and port of the statsd server.
    ///
    /// Defaults to `None`, which disables statsd submission.
    pub statsd: Option<String>,
    /// Common prefix that should be added to all metrics.
    ///
    /// Defaults to `"sentry.relay"`.
    pub prefix: String,
    /// Default tags to apply to all metrics.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
    pub hostname_tag: Option<String>,
    /// Global sample rate for all emitted metrics between `0.0` and `1.0`.
    ///
    /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
    /// Defaults to `1.0` (100%).
    pub sample_rate: f32,
    /// Interval for periodic metrics emitted from Relay.
    ///
    /// Setting it to `0` seconds disables the periodic metrics.
    /// Defaults to 5 seconds.
    pub periodic_secs: u64,
    /// Whether local metric aggregation using statdsproxy should be enabled.
    ///
    /// Defaults to `true`.
    pub aggregate: bool,
}
555
556impl Default for Metrics {
557    fn default() -> Self {
558        Metrics {
559            statsd: None,
560            prefix: "sentry.relay".into(),
561            default_tags: BTreeMap::new(),
562            hostname_tag: None,
563            sample_rate: 1.0,
564            periodic_secs: 5,
565            aggregate: true,
566        }
567    }
568}
569
/// Controls processing of Sentry metrics and metric metadata.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default)]
pub struct SentryMetrics {
    /// Whether metric stats are collected and emitted.
    ///
    /// Metric stats are always collected and emitted when processing
    /// is enabled.
    ///
    /// This option is required for running multiple trusted Relays in a chain
    /// and you want the metric stats to be collected and forwarded from
    /// the first Relay in the chain.
    ///
    /// Defaults to `false`.
    pub metric_stats_enabled: bool,
}
586
/// Controls various limits.
///
/// See the `Default` impl for the concrete default values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    pub max_concurrent_requests: usize,
    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    ///
    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
    pub max_concurrent_queries: usize,
    /// The maximum payload size for events.
    pub max_event_size: ByteSize,
    /// The maximum size for each attachment.
    pub max_attachment_size: ByteSize,
    /// The maximum combined size for all attachments in an envelope or request.
    pub max_attachments_size: ByteSize,
    /// The maximum combined size for all client reports in an envelope or request.
    pub max_client_reports_size: ByteSize,
    /// The maximum payload size for a monitor check-in.
    pub max_check_in_size: ByteSize,
    /// The maximum payload size for an entire envelope. Individual limits still apply.
    pub max_envelope_size: ByteSize,
    /// The maximum number of session items per envelope.
    pub max_session_count: usize,
    /// The maximum payload size for general API requests.
    pub max_api_payload_size: ByteSize,
    /// The maximum payload size for file uploads and chunks.
    pub max_api_file_upload_size: ByteSize,
    /// The maximum payload size for chunks.
    pub max_api_chunk_upload_size: ByteSize,
    /// The maximum payload size for a profile.
    pub max_profile_size: ByteSize,
    /// The maximum payload size for a log.
    pub max_log_size: ByteSize,
    /// The maximum payload size for a span.
    pub max_span_size: ByteSize,
    /// The maximum payload size for a statsd metric.
    pub max_statsd_size: ByteSize,
    /// The maximum payload size for metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// The maximum payload size for a compressed replay.
    pub max_replay_compressed_size: ByteSize,
    /// The maximum payload size for an uncompressed replay.
    ///
    /// Also accepted under the legacy config key `max_replay_size`.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// The maximum size for a replay recording Kafka message.
    pub max_replay_message_size: ByteSize,
    /// The maximum number of threads to spawn for CPU and web work, each.
    ///
    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
    /// the number of logical CPU cores on the host.
    pub max_thread_count: usize,
    /// Controls the maximum concurrency of each worker thread.
    ///
    /// Increasing the concurrency can lead to a better utilization of worker threads by
    /// increasing the amount of I/O done concurrently.
    ///
    /// Currently has no effect and defaults to `1`.
    pub max_pool_concurrency: usize,
    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
    /// have lower timeouts. Defaults to 30 seconds.
    pub query_timeout: u64,
    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub shutdown_timeout: u64,
    /// Server keep-alive timeout in seconds.
    ///
    /// By default, keep-alive is set to 5 seconds.
    pub keepalive_timeout: u64,
    /// Server idle timeout in seconds.
    ///
    /// The idle timeout limits the amount of time a connection is kept open without activity.
    /// Setting this too short may abort connections before Relay is able to send a response.
    ///
    /// By default there is no idle timeout.
    pub idle_timeout: Option<u64>,
    /// Sets the maximum number of concurrent connections.
    ///
    /// Upon reaching the limit, the server will stop accepting connections.
    ///
    /// By default there is no limit.
    pub max_connections: Option<usize>,
    /// The TCP listen backlog.
    ///
    /// Configures the TCP listen backlog for the listening socket of Relay.
    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
    /// for a more detailed description of the listen backlog.
    ///
    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
    pub tcp_listen_backlog: u32,
}
680
681impl Default for Limits {
682    fn default() -> Self {
683        Limits {
684            max_concurrent_requests: 100,
685            max_concurrent_queries: 5,
686            max_event_size: ByteSize::mebibytes(1),
687            max_attachment_size: ByteSize::mebibytes(100),
688            max_attachments_size: ByteSize::mebibytes(100),
689            max_client_reports_size: ByteSize::kibibytes(4),
690            max_check_in_size: ByteSize::kibibytes(100),
691            max_envelope_size: ByteSize::mebibytes(100),
692            max_session_count: 100,
693            max_api_payload_size: ByteSize::mebibytes(20),
694            max_api_file_upload_size: ByteSize::mebibytes(40),
695            max_api_chunk_upload_size: ByteSize::mebibytes(100),
696            max_profile_size: ByteSize::mebibytes(50),
697            max_log_size: ByteSize::mebibytes(1),
698            max_span_size: ByteSize::mebibytes(1),
699            max_statsd_size: ByteSize::mebibytes(1),
700            max_metric_buckets_size: ByteSize::mebibytes(1),
701            max_replay_compressed_size: ByteSize::mebibytes(10),
702            max_replay_uncompressed_size: ByteSize::mebibytes(100),
703            max_replay_message_size: ByteSize::mebibytes(15),
704            max_thread_count: num_cpus::get(),
705            max_pool_concurrency: 1,
706            query_timeout: 30,
707            shutdown_timeout: 10,
708            keepalive_timeout: 5,
709            idle_timeout: None,
710            max_connections: None,
711            tcp_listen_backlog: 1024,
712        }
713    }
714}
715
/// Controls traffic steering.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Accept and forward unknown Envelope items to the upstream.
    ///
    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
    /// processed as usual.
    ///
    /// `None` selects a mode-dependent default: `true` for all Relay modes other than processing
    /// mode. In processing mode, this is disabled by default since the item cannot be handled.
    pub accept_unknown_items: Option<bool>,
}
731
/// Http content encoding for both incoming and outgoing web requests.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// Identity function with no compression.
    ///
    /// This is the default encoding and does not require the presence of the `content-encoding`
    /// HTTP header.
    #[default]
    Identity,
    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
    ///
    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
    Deflate,
    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
    /// (LZ77), with a 32-bit CRC.
    ///
    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
    /// compatibility purposes.
    Gzip,
    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
    Br,
    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
    Zstd,
}
760
761impl HttpEncoding {
762    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
763    pub fn parse(str: &str) -> Self {
764        let str = str.trim();
765        if str.eq_ignore_ascii_case("zstd") {
766            Self::Zstd
767        } else if str.eq_ignore_ascii_case("br") {
768            Self::Br
769        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
770            Self::Gzip
771        } else if str.eq_ignore_ascii_case("deflate") {
772            Self::Deflate
773        } else {
774            Self::Identity
775        }
776    }
777
778    /// Returns the value for the `content-encoding` HTTP header.
779    ///
780    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
781    pub fn name(&self) -> Option<&'static str> {
782        match self {
783            Self::Identity => None,
784            Self::Deflate => Some("deflate"),
785            Self::Gzip => Some("gzip"),
786            Self::Br => Some("br"),
787            Self::Zstd => Some("zstd"),
788        }
789    }
790}
791
/// Controls authentication with upstream.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    ///
    /// This timeout covers the time from sending the request until receiving response headers.
    /// Neither the connection process and handshakes, nor reading the response body is covered in
    /// this timeout.
    pub timeout: u32,
    /// Timeout for establishing connections with the upstream in seconds.
    ///
    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
    ///
    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
    /// back into startup mode and tries to establish a connection. During this time, incoming
    /// envelopes will be buffered.
    ///
    /// Defaults to `600` (10 minutes).
    pub auth_interval: Option<u64>,
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage in seconds.
    ///
    /// During a network outage relay will try to reconnect and will buffer all upstream messages
    /// until it manages to reconnect.
    ///
    /// Defaults to `DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD` (10 seconds).
    pub outage_grace_period: u64,
    /// The time Relay waits before retrying an upstream request, in seconds.
    ///
    /// This time is only used before going into a network outage mode.
    pub retry_delay: u64,
    /// The interval in seconds for continued failed project fetches at which Relay will error.
    ///
    /// A successful fetch resets this interval. Relay does nothing during long
    /// times without emitting requests.
    pub project_failure_interval: u64,
    /// Content encoding to apply to upstream store requests.
    ///
    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
    ///
    /// This setting applies to all store requests of SDK data, including events, transactions,
    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
    ///
    /// Available options are:
    ///
    ///  - `identity`: Disables compression.
    ///  - `deflate`: Compression using a zlib header with deflate encoding.
    ///  - `gzip`: Compression using gzip.
    ///  - `br`: Compression using the brotli algorithm.
    ///  - `zstd` (default): Compression using the zstd algorithm.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint.
    ///
    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
    /// DSN, this submits metrics to the global endpoint with Relay authentication.
    ///
    /// This option does not have any effect on processing mode.
    pub global_metrics: bool,
}
858
859impl Default for Http {
860    fn default() -> Self {
861        Http {
862            timeout: 5,
863            connection_timeout: 3,
864            max_retry_interval: 60, // 1 minute
865            host_header: None,
866            auth_interval: Some(600), // 10 minutes
867            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
868            retry_delay: default_retry_delay(),
869            project_failure_interval: default_project_failure_interval(),
870            encoding: HttpEncoding::Zstd,
871            global_metrics: false,
872        }
873    }
874}
875
/// Default delay before retrying an unavailable upstream: one second.
fn default_retry_delay() -> u64 {
    1
}
880
/// Default interval after which continued project fetch failures are reported: 90 seconds.
fn default_project_failure_interval() -> u64 {
    90
}
885
/// Default for the maximum on-disk spool size: 500 MiB.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}
890
/// Default number of encoded envelope bytes to cache before writing to disk: 10 KiB.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}
895
/// Default maximum time an envelope may spend in the spool: 24 hours, in seconds.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    60 * 60 * 24
}
899
/// Default refresh frequency for disk-usage monitoring: 100 ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    100
}
904
/// Default capacity of the bounded backpressure queue: 500 envelopes.
fn spool_max_backpressure_envelopes() -> usize {
    500
}
909
/// Default relative memory usage above which unspooling stops: 90%.
fn spool_max_backpressure_memory_percent() -> f32 {
    0.9
}
914
/// Default number of buffer partitions: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    // `1` is trivially non-zero, so this can never panic.
    NonZeroU8::new(1).expect("1 is non-zero")
}
919
/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path of the SQLite database file(s) which persist the data.
    ///
    /// Based on the number of partitions, more database files will be created within the same path.
    ///
    /// If not set, the envelopes will be buffered in memory.
    pub path: Option<PathBuf>,
    /// The maximum size of the buffer to keep, in bytes.
    ///
    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
    ///
    /// Defaults to 500 MiB.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of the batch of compressed envelopes that are spooled to disk at once.
    ///
    /// Note that this is the size after which spooling will be triggered but it does not guarantee
    /// that exactly this size will be spooled, it can be greater or equal.
    ///
    /// Defaults to 10 KiB.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time between receiving the envelope and processing it.
    ///
    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
    /// they are dropped.
    ///
    /// Defaults to 24h.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
    /// internal page stats.
    ///
    /// Defaults to 100ms.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// The maximum number of envelopes that the envelope buffer can push to its output queue.
    ///
    /// Defaults to 500.
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
    ///
    /// Only applies when [`Self::path`] is set.
    ///
    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
    ///
    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
    ///
    /// This scenario arises when the buffer stops spooling due to reaching the
    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
    /// continues to grow, and the system becomes deadlocked.
    ///
    /// ### Example
    /// Suppose the system has 1GB of available memory and is configured to spool only after
    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
    ///
    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
    /// no data will be offloaded. Memory usage keeps increasing until it hits the
    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
    ///
    /// - No more envelopes are accepted.
    /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
    ///   will not happen.
    /// - A deadlock occurs, with the system unable to recover without manual intervention.
    ///
    /// Defaults to 90% (5% less than max memory).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of partitions of the buffer.
    ///
    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
    /// and other resources.
    ///
    /// Defaults to 1.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1003
1004impl Default for EnvelopeSpool {
1005    fn default() -> Self {
1006        Self {
1007            path: None,
1008            max_disk_size: spool_envelopes_max_disk_size(),
1009            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1010            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1011            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1012            max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1013            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1014            partitions: spool_envelopes_partitions(),
1015        }
1016    }
1017}
1018
/// Persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    ///
    /// Falls back to [`EnvelopeSpool::default`] if the section is missing.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1026
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// The full project state will be requested by this Relay if set to `true`.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    ///
    /// Defaults to `300` (5 minutes).
    pub project_expiry: u32,
    /// Continue using project state this many seconds after cache expiry while a new state is
    /// being fetched. This is added on top of `project_expiry`.
    ///
    /// Default is 2 minutes.
    pub project_grace_period: u32,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    ///
    /// Defaults to `3600` (1 hour).
    pub relay_expiry: u32,
    /// Unused cache timeout for envelopes.
    ///
    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
    /// at a later point.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// The maximum amount of envelopes to queue before dropping them.
    ///
    /// Defaults to `1000`.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries.
    ///
    /// Defaults to `60` (1 minute).
    pub miss_expiry: u32,
    /// The buffer timeout for batched project config queries before sending them upstream in ms.
    ///
    /// Defaults to `100` (100ms).
    pub batch_interval: u32,
    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
    pub downstream_relays_batch_interval: u32,
    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
    ///
    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
    pub batch_size: usize,
    /// Interval for watching local cache override files in seconds.
    ///
    /// Defaults to `10`.
    pub file_interval: u32,
    /// Interval for fetching new global configs from the upstream, in seconds.
    ///
    /// Defaults to `10`.
    pub global_config_fetch_interval: u32,
}
1067
1068impl Default for Cache {
1069    fn default() -> Self {
1070        Cache {
1071            project_request_full_config: false,
1072            project_expiry: 300,       // 5 minutes
1073            project_grace_period: 120, // 2 minutes
1074            relay_expiry: 3600,        // 1 hour
1075            envelope_expiry: 600,      // 10 minutes
1076            envelope_buffer_size: 1000,
1077            miss_expiry: 60,                       // 1 minute
1078            batch_interval: 100,                   // 100ms
1079            downstream_relays_batch_interval: 100, // 100ms
1080            batch_size: 500,
1081            file_interval: 10,                // 10 seconds
1082            global_config_fetch_interval: 10, // 10 seconds
1083        }
1084    }
1085}
1086
/// Default maximum timestamp drift into the future: one minute, in seconds.
fn default_max_secs_in_future() -> u32 {
    60
}
1090
/// Default maximum session age: 5 days, in seconds.
fn default_max_session_secs_in_past() -> u32 {
    3600 * 24 * 5
}
1094
/// Default maximum chunk size for attachments sent to Kafka: 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1098
/// Default Redis key prefix for project configs: `"relayconfig"`.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}
1102
/// Default maximum rate limit reported to clients: 300 seconds (5 minutes).
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    Some(300)
}
1107
/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Defaults to `false`.
    pub enabled: bool,
    /// GeoIp DB file source.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events.
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions. Older sessions will be dropped.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configurations.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Additional kafka producer configurations.
    ///
    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
    /// kafka config can be referenced in `topics:` like this:
    ///
    /// ```yaml
    /// secondary_kafka_configs:
    ///   mycustomcluster:
    ///     - name: 'bootstrap.servers'
    ///       value: 'sentry_kafka_metrics:9093'
    ///
    /// topics:
    ///   transactions: ingest-transactions
    ///   metrics:
    ///     name: ingest-metrics
    ///     config: mycustomcluster
    /// ```
    ///
    /// Then metrics will be produced to an entirely different Kafka cluster.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic names.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    ///
    /// Defaults to 1 MiB.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients.
    ///
    /// Defaults to `300` (5 minutes).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}
1164
1165impl Default for Processing {
1166    /// Constructs a disabled processing configuration.
1167    fn default() -> Self {
1168        Self {
1169            enabled: false,
1170            geoip_path: None,
1171            max_secs_in_future: default_max_secs_in_future(),
1172            max_session_secs_in_past: default_max_session_secs_in_past(),
1173            kafka_config: Vec::new(),
1174            secondary_kafka_configs: BTreeMap::new(),
1175            topics: TopicAssignments::default(),
1176            kafka_validate_topics: false,
1177            redis: None,
1178            attachment_chunk_size: default_chunk_size(),
1179            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1180            max_rate_limit: default_max_rate_limit(),
1181        }
1182    }
1183}
1184
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    ///
    /// Defaults to [`NormalizationLevel::Default`].
    #[serde(default)]
    pub level: NormalizationLevel,
}
1193
/// Configuration for the level of normalization this Relay should do.
///
/// Serialized in lowercase (`default` or `full`).
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs normalization, excluding steps that break future compatibility.
    ///
    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
    #[default]
    Default,
    /// Run full normalization.
    ///
    /// It includes steps that break future compatibility and should only run in
    /// the last layer of relays.
    Full,
}
1209
/// Configuration values for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
    ///
    /// Defaults to `60` (1 minute).
    pub bucket_interval: u64,
    /// Defines how often all buckets are flushed, in seconds.
    ///
    /// Defaults to `120` (2 minutes).
    pub flush_interval: u64,
}
1219
1220impl Default for OutcomeAggregatorConfig {
1221    fn default() -> Self {
1222        Self {
1223            bucket_interval: 60,
1224            flush_interval: 120,
1225        }
1226    }
1227}
1228
/// Determines how to emit outcomes.
///
/// For compatibility reasons, this deserializes from `true`, `false`, or the
/// string `"as_client_reports"`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    /// Do not emit any outcomes.
    None,
    /// Emit outcomes as client reports.
    AsClientReports,
    /// Emit outcomes as regular outcomes.
    AsOutcomes,
}
1241
1242impl EmitOutcomes {
1243    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1244    pub fn any(&self) -> bool {
1245        !matches!(self, EmitOutcomes::None)
1246    }
1247}
1248
1249impl Serialize for EmitOutcomes {
1250    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1251    where
1252        S: Serializer,
1253    {
1254        // For compatibility, serialize None and AsOutcomes as booleans.
1255        match self {
1256            Self::None => serializer.serialize_bool(false),
1257            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1258            Self::AsOutcomes => serializer.serialize_bool(true),
1259        }
1260    }
1261}
1262
/// Serde visitor accepting `true`, `false`, or the string `"as_client_reports"`.
struct EmitOutcomesVisitor;
1264
1265impl Visitor<'_> for EmitOutcomesVisitor {
1266    type Value = EmitOutcomes;
1267
1268    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1269        formatter.write_str("true, false, or 'as_client_reports'")
1270    }
1271
1272    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1273    where
1274        E: serde::de::Error,
1275    {
1276        Ok(if v {
1277            EmitOutcomes::AsOutcomes
1278        } else {
1279            EmitOutcomes::None
1280        })
1281    }
1282
1283    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1284    where
1285        E: serde::de::Error,
1286    {
1287        if v == "as_client_reports" {
1288            Ok(EmitOutcomes::AsClientReports)
1289        } else {
1290            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1291        }
1292    }
1293}
1294
impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // `deserialize_any` lets the visitor accept either a boolean or a string;
        // this requires a self-describing input format.
        deserializer.deserialize_any(EmitOutcomesVisitor)
    }
}
1303
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    /// Processing relays always emit outcomes (for backwards compatibility).
    /// Can take the following values: false, "as_client_reports", true
    pub emit_outcomes: EmitOutcomes,
    /// Controls whether client reported outcomes should be emitted.
    pub emit_client_outcomes: bool,
    /// The maximum number of outcomes that are batched before being sent
    /// via http to the upstream (only applies to non processing relays).
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched
    /// via http to the upstream (only applies to non processing relays).
    pub batch_interval: u64,
    /// Defines the source string registered in the outcomes originating from
    /// this Relay (typically something like the region or the layer).
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1326
1327impl Default for Outcomes {
1328    fn default() -> Self {
1329        Outcomes {
1330            emit_outcomes: EmitOutcomes::AsClientReports,
1331            emit_client_outcomes: true,
1332            batch_size: 1000,
1333            batch_interval: 500,
1334            source: None,
1335            aggregator: OutcomeAggregatorConfig::default(),
1336        }
1337    }
1338}
1339
/// Minimal version of a config for dumping out.
///
/// Contains only the `relay` section, e.g. for writing an initial `config.yml`.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1346
1347impl MinimalConfig {
1348    /// Saves the config in the given config folder as config.yml
1349    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1350        let path = p.as_ref();
1351        if fs::metadata(path).is_err() {
1352            fs::create_dir_all(path)
1353                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1354        }
1355        self.save(path)
1356    }
1357}
1358
impl ConfigObject for MinimalConfig {
    // Minimal configs are persisted as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    // File name stem; combined with the YAML format this yields `config.yml`.
    fn name() -> &'static str {
        "config"
    }
}
1368
1369/// Alternative serialization of RelayInfo for config file using snake case.
1370mod config_relay_info {
1371    use serde::ser::SerializeMap;
1372
1373    use super::*;
1374
1375    // Uses snake_case as opposed to camelCase.
1376    #[derive(Debug, Serialize, Deserialize, Clone)]
1377    struct RelayInfoConfig {
1378        public_key: PublicKey,
1379        #[serde(default)]
1380        internal: bool,
1381    }
1382
1383    impl From<RelayInfoConfig> for RelayInfo {
1384        fn from(v: RelayInfoConfig) -> Self {
1385            RelayInfo {
1386                public_key: v.public_key,
1387                internal: v.internal,
1388            }
1389        }
1390    }
1391
1392    impl From<RelayInfo> for RelayInfoConfig {
1393        fn from(v: RelayInfo) -> Self {
1394            RelayInfoConfig {
1395                public_key: v.public_key,
1396                internal: v.internal,
1397            }
1398        }
1399    }
1400
1401    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1402    where
1403        D: Deserializer<'de>,
1404    {
1405        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1406        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1407    }
1408
1409    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1410    where
1411        S: Serializer,
1412    {
1413        let mut map = ser.serialize_map(Some(elm.len()))?;
1414
1415        for (k, v) in elm {
1416            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1417        }
1418
1419        map.end()
1420    }
1421}
1422
/// Authentication options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AuthConfig {
    /// Controls responses from the readiness health check endpoint based on authentication.
    ///
    /// Skipped during serialization when it equals the default value.
    #[serde(default, skip_serializing_if = "is_default")]
    pub ready: ReadinessCondition,

    /// Statically authenticated downstream relays.
    ///
    /// Serialized with snake_case keys via the `config_relay_info` helpers.
    #[serde(default, with = "config_relay_info")]
    pub static_relays: HashMap<RelayId, RelayInfo>,
}
1434
/// GeoIp database configuration options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to the GeoIP database file, if any.
    pub path: Option<PathBuf>,
}
1441
/// Cardinality Limiter configuration options.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Cache vacuum interval in seconds for the in-memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    ///
    /// Defaults to 180 seconds, 3 minutes.
    pub cache_vacuum_interval: u64,
}
1453
1454impl Default for CardinalityLimiter {
1455    fn default() -> Self {
1456        Self {
1457            cache_vacuum_interval: 180,
1458        }
1459    }
1460}
1461
/// Settings to control Relay's health checks.
///
/// After breaching one of the configured thresholds, Relay will
/// return an `unhealthy` status from its health endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval to refresh internal health checks.
    ///
    /// Shorter intervals will decrease the time it takes the health check endpoint to report
    /// issues, but can also increase sporadic unhealthy responses.
    ///
    /// Defaults to `3000` (3 seconds).
    pub refresh_interval_ms: u64,
    /// Maximum memory watermark in bytes.
    ///
    /// By default, there is no absolute limit set and the watermark
    /// is only controlled by setting [`Self::max_memory_percent`].
    pub max_memory_bytes: Option<ByteSize>,
    /// Maximum memory watermark as a percentage of maximum system memory.
    ///
    /// Defaults to `0.95` (95%).
    pub max_memory_percent: f32,
    /// Health check probe timeout in milliseconds.
    ///
    /// Any probe exceeding the timeout will be considered failed.
    /// This limits the max execution time of Relay health checks.
    ///
    /// Defaults to 900 milliseconds.
    pub probe_timeout_ms: u64,
    /// The refresh frequency of memory stats which are used to poll memory
    /// usage of Relay.
    ///
    /// The implementation of memory stats guarantees that the refresh will happen at
    /// least every `x` ms since memory readings are lazy and are updated only if needed.
    pub memory_stat_refresh_frequency_ms: u64,
}
1499
1500impl Default for Health {
1501    fn default() -> Self {
1502        Self {
1503            refresh_interval_ms: 3000,
1504            max_memory_bytes: None,
1505            max_memory_percent: 0.95,
1506            probe_timeout_ms: 900,
1507            memory_stat_refresh_frequency_ms: 100,
1508        }
1509    }
1510}
1511
/// COGS configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum amount of COGS measurements allowed to backlog.
    ///
    /// Any additional COGS measurements recorded will be dropped.
    ///
    /// Defaults to `10_000`.
    pub max_queue_size: u64,
    /// Relay COGS resource id.
    ///
    /// All Relay related COGS measurements are emitted with this resource id.
    ///
    /// Defaults to `relay_service`.
    pub relay_resource_id: String,
}
1529
1530impl Default for Cogs {
1531    fn default() -> Self {
1532        Self {
1533            max_queue_size: 10_000,
1534            relay_resource_id: "relay_service".to_owned(),
1535        }
1536    }
1537}
1538
/// Complete set of config values, one field per section of the config file.
///
/// Every section carries `#[serde(default)]`, so sections missing from the file
/// fall back to their respective `Default` implementations.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry_metrics: SentryMetrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1582
impl ConfigObject for ConfigValues {
    // The main config file is stored as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    // File name stem; combined with the YAML format this yields `config.yml`.
    fn name() -> &'static str {
        "config"
    }
}
1592
/// The central Relay configuration, combining parsed config values with
/// optional credentials and the path they were loaded from.
pub struct Config {
    /// Parsed values of all config sections.
    values: ConfigValues,
    /// Relay credentials, present only when a credentials file exists in the config folder.
    credentials: Option<Credentials>,
    /// The config folder path (empty when constructed from a JSON value).
    path: PathBuf,
}
1599
impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Manual impl rather than derive: `credentials` is deliberately left out of
        // the output — NOTE(review): presumably to avoid leaking the secret key; confirm.
        f.debug_struct("Config")
            .field("path", &self.path)
            .field("values", &self.values)
            .finish()
    }
}
1608
1609impl Config {
1610    /// Loads a config from a given config folder.
1611    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1612        let path = env::current_dir()
1613            .map(|x| x.join(path.as_ref()))
1614            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1615
1616        let config = Config {
1617            values: ConfigValues::load(&path)?,
1618            credentials: if Credentials::path(&path).exists() {
1619                Some(Credentials::load(&path)?)
1620            } else {
1621                None
1622            },
1623            path: path.clone(),
1624        };
1625
1626        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1627            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1628        }
1629
1630        Ok(config)
1631    }
1632
1633    /// Creates a config from a JSON value.
1634    ///
1635    /// This is mostly useful for tests.
1636    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1637        Ok(Config {
1638            values: serde_json::from_value(value)
1639                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1640            credentials: None,
1641            path: PathBuf::new(),
1642        })
1643    }
1644
    /// Override configuration with values coming from other sources (e.g. env variables or
    /// command line parameters).
    ///
    /// Overrides are applied in place: `self.values` is mutated field by field, and
    /// credential overrides either patch existing credentials or create a complete new
    /// set. A parse failure for any individual override aborts with a
    /// `ConfigError::field` error naming the offending field.
    pub fn apply_override(
        &mut self,
        mut overrides: OverridableConfig,
    ) -> anyhow::Result<&mut Self> {
        let relay = &mut self.values.relay;

        if let Some(mode) = overrides.mode {
            relay.mode = mode
                .parse::<RelayMode>()
                .with_context(|| ConfigError::field("mode"))?;
        }

        if let Some(deployment) = overrides.instance {
            relay.instance = deployment
                .parse::<RelayInstance>()
                // NOTE(review): the override field is `instance` but the error reports
                // "deployment" — confirm this naming is intentional.
                .with_context(|| ConfigError::field("deployment"))?;
        }

        if let Some(log_level) = overrides.log_level {
            self.values.logging.level = log_level.parse()?;
        }

        if let Some(log_format) = overrides.log_format {
            self.values.logging.format = log_format.parse()?;
        }

        // A full upstream URL takes precedence over an upstream DSN; only one of the
        // two overrides is applied.
        if let Some(upstream) = overrides.upstream {
            relay.upstream = upstream
                .parse::<UpstreamDescriptor>()
                .with_context(|| ConfigError::field("upstream"))?;
        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
            relay.upstream = upstream_dsn
                .parse::<Dsn>()
                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
                .with_context(|| ConfigError::field("upstream_dsn"))?;
        }

        if let Some(host) = overrides.host {
            relay.host = host
                .parse::<IpAddr>()
                .with_context(|| ConfigError::field("host"))?;
        }

        if let Some(port) = overrides.port {
            relay.port = port
                .as_str()
                .parse()
                .with_context(|| ConfigError::field("port"))?;
        }

        let processing = &mut self.values.processing;
        // The processing flag is parsed leniently: "true"/"1" enable it, and
        // "false"/"0"/"" (empty, e.g. an unset env var) disable it.
        if let Some(enabled) = overrides.processing {
            match enabled.to_lowercase().as_str() {
                "true" | "1" => processing.enabled = true,
                "false" | "0" | "" => processing.enabled = false,
                _ => return Err(ConfigError::field("processing").into()),
            }
        }

        // A Redis URL override replaces the entire Redis configuration with a single
        // unified instance.
        if let Some(redis) = overrides.redis_url {
            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
        }

        // Update the existing `bootstrap.servers` Kafka parameter in place, or append
        // it if it was not configured yet.
        if let Some(kafka_url) = overrides.kafka_url {
            let existing = processing
                .kafka_config
                .iter_mut()
                .find(|e| e.name == "bootstrap.servers");

            if let Some(config_param) = existing {
                config_param.value = kafka_url;
            } else {
                processing.kafka_config.push(KafkaConfigParam {
                    name: "bootstrap.servers".to_owned(),
                    value: kafka_url,
                })
            }
        }
        // Credentials overrides: parse each provided piece up front so that invalid
        // values fail before any state is modified.
        let id = if let Some(id) = overrides.id {
            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
            Some(id)
        } else {
            None
        };
        let public_key = if let Some(public_key) = overrides.public_key {
            let public_key = public_key
                .parse::<PublicKey>()
                .with_context(|| ConfigError::field("public_key"))?;
            Some(public_key)
        } else {
            None
        };

        let secret_key = if let Some(secret_key) = overrides.secret_key {
            let secret_key = secret_key
                .parse::<SecretKey>()
                .with_context(|| ConfigError::field("secret_key"))?;
            Some(secret_key)
        } else {
            None
        };
        let outcomes = &mut self.values.outcomes;
        if overrides.outcome_source.is_some() {
            outcomes.source = overrides.outcome_source.take();
        }

        if let Some(credentials) = &mut self.credentials {
            // We have existing credentials: each provided piece overrides the
            // corresponding entry individually.
            if let Some(id) = id {
                credentials.id = id;
            }
            if let Some(public_key) = public_key {
                credentials.public_key = public_key;
            }
            if let Some(secret_key) = secret_key {
                credentials.secret_key = secret_key
            }
        } else {
            // No existing credentials: we can only create a complete credential set,
            // so this is all-or-nothing.
            match (id, public_key, secret_key) {
                (Some(id), Some(public_key), Some(secret_key)) => {
                    self.credentials = Some(Credentials {
                        secret_key,
                        public_key,
                        id,
                    })
                }
                (None, None, None) => {
                    // nothing provided, we'll just leave the credentials None, maybe we
                    // don't need them in the current command or we'll override them later
                }
                _ => {
                    // A partial set (e.g. only a public key) is rejected.
                    return Err(ConfigError::field("incomplete credentials").into());
                }
            }
        }

        let limits = &mut self.values.limits;
        // NOTE(review): an unparsable shutdown_timeout is silently ignored rather
        // than reported — confirm this is intended.
        if let Some(shutdown_timeout) = overrides.shutdown_timeout {
            if let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>() {
                limits.shutdown_timeout = shutdown_timeout;
            }
        }

        if let Some(server_name) = overrides.server_name {
            self.values.sentry.server_name = Some(server_name.into());
        }

        Ok(self)
    }
1798
1799    /// Checks if the config is already initialized.
1800    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1801        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1802    }
1803
1804    /// Returns the filename of the config file.
1805    pub fn path(&self) -> &Path {
1806        &self.path
1807    }
1808
1809    /// Dumps out a YAML string of the values.
1810    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1811        serde_yaml::to_string(&self.values)
1812            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1813    }
1814
1815    /// Regenerates the relay credentials.
1816    ///
1817    /// This also writes the credentials back to the file.
1818    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1819        let creds = Credentials::generate();
1820        if save {
1821            creds.save(&self.path)?;
1822        }
1823        self.credentials = Some(creds);
1824        Ok(())
1825    }
1826
1827    /// Return the current credentials
1828    pub fn credentials(&self) -> Option<&Credentials> {
1829        self.credentials.as_ref()
1830    }
1831
1832    /// Set new credentials.
1833    ///
1834    /// This also writes the credentials back to the file.
1835    pub fn replace_credentials(
1836        &mut self,
1837        credentials: Option<Credentials>,
1838    ) -> anyhow::Result<bool> {
1839        if self.credentials == credentials {
1840            return Ok(false);
1841        }
1842
1843        match credentials {
1844            Some(ref creds) => {
1845                creds.save(&self.path)?;
1846            }
1847            None => {
1848                let path = Credentials::path(&self.path);
1849                if fs::metadata(&path).is_ok() {
1850                    fs::remove_file(&path).with_context(|| {
1851                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1852                    })?;
1853                }
1854            }
1855        }
1856
1857        self.credentials = credentials;
1858        Ok(true)
1859    }
1860
1861    /// Returns `true` if the config is ready to use.
1862    pub fn has_credentials(&self) -> bool {
1863        self.credentials.is_some()
1864    }
1865
1866    /// Returns the secret key if set.
1867    pub fn secret_key(&self) -> Option<&SecretKey> {
1868        self.credentials.as_ref().map(|x| &x.secret_key)
1869    }
1870
1871    /// Returns the public key if set.
1872    pub fn public_key(&self) -> Option<&PublicKey> {
1873        self.credentials.as_ref().map(|x| &x.public_key)
1874    }
1875
1876    /// Returns the relay ID.
1877    pub fn relay_id(&self) -> Option<&RelayId> {
1878        self.credentials.as_ref().map(|x| &x.id)
1879    }
1880
1881    /// Returns the relay mode.
1882    pub fn relay_mode(&self) -> RelayMode {
1883        self.values.relay.mode
1884    }
1885
1886    /// Returns the instance type of relay.
1887    pub fn relay_instance(&self) -> RelayInstance {
1888        self.values.relay.instance
1889    }
1890
1891    /// Returns the upstream target as descriptor.
1892    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1893        &self.values.relay.upstream
1894    }
1895
1896    /// Returns the custom HTTP "Host" header.
1897    pub fn http_host_header(&self) -> Option<&str> {
1898        self.values.http.host_header.as_deref()
1899    }
1900
1901    /// Returns the listen address.
1902    pub fn listen_addr(&self) -> SocketAddr {
1903        (self.values.relay.host, self.values.relay.port).into()
1904    }
1905
1906    /// Returns the TLS listen address.
1907    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1908        if self.values.relay.tls_identity_path.is_some() {
1909            let port = self.values.relay.tls_port.unwrap_or(3443);
1910            Some((self.values.relay.host, port).into())
1911        } else {
1912            None
1913        }
1914    }
1915
1916    /// Returns the path to the identity bundle
1917    pub fn tls_identity_path(&self) -> Option<&Path> {
1918        self.values.relay.tls_identity_path.as_deref()
1919    }
1920
1921    /// Returns the password for the identity bundle
1922    pub fn tls_identity_password(&self) -> Option<&str> {
1923        self.values.relay.tls_identity_password.as_deref()
1924    }
1925
1926    /// Returns `true` when project IDs should be overriden rather than validated.
1927    ///
1928    /// Defaults to `false`, which requires project ID validation.
1929    pub fn override_project_ids(&self) -> bool {
1930        self.values.relay.override_project_ids
1931    }
1932
1933    /// Returns `true` if Relay requires authentication for readiness.
1934    ///
1935    /// See [`ReadinessCondition`] for more information.
1936    pub fn requires_auth(&self) -> bool {
1937        match self.values.auth.ready {
1938            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
1939            ReadinessCondition::Always => false,
1940        }
1941    }
1942
1943    /// Returns the interval at which Realy should try to re-authenticate with the upstream.
1944    ///
1945    /// Always disabled in processing mode.
1946    pub fn http_auth_interval(&self) -> Option<Duration> {
1947        if self.processing_enabled() {
1948            return None;
1949        }
1950
1951        match self.values.http.auth_interval {
1952            None | Some(0) => None,
1953            Some(secs) => Some(Duration::from_secs(secs)),
1954        }
1955    }
1956
1957    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
1958    /// it has encountered a network outage.
1959    pub fn http_outage_grace_period(&self) -> Duration {
1960        Duration::from_secs(self.values.http.outage_grace_period)
1961    }
1962
1963    /// Time Relay waits before retrying an upstream request.
1964    ///
1965    /// Before going into a network outage, Relay may fail to make upstream
1966    /// requests. This is the time Relay waits before retrying the same request.
1967    pub fn http_retry_delay(&self) -> Duration {
1968        Duration::from_secs(self.values.http.retry_delay)
1969    }
1970
1971    /// Time of continued project request failures before Relay emits an error.
1972    pub fn http_project_failure_interval(&self) -> Duration {
1973        Duration::from_secs(self.values.http.project_failure_interval)
1974    }
1975
1976    /// Content encoding of upstream requests.
1977    pub fn http_encoding(&self) -> HttpEncoding {
1978        self.values.http.encoding
1979    }
1980
1981    /// Returns whether metrics should be sent globally through a shared endpoint.
1982    pub fn http_global_metrics(&self) -> bool {
1983        self.values.http.global_metrics
1984    }
1985
1986    /// Returns whether this Relay should emit outcomes.
1987    ///
1988    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
1989    /// in processing mode.
1990    pub fn emit_outcomes(&self) -> EmitOutcomes {
1991        if self.processing_enabled() {
1992            return EmitOutcomes::AsOutcomes;
1993        }
1994        self.values.outcomes.emit_outcomes
1995    }
1996
1997    /// Returns whether this Relay should emit client outcomes
1998    ///
1999    /// Relays that do not emit client outcomes will forward client recieved outcomes
2000    /// directly to the next relay in the chain as client report envelope.  This is only done
2001    /// if this relay emits outcomes at all. A relay that will not emit outcomes
2002    /// will forward the envelope unchanged.
2003    ///
2004    /// This flag can be explicitly disabled on processing relays as well to prevent the
2005    /// emitting of client outcomes to the kafka topic.
2006    pub fn emit_client_outcomes(&self) -> bool {
2007        self.values.outcomes.emit_client_outcomes
2008    }
2009
2010    /// Returns the maximum number of outcomes that are batched before being sent
2011    pub fn outcome_batch_size(&self) -> usize {
2012        self.values.outcomes.batch_size
2013    }
2014
2015    /// Returns the maximum interval that an outcome may be batched
2016    pub fn outcome_batch_interval(&self) -> Duration {
2017        Duration::from_millis(self.values.outcomes.batch_interval)
2018    }
2019
2020    /// The originating source of the outcome
2021    pub fn outcome_source(&self) -> Option<&str> {
2022        self.values.outcomes.source.as_deref()
2023    }
2024
2025    /// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2026    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2027        &self.values.outcomes.aggregator
2028    }
2029
2030    /// Returns logging configuration.
2031    pub fn logging(&self) -> &relay_log::LogConfig {
2032        &self.values.logging
2033    }
2034
2035    /// Returns logging configuration.
2036    pub fn sentry(&self) -> &relay_log::SentryConfig {
2037        &self.values.sentry
2038    }
2039
2040    /// Returns the socket addresses for statsd.
2041    ///
2042    /// If stats is disabled an empty vector is returned.
2043    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2044        if let Some(ref addr) = self.values.metrics.statsd {
2045            let addrs = addr
2046                .as_str()
2047                .to_socket_addrs()
2048                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2049                .collect();
2050            Ok(addrs)
2051        } else {
2052            Ok(vec![])
2053        }
2054    }
2055
2056    /// Return the prefix for statsd metrics.
2057    pub fn metrics_prefix(&self) -> &str {
2058        &self.values.metrics.prefix
2059    }
2060
2061    /// Returns the default tags for statsd metrics.
2062    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2063        &self.values.metrics.default_tags
2064    }
2065
2066    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
2067    pub fn metrics_hostname_tag(&self) -> Option<&str> {
2068        self.values.metrics.hostname_tag.as_deref()
2069    }
2070
2071    /// Returns the global sample rate for all metrics.
2072    pub fn metrics_sample_rate(&self) -> f32 {
2073        self.values.metrics.sample_rate
2074    }
2075
2076    /// Returns whether local metric aggregation should be enabled.
2077    pub fn metrics_aggregate(&self) -> bool {
2078        self.values.metrics.aggregate
2079    }
2080
2081    /// Returns the interval for periodic metrics emitted from Relay.
2082    ///
2083    /// `None` if periodic metrics are disabled.
2084    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2085        match self.values.metrics.periodic_secs {
2086            0 => None,
2087            secs => Some(Duration::from_secs(secs)),
2088        }
2089    }
2090
2091    /// Returns the default timeout for all upstream HTTP requests.
2092    pub fn http_timeout(&self) -> Duration {
2093        Duration::from_secs(self.values.http.timeout.into())
2094    }
2095
2096    /// Returns the connection timeout for all upstream HTTP requests.
2097    pub fn http_connection_timeout(&self) -> Duration {
2098        Duration::from_secs(self.values.http.connection_timeout.into())
2099    }
2100
2101    /// Returns the failed upstream request retry interval.
2102    pub fn http_max_retry_interval(&self) -> Duration {
2103        Duration::from_secs(self.values.http.max_retry_interval.into())
2104    }
2105
2106    /// Returns the expiry timeout for cached projects.
2107    pub fn project_cache_expiry(&self) -> Duration {
2108        Duration::from_secs(self.values.cache.project_expiry.into())
2109    }
2110
2111    /// Returns `true` if the full project state should be requested from upstream.
2112    pub fn request_full_project_config(&self) -> bool {
2113        self.values.cache.project_request_full_config
2114    }
2115
2116    /// Returns the expiry timeout for cached relay infos (public keys).
2117    pub fn relay_cache_expiry(&self) -> Duration {
2118        Duration::from_secs(self.values.cache.relay_expiry.into())
2119    }
2120
2121    /// Returns the maximum number of buffered envelopes
2122    pub fn envelope_buffer_size(&self) -> usize {
2123        self.values
2124            .cache
2125            .envelope_buffer_size
2126            .try_into()
2127            .unwrap_or(usize::MAX)
2128    }
2129
2130    /// Returns the expiry timeout for cached misses before trying to refetch.
2131    pub fn cache_miss_expiry(&self) -> Duration {
2132        Duration::from_secs(self.values.cache.miss_expiry.into())
2133    }
2134
2135    /// Returns the grace period for project caches.
2136    pub fn project_grace_period(&self) -> Duration {
2137        Duration::from_secs(self.values.cache.project_grace_period.into())
2138    }
2139
2140    /// Returns the duration in which batchable project config queries are
2141    /// collected before sending them in a single request.
2142    pub fn query_batch_interval(&self) -> Duration {
2143        Duration::from_millis(self.values.cache.batch_interval.into())
2144    }
2145
2146    /// Returns the duration in which downstream relays are requested from upstream.
2147    pub fn downstream_relays_batch_interval(&self) -> Duration {
2148        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2149    }
2150
2151    /// Returns the interval in seconds in which local project configurations should be reloaded.
2152    pub fn local_cache_interval(&self) -> Duration {
2153        Duration::from_secs(self.values.cache.file_interval.into())
2154    }
2155
2156    /// Returns the interval in seconds in which fresh global configs should be
2157    /// fetched from  upstream.
2158    pub fn global_config_fetch_interval(&self) -> Duration {
2159        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2160    }
2161
2162    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2163    ///
2164    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2165    /// suffixed with `.{partition_id}`.
2166    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2167        let mut path = self
2168            .values
2169            .spool
2170            .envelopes
2171            .path
2172            .as_ref()
2173            .map(|path| path.to_owned())?;
2174
2175        if partition_id == 0 {
2176            return Some(path);
2177        }
2178
2179        let file_name = path.file_name().and_then(|f| f.to_str())?;
2180        let new_file_name = format!("{}.{}", file_name, partition_id);
2181        path.set_file_name(new_file_name);
2182
2183        Some(path)
2184    }
2185
2186    /// The maximum size of the buffer, in bytes.
2187    pub fn spool_envelopes_max_disk_size(&self) -> usize {
2188        self.values.spool.envelopes.max_disk_size.as_bytes()
2189    }
2190
2191    /// Number of encoded envelope bytes that need to be accumulated before
2192    /// flushing one batch to disk.
2193    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2194        self.values.spool.envelopes.batch_size_bytes.as_bytes()
2195    }
2196
2197    /// Returns the time after which we drop envelopes as a [`Duration`] object.
2198    pub fn spool_envelopes_max_age(&self) -> Duration {
2199        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2200    }
2201
2202    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
2203    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2204        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2205    }
2206
2207    /// Returns the maximum number of envelopes that can be put in the bounded buffer.
2208    pub fn spool_max_backpressure_envelopes(&self) -> usize {
2209        self.values.spool.envelopes.max_backpressure_envelopes
2210    }
2211
2212    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
2213    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2214        self.values.spool.envelopes.max_backpressure_memory_percent
2215    }
2216
2217    /// Returns the number of partitions for the buffer.
2218    pub fn spool_partitions(&self) -> NonZeroU8 {
2219        self.values.spool.envelopes.partitions
2220    }
2221
2222    /// Returns the maximum size of an event payload in bytes.
2223    pub fn max_event_size(&self) -> usize {
2224        self.values.limits.max_event_size.as_bytes()
2225    }
2226
2227    /// Returns the maximum size of each attachment.
2228    pub fn max_attachment_size(&self) -> usize {
2229        self.values.limits.max_attachment_size.as_bytes()
2230    }
2231
2232    /// Returns the maximum combined size of attachments or payloads containing attachments
2233    /// (minidump, unreal, standalone attachments) in bytes.
2234    pub fn max_attachments_size(&self) -> usize {
2235        self.values.limits.max_attachments_size.as_bytes()
2236    }
2237
2238    /// Returns the maximum combined size of client reports in bytes.
2239    pub fn max_client_reports_size(&self) -> usize {
2240        self.values.limits.max_client_reports_size.as_bytes()
2241    }
2242
2243    /// Returns the maximum payload size of a monitor check-in in bytes.
2244    pub fn max_check_in_size(&self) -> usize {
2245        self.values.limits.max_check_in_size.as_bytes()
2246    }
2247
2248    /// Returns the maximum payload size of a log in bytes.
2249    pub fn max_log_size(&self) -> usize {
2250        self.values.limits.max_log_size.as_bytes()
2251    }
2252
2253    /// Returns the maximum payload size of a span in bytes.
2254    pub fn max_span_size(&self) -> usize {
2255        self.values.limits.max_span_size.as_bytes()
2256    }
2257
2258    /// Returns the maximum size of an envelope payload in bytes.
2259    ///
2260    /// Individual item size limits still apply.
2261    pub fn max_envelope_size(&self) -> usize {
2262        self.values.limits.max_envelope_size.as_bytes()
2263    }
2264
2265    /// Returns the maximum number of sessions per envelope.
2266    pub fn max_session_count(&self) -> usize {
2267        self.values.limits.max_session_count
2268    }
2269
2270    /// Returns the maximum payload size of a statsd metric in bytes.
2271    pub fn max_statsd_size(&self) -> usize {
2272        self.values.limits.max_statsd_size.as_bytes()
2273    }
2274
2275    /// Returns the maximum payload size of metric buckets in bytes.
2276    pub fn max_metric_buckets_size(&self) -> usize {
2277        self.values.limits.max_metric_buckets_size.as_bytes()
2278    }
2279
2280    /// Whether metric stats are collected and emitted.
2281    ///
2282    /// Metric stats are always collected and emitted when processing
2283    /// is enabled.
2284    pub fn metric_stats_enabled(&self) -> bool {
2285        self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled
2286    }
2287
2288    /// Returns the maximum payload size for general API requests.
2289    pub fn max_api_payload_size(&self) -> usize {
2290        self.values.limits.max_api_payload_size.as_bytes()
2291    }
2292
2293    /// Returns the maximum payload size for file uploads and chunks.
2294    pub fn max_api_file_upload_size(&self) -> usize {
2295        self.values.limits.max_api_file_upload_size.as_bytes()
2296    }
2297
2298    /// Returns the maximum payload size for chunks
2299    pub fn max_api_chunk_upload_size(&self) -> usize {
2300        self.values.limits.max_api_chunk_upload_size.as_bytes()
2301    }
2302
2303    /// Returns the maximum payload size for a profile
2304    pub fn max_profile_size(&self) -> usize {
2305        self.values.limits.max_profile_size.as_bytes()
2306    }
2307
2308    /// Returns the maximum payload size for a compressed replay.
2309    pub fn max_replay_compressed_size(&self) -> usize {
2310        self.values.limits.max_replay_compressed_size.as_bytes()
2311    }
2312
2313    /// Returns the maximum payload size for an uncompressed replay.
2314    pub fn max_replay_uncompressed_size(&self) -> usize {
2315        self.values.limits.max_replay_uncompressed_size.as_bytes()
2316    }
2317
2318    /// Returns the maximum message size for an uncompressed replay.
2319    ///
2320    /// This is greater than max_replay_compressed_size because
2321    /// it can include additional metadata about the replay in
2322    /// addition to the recording.
2323    pub fn max_replay_message_size(&self) -> usize {
2324        self.values.limits.max_replay_message_size.as_bytes()
2325    }
2326
2327    /// Returns the maximum number of active requests
2328    pub fn max_concurrent_requests(&self) -> usize {
2329        self.values.limits.max_concurrent_requests
2330    }
2331
2332    /// Returns the maximum number of active queries
2333    pub fn max_concurrent_queries(&self) -> usize {
2334        self.values.limits.max_concurrent_queries
2335    }
2336
2337    /// The maximum number of seconds a query is allowed to take across retries.
2338    pub fn query_timeout(&self) -> Duration {
2339        Duration::from_secs(self.values.limits.query_timeout)
2340    }
2341
2342    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
2343    /// signal.
2344    pub fn shutdown_timeout(&self) -> Duration {
2345        Duration::from_secs(self.values.limits.shutdown_timeout)
2346    }
2347
2348    /// Returns the server keep-alive timeout in seconds.
2349    ///
2350    /// By default keep alive is set to a 5 seconds.
2351    pub fn keepalive_timeout(&self) -> Duration {
2352        Duration::from_secs(self.values.limits.keepalive_timeout)
2353    }
2354
2355    /// Returns the server idle timeout in seconds.
2356    pub fn idle_timeout(&self) -> Option<Duration> {
2357        self.values.limits.idle_timeout.map(Duration::from_secs)
2358    }
2359
2360    /// Returns the maximum connections.
2361    pub fn max_connections(&self) -> Option<usize> {
2362        self.values.limits.max_connections
2363    }
2364
2365    /// TCP listen backlog to configure on Relay's listening socket.
2366    pub fn tcp_listen_backlog(&self) -> u32 {
2367        self.values.limits.tcp_listen_backlog
2368    }
2369
2370    /// Returns the number of cores to use for thread pools.
2371    pub fn cpu_concurrency(&self) -> usize {
2372        self.values.limits.max_thread_count
2373    }
2374
2375    /// Returns the number of tasks that can run concurrently in the worker pool.
2376    pub fn pool_concurrency(&self) -> usize {
2377        self.values.limits.max_pool_concurrency
2378    }
2379
2380    /// Returns the maximum size of a project config query.
2381    pub fn query_batch_size(&self) -> usize {
2382        self.values.cache.batch_size
2383    }
2384
2385    /// Get filename for static project config.
2386    pub fn project_configs_path(&self) -> PathBuf {
2387        self.path.join("projects")
2388    }
2389
2390    /// True if the Relay should do processing.
2391    pub fn processing_enabled(&self) -> bool {
2392        self.values.processing.enabled
2393    }
2394
2395    /// Level of normalization for Relay to apply to incoming data.
2396    pub fn normalization_level(&self) -> NormalizationLevel {
2397        self.values.normalization.level
2398    }
2399
2400    /// The path to the GeoIp database required for event processing.
2401    pub fn geoip_path(&self) -> Option<&Path> {
2402        self.values
2403            .geoip
2404            .path
2405            .as_deref()
2406            .or(self.values.processing.geoip_path.as_deref())
2407    }
2408
2409    /// Maximum future timestamp of ingested data.
2410    ///
2411    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
2412    pub fn max_secs_in_future(&self) -> i64 {
2413        self.values.processing.max_secs_in_future.into()
2414    }
2415
2416    /// Maximum age of ingested sessions. Older sessions will be dropped.
2417    pub fn max_session_secs_in_past(&self) -> i64 {
2418        self.values.processing.max_session_secs_in_past.into()
2419    }
2420
2421    /// Configuration name and list of Kafka configuration parameters for a given topic.
2422    pub fn kafka_config(&self, topic: KafkaTopic) -> Result<KafkaParams, KafkaConfigError> {
2423        self.values.processing.topics.get(topic).kafka_config(
2424            &self.values.processing.kafka_config,
2425            &self.values.processing.secondary_kafka_configs,
2426        )
2427    }
2428
2429    /// Whether to validate the topics against Kafka.
2430    pub fn kafka_validate_topics(&self) -> bool {
2431        self.values.processing.kafka_validate_topics
2432    }
2433
2434    /// All unused but configured topic assignments.
2435    pub fn unused_topic_assignments(&self) -> &BTreeMap<String, TopicAssignment> {
2436        &self.values.processing.topics.unused
2437    }
2438
2439    /// Redis servers to connect to for project configs, cardinality limits,
2440    /// rate limiting, and metrics metadata.
2441    pub fn redis(&self) -> Option<RedisConfigsRef> {
2442        let redis_configs = self.values.processing.redis.as_ref()?;
2443
2444        Some(build_redis_configs(
2445            redis_configs,
2446            self.cpu_concurrency() as u32,
2447        ))
2448    }
2449
2450    /// Chunk size of attachments in bytes.
2451    pub fn attachment_chunk_size(&self) -> usize {
2452        self.values.processing.attachment_chunk_size.as_bytes()
2453    }
2454
2455    /// Maximum metrics batch size in bytes.
2456    pub fn metrics_max_batch_size_bytes(&self) -> usize {
2457        self.values.aggregator.max_flush_bytes
2458    }
2459
2460    /// Default prefix to use when looking up project configs in Redis. This is only done when
2461    /// Relay is in processing mode.
2462    pub fn projectconfig_cache_prefix(&self) -> &str {
2463        &self.values.processing.projectconfig_cache_prefix
2464    }
2465
2466    /// Maximum rate limit to report to clients in seconds.
2467    pub fn max_rate_limit(&self) -> Option<u64> {
2468        self.values.processing.max_rate_limit.map(u32::into)
2469    }
2470
2471    /// Cache vacuum interval for the cardinality limiter in memory cache.
2472    ///
2473    /// The cache will scan for expired values based on this interval.
2474    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2475        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2476    }
2477
2478    /// Interval to refresh internal health checks.
2479    pub fn health_refresh_interval(&self) -> Duration {
2480        Duration::from_millis(self.values.health.refresh_interval_ms)
2481    }
2482
2483    /// Maximum memory watermark in bytes.
2484    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2485        self.values
2486            .health
2487            .max_memory_bytes
2488            .as_ref()
2489            .map_or(u64::MAX, |b| b.as_bytes() as u64)
2490    }
2491
2492    /// Maximum memory watermark as a percentage of maximum system memory.
2493    pub fn health_max_memory_watermark_percent(&self) -> f32 {
2494        self.values.health.max_memory_percent
2495    }
2496
2497    /// Health check probe timeout.
2498    pub fn health_probe_timeout(&self) -> Duration {
2499        Duration::from_millis(self.values.health.probe_timeout_ms)
2500    }
2501
2502    /// Refresh frequency for polling new memory stats.
2503    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2504        self.values.health.memory_stat_refresh_frequency_ms
2505    }
2506
2507    /// Maximum amount of COGS measurements buffered in memory.
2508    pub fn cogs_max_queue_size(&self) -> u64 {
2509        self.values.cogs.max_queue_size
2510    }
2511
2512    /// Resource ID to use for Relay COGS measurements.
2513    pub fn cogs_relay_resource_id(&self) -> &str {
2514        &self.values.cogs.relay_resource_id
2515    }
2516
2517    /// Returns configuration for the default metrics aggregator.
2518    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2519        &self.values.aggregator
2520    }
2521
2522    /// Returns configuration for non-default metrics aggregator.
2523    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2524        &self.values.secondary_aggregators
2525    }
2526
2527    /// Returns aggregator config for a given metrics namespace.
2528    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2529        for entry in &self.values.secondary_aggregators {
2530            if entry.condition.matches(Some(namespace)) {
2531                return &entry.config;
2532            }
2533        }
2534        &self.values.aggregator
2535    }
2536
2537    /// Return the statically configured Relays.
2538    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2539        &self.values.auth.static_relays
2540    }
2541
2542    /// Returns `true` if unknown items should be accepted and forwarded.
2543    pub fn accept_unknown_items(&self) -> bool {
2544        let forward = self.values.routing.accept_unknown_items;
2545        forward.unwrap_or_else(|| !self.processing_enabled())
2546    }
2547}
2548
2549impl Default for Config {
2550    fn default() -> Self {
2551        Self {
2552            values: ConfigValues::default(),
2553            credentials: None,
2554            path: PathBuf::new(),
2555        }
2556    }
2557}
2558
#[cfg(test)]
mod tests {

    use super::*;

    /// Regression test for renaming the envelope buffer flags.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
    event_buffer_size: 1000000
    event_expiry: 1800
"###;

        // The legacy `event_*` names must still deserialize into the renamed
        // `envelope_*` fields.
        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    /// Round-trips every `EmitOutcomes` variant through its JSON encoding.
    #[test]
    fn test_emit_outcomes() {
        let cases = [
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ];
        for (serialized, expected) in cases.iter() {
            let parsed: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(parsed, *expected);
            assert_eq!(serde_json::to_string(&parsed).unwrap(), *serialized);
        }
    }

    /// Unquoted, non-boolean input is rejected.
    #[test]
    fn test_emit_outcomes_invalid() {
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
    }
}