relay_config/
config.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
5use std::num::NonZeroU8;
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16    TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
/// Default value for the network outage grace period.
///
/// NOTE(review): the consumer of this constant is not visible in this part of the file;
/// presumably it is the default for `Http::outage_grace_period` and is measured in seconds —
/// confirm at the use site.
const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
/// Comment header written at the top of YAML config files, before the serialized contents.
static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
# All config options: https://docs.sentry.io/product/relay/options/
"###;
34
/// Classifies config related errors.
///
/// The enum is `#[non_exhaustive]`, so downstream crates must include a wildcard arm when
/// matching on it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum ConfigErrorKind {
    /// Failed to open the file.
    CouldNotOpenFile,
    /// Failed to save a file.
    CouldNotWriteFile,
    /// Parsing YAML failed.
    BadYaml,
    /// Parsing JSON failed.
    BadJson,
    /// A config value is invalid.
    InvalidValue,
    /// The user attempted to run Relay with processing enabled, but uses a binary that was
    /// compiled without the processing feature.
    ProcessingNotAvailable,
}
53
54impl fmt::Display for ConfigErrorKind {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        match self {
57            Self::CouldNotOpenFile => write!(f, "could not open config file"),
58            Self::CouldNotWriteFile => write!(f, "could not write config file"),
59            Self::BadYaml => write!(f, "could not parse yaml config file"),
60            Self::BadJson => write!(f, "could not parse json config file"),
61            Self::InvalidValue => write!(f, "invalid config value"),
62            Self::ProcessingNotAvailable => write!(
63                f,
64                "was not compiled with processing, cannot enable processing"
65            ),
66        }
67    }
68}
69
/// Defines the source of a config error.
///
/// The source is rendered as a parenthesized suffix when a [`ConfigError`] is displayed.
#[derive(Debug)]
enum ConfigErrorSource {
    /// An error occurring independently of any file or field.
    None,
    /// An error originating from a configuration file at the given path.
    File(PathBuf),
    /// An error originating in a field override (an env var, or a CLI parameter).
    FieldOverride(String),
}
80
81impl Default for ConfigErrorSource {
82    fn default() -> Self {
83        Self::None
84    }
85}
86
87impl fmt::Display for ConfigErrorSource {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        match self {
90            ConfigErrorSource::None => Ok(()),
91            ConfigErrorSource::File(file_name) => {
92                write!(f, " (file {})", file_name.display())
93            }
94            ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
95        }
96    }
97}
98
/// Indicates config related errors.
///
/// Combines the [`ConfigErrorKind`] with the place the error originated from (a file, a field
/// override, or nothing in particular).
#[derive(Debug)]
pub struct ConfigError {
    /// Where the error originated; appended as a suffix to the display output.
    source: ConfigErrorSource,
    /// The category of the error.
    kind: ConfigErrorKind,
}
105
106impl ConfigError {
107    #[inline]
108    fn new(kind: ConfigErrorKind) -> Self {
109        Self {
110            source: ConfigErrorSource::None,
111            kind,
112        }
113    }
114
115    #[inline]
116    fn field(field: &'static str) -> Self {
117        Self {
118            source: ConfigErrorSource::FieldOverride(field.to_owned()),
119            kind: ConfigErrorKind::InvalidValue,
120        }
121    }
122
123    #[inline]
124    fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
125        Self {
126            source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
127            kind,
128        }
129    }
130
131    /// Returns the error kind of the error.
132    pub fn kind(&self) -> ConfigErrorKind {
133        self.kind
134    }
135}
136
137impl fmt::Display for ConfigError {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        write!(f, "{}{}", self.kind(), self.source)
140    }
141}
142
// `ConfigError` stores no underlying cause, so the default `Error` methods are used as-is.
impl Error for ConfigError {}
144
/// The on-disk serialization format of a config file.
enum ConfigFormat {
    /// YAML, stored with the `yml` file extension.
    Yaml,
    /// JSON, stored with the `json` file extension.
    Json,
}
149
150impl ConfigFormat {
151    pub fn extension(&self) -> &'static str {
152        match self {
153            ConfigFormat::Yaml => "yml",
154            ConfigFormat::Json => "json",
155        }
156    }
157}
158
159trait ConfigObject: DeserializeOwned + Serialize {
160    /// The format in which to serialize this configuration.
161    fn format() -> ConfigFormat;
162
163    /// The basename of the config file.
164    fn name() -> &'static str;
165
166    /// The full filename of the config file, including the file extension.
167    fn path(base: &Path) -> PathBuf {
168        base.join(format!("{}.{}", Self::name(), Self::format().extension()))
169    }
170
171    /// Loads the config file from a file within the given directory location.
172    fn load(base: &Path) -> anyhow::Result<Self> {
173        let path = Self::path(base);
174
175        let f = fs::File::open(&path)
176            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
177        let f = io::BufReader::new(f);
178
179        let mut source = serde_vars::EnvSource::default();
180        match Self::format() {
181            ConfigFormat::Yaml => {
182                serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
183                    .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
184            }
185            ConfigFormat::Json => {
186                serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
187                    .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
188            }
189        }
190    }
191
192    /// Writes the configuration to a file within the given directory location.
193    fn save(&self, base: &Path) -> anyhow::Result<()> {
194        let path = Self::path(base);
195        let mut options = fs::OpenOptions::new();
196        options.write(true).truncate(true).create(true);
197
198        // Remove all non-user permissions for the newly created file
199        #[cfg(unix)]
200        {
201            use std::os::unix::fs::OpenOptionsExt;
202            options.mode(0o600);
203        }
204
205        let mut f = options
206            .open(&path)
207            .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
208
209        match Self::format() {
210            ConfigFormat::Yaml => {
211                f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
212                serde_yaml::to_writer(&mut f, self)
213                    .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
214            }
215            ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
216                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
217        }
218
219        f.write_all(b"\n").ok();
220
221        Ok(())
222    }
223}
224
/// Structure used to hold information about configuration overrides via
/// CLI parameters or environment variables.
///
/// Every override is kept as an unparsed string; `None` means the value is not overridden.
#[derive(Debug, Default)]
pub struct OverridableConfig {
    /// The operation mode of this relay.
    pub mode: Option<String>,
    /// The instance type of this relay.
    pub instance: Option<String>,
    /// The log level of this relay.
    pub log_level: Option<String>,
    /// The log format of this relay.
    pub log_format: Option<String>,
    /// The upstream relay or sentry instance.
    pub upstream: Option<String>,
    /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
    pub upstream_dsn: Option<String>,
    /// The host the relay should bind to (network interface).
    pub host: Option<String>,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: Option<String>,
    /// `"true"` if processing is enabled, `"false"` otherwise.
    pub processing: Option<String>,
    /// The Kafka `bootstrap.servers` configuration string.
    pub kafka_url: Option<String>,
    /// The Redis server URL.
    pub redis_url: Option<String>,
    /// The globally unique ID of the relay.
    pub id: Option<String>,
    /// The secret key of the relay.
    pub secret_key: Option<String>,
    /// The public key of the relay.
    pub public_key: Option<String>,
    /// The outcome source.
    pub outcome_source: Option<String>,
    /// The shutdown timeout.
    pub shutdown_timeout: Option<String>,
    /// Server name reported in the Sentry SDK.
    pub server_name: Option<String>,
}
264
/// The relay credentials.
///
/// Stored in the JSON file `credentials.json` within the config directory.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Credentials {
    /// The secret key of the relay.
    pub secret_key: SecretKey,
    /// The public key of the relay.
    pub public_key: PublicKey,
    /// The globally unique ID of the relay.
    pub id: RelayId,
}
275
276impl Credentials {
277    /// Generates new random credentials.
278    pub fn generate() -> Self {
279        relay_log::info!("generating new relay credentials");
280        let (sk, pk) = generate_key_pair();
281        Self {
282            secret_key: sk,
283            public_key: pk,
284            id: generate_relay_id(),
285        }
286    }
287
288    /// Serializes this configuration to JSON.
289    pub fn to_json_string(&self) -> anyhow::Result<String> {
290        serde_json::to_string(self)
291            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
292    }
293}
294
295impl ConfigObject for Credentials {
296    fn format() -> ConfigFormat {
297        ConfigFormat::Json
298    }
299    fn name() -> &'static str {
300        "credentials"
301    }
302}
303
/// Information on a downstream Relay.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
    /// The public key that this Relay uses to authenticate and sign requests.
    pub public_key: PublicKey,

    /// Marks an internal relay that has privileged access to more project configuration.
    ///
    /// Defaults to `false` when missing from the serialized form.
    #[serde(default)]
    pub internal: bool,
}
315
316impl RelayInfo {
317    /// Creates a new RelayInfo
318    pub fn new(public_key: PublicKey) -> Self {
319        Self {
320            public_key,
321            internal: false,
322        }
323    }
324}
325
/// The operation mode of a relay.
///
/// Variants are (de)serialized in camelCase, i.e. `proxy`, `static` and `managed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayMode {
    /// This relay acts as a proxy for all requests and events.
    ///
    /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
    /// fetch project configurations from the upstream or perform PII stripping. All events are
    /// accepted unless overridden on the file system.
    Proxy,

    /// This relay is configured statically in the file system.
    ///
    /// Events are only accepted for projects configured statically in the file system. All other
    /// events are rejected. If configured, PII stripping is also performed on those events.
    Static,

    /// Project configurations are managed by the upstream.
    ///
    /// Project configurations are always fetched from the upstream, unless they are statically
    /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
    /// only possible if the upstream is Sentry directly, or another managed Relay.
    Managed,
}
350
351impl fmt::Display for RelayMode {
352    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
353        match self {
354            RelayMode::Proxy => write!(f, "proxy"),
355            RelayMode::Static => write!(f, "static"),
356            RelayMode::Managed => write!(f, "managed"),
357        }
358    }
359}
360
/// The instance type of Relay.
///
/// Variants are (de)serialized in camelCase, i.e. `default` and `canary`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RelayInstance {
    /// This Relay is run as a default instance.
    Default,

    /// This Relay is run as a canary instance where experiments can be run.
    Canary,
}
371
372impl RelayInstance {
373    /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
374    pub fn is_canary(&self) -> bool {
375        matches!(self, RelayInstance::Canary)
376    }
377}
378
379impl fmt::Display for RelayInstance {
380    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381        match self {
382            RelayInstance::Default => write!(f, "default"),
383            RelayInstance::Canary => write!(f, "canary"),
384        }
385    }
386}
387
388impl FromStr for RelayInstance {
389    type Err = fmt::Error;
390
391    fn from_str(s: &str) -> Result<Self, Self::Err> {
392        match s {
393            "canary" => Ok(RelayInstance::Canary),
394            _ => Ok(RelayInstance::Default),
395        }
396    }
397}
398
/// Error returned when parsing an invalid [`RelayMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParseRelayModeError;

impl fmt::Display for ParseRelayModeError {
    /// Writes a message listing the modes that can actually be parsed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Keep this list in sync with `RelayMode::from_str`. The former `capture` entry was
        // dropped because no such mode is accepted.
        f.write_str("Relay mode must be one of: managed, static, proxy")
    }
}

impl Error for ParseRelayModeError {}
413
414impl FromStr for RelayMode {
415    type Err = ParseRelayModeError;
416
417    fn from_str(s: &str) -> Result<Self, Self::Err> {
418        match s {
419            "proxy" => Ok(RelayMode::Proxy),
420            "static" => Ok(RelayMode::Static),
421            "managed" => Ok(RelayMode::Managed),
422            _ => Err(ParseRelayModeError),
423        }
424    }
425}
426
/// Checks whether `t` compares equal to its type's `Default::default()` value.
fn is_default<T>(t: &T) -> bool
where
    T: Default + PartialEq,
{
    PartialEq::eq(t, &T::default())
}
431
/// Checks if we are running in docker.
///
/// This is the case if `/.dockerenv` exists, or if `/proc/self/cgroup` can be read and contains
/// the string `/docker`.
fn is_docker() -> bool {
    let has_dockerenv = fs::metadata("/.dockerenv").is_ok();

    // The cgroup file is only consulted when the marker file is absent.
    has_dockerenv
        || match fs::read_to_string("/proc/self/cgroup") {
            Ok(cgroups) => cgroups.contains("/docker"),
            Err(_) => false,
        }
}
440
441/// Default value for the "bind" configuration.
442fn default_host() -> IpAddr {
443    if is_docker() {
444        // Docker images rely on this service being exposed
445        "0.0.0.0".parse().unwrap()
446    } else {
447        "127.0.0.1".parse().unwrap()
448    }
449}
450
/// Controls responses from the readiness health check endpoint based on authentication.
///
/// Independent of the readiness condition, shutdown always switches Relay into unready state.
///
/// Variants are (de)serialized in lowercase.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReadinessCondition {
    /// (default) Relay is ready when authenticated and connected to the upstream.
    ///
    /// Before authentication has succeeded and during network outages, Relay responds as not ready.
    /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
    /// Relay remains ready until authentication fails.
    ///
    /// Authentication is only required for Relays in managed mode. Other Relays will only check for
    /// network outages.
    Authenticated,
    /// Relay reports readiness regardless of the authentication and networking state.
    Always,
}
469
470impl Default for ReadinessCondition {
471    fn default() -> Self {
472        Self::Authenticated
473    }
474}
475
/// Relay specific configuration values.
///
/// Fields missing during deserialization are taken from the `Default` implementation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Relay {
    /// The operation mode of this relay.
    pub mode: RelayMode,
    /// The instance type of this relay.
    pub instance: RelayInstance,
    /// The upstream relay or sentry instance.
    pub upstream: UpstreamDescriptor<'static>,
    /// The host the relay should bind to (network interface).
    pub host: IpAddr,
    /// The port to bind for the unencrypted relay HTTP server.
    pub port: u16,
    /// Optional port to bind for the encrypted relay HTTPS server.
    ///
    /// This field is never serialized.
    #[serde(skip_serializing)]
    pub tls_port: Option<u16>,
    /// The path to the identity (DER-encoded PKCS12) to use for TLS.
    ///
    /// This field is never serialized.
    #[serde(skip_serializing)]
    pub tls_identity_path: Option<PathBuf>,
    /// Password for the PKCS12 archive.
    ///
    /// This field is never serialized.
    #[serde(skip_serializing)]
    pub tls_identity_password: Option<String>,
    /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
    ///
    /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
    /// Validation of project identifiers can be safely skipped in these cases.
    ///
    /// Omitted from the serialized output while it has its default value (`false`).
    #[serde(skip_serializing_if = "is_default")]
    pub override_project_ids: bool,
}
506
507impl Default for Relay {
508    fn default() -> Self {
509        Relay {
510            mode: RelayMode::Managed,
511            instance: RelayInstance::Default,
512            upstream: "https://sentry.io/".parse().unwrap(),
513            host: default_host(),
514            port: 3000,
515            tls_port: None,
516            tls_identity_path: None,
517            tls_identity_password: None,
518            override_project_ids: false,
519        }
520    }
521}
522
/// Controls the metrics Relay emits.
///
/// Fields missing during deserialization are taken from the `Default` implementation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Metrics {
    /// Hostname and port of the statsd server.
    ///
    /// Defaults to `None`.
    pub statsd: Option<String>,
    /// Common prefix that should be added to all metrics.
    ///
    /// Defaults to `"sentry.relay"`.
    pub prefix: String,
    /// Default tags to apply to all metrics.
    ///
    /// Defaults to no tags.
    pub default_tags: BTreeMap<String, String>,
    /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
    pub hostname_tag: Option<String>,
    /// Global sample rate for all emitted metrics between `0.0` and `1.0`.
    ///
    /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
    /// Defaults to `1.0` (100%).
    pub sample_rate: f32,
    /// Interval for periodic metrics emitted from Relay.
    ///
    /// Setting it to `0` seconds disables the periodic metrics.
    /// Defaults to 5 seconds.
    pub periodic_secs: u64,
    /// Whether local metric aggregation using statsdproxy should be enabled.
    ///
    /// Defaults to `true`.
    pub aggregate: bool,
    /// Allows emission of metrics with high cardinality tags.
    ///
    /// High cardinality tags are dynamic values attached to metrics,
    /// such as project IDs. When enabled, these tags will be included
    /// in the emitted metrics. When disabled, the tags will be omitted.
    ///
    /// Defaults to `false`.
    pub allow_high_cardinality_tags: bool,
}
562
563impl Default for Metrics {
564    fn default() -> Self {
565        Metrics {
566            statsd: None,
567            prefix: "sentry.relay".into(),
568            default_tags: BTreeMap::new(),
569            hostname_tag: None,
570            sample_rate: 1.0,
571            periodic_secs: 5,
572            aggregate: true,
573            allow_high_cardinality_tags: false,
574        }
575    }
576}
577
/// Controls processing of Sentry metrics and metric metadata.
///
/// Fields missing during deserialization are taken from the derived `Default` implementation.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default)]
pub struct SentryMetrics {
    /// Whether metric stats are collected and emitted.
    ///
    /// Metric stats are always collected and emitted when processing
    /// is enabled.
    ///
    /// This option is required when running multiple trusted Relays in a chain
    /// and the metric stats should be collected and forwarded from
    /// the first Relay in the chain.
    ///
    /// Defaults to `false`.
    pub metric_stats_enabled: bool,
}
594
/// Controls various limits.
///
/// Fields missing during deserialization are taken from the `Default` implementation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
    /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    pub max_concurrent_requests: usize,
    /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
    /// buffering.
    ///
    /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
    pub max_concurrent_queries: usize,
    /// The maximum payload size for events.
    pub max_event_size: ByteSize,
    /// The maximum size for each attachment.
    pub max_attachment_size: ByteSize,
    /// The maximum combined size for all attachments in an envelope or request.
    pub max_attachments_size: ByteSize,
    /// The maximum combined size for all client reports in an envelope or request.
    pub max_client_reports_size: ByteSize,
    /// The maximum payload size for a monitor check-in.
    pub max_check_in_size: ByteSize,
    /// The maximum payload size for an entire envelope. Individual limits still apply.
    pub max_envelope_size: ByteSize,
    /// The maximum number of session items per envelope.
    pub max_session_count: usize,
    /// The maximum number of standalone span items per envelope.
    pub max_span_count: usize,
    /// The maximum number of log items per envelope.
    pub max_log_count: usize,
    /// The maximum payload size for general API requests.
    pub max_api_payload_size: ByteSize,
    /// The maximum payload size for file uploads and chunks.
    pub max_api_file_upload_size: ByteSize,
    /// The maximum payload size for chunks.
    pub max_api_chunk_upload_size: ByteSize,
    /// The maximum payload size for a profile.
    pub max_profile_size: ByteSize,
    /// The maximum payload size for a log.
    pub max_log_size: ByteSize,
    /// The maximum payload size for a span.
    pub max_span_size: ByteSize,
    /// The maximum payload size for an item container.
    pub max_container_size: ByteSize,
    /// The maximum payload size for a statsd metric.
    pub max_statsd_size: ByteSize,
    /// The maximum payload size for metric buckets.
    pub max_metric_buckets_size: ByteSize,
    /// The maximum payload size for a compressed replay.
    pub max_replay_compressed_size: ByteSize,
    /// The maximum payload size for an uncompressed replay.
    ///
    /// Can also be deserialized from the alias `max_replay_size`.
    #[serde(alias = "max_replay_size")]
    max_replay_uncompressed_size: ByteSize,
    /// The maximum size for a replay recording Kafka message.
    pub max_replay_message_size: ByteSize,
    /// The maximum number of threads to spawn for CPU and web work, each.
    ///
    /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
    /// the number of logical CPU cores on the host.
    pub max_thread_count: usize,
    /// Controls the maximum concurrency of each worker thread.
    ///
    /// Increasing the concurrency can lead to a better utilization of worker threads by
    /// increasing the amount of I/O done concurrently.
    ///
    /// Currently has no effect and defaults to `1`.
    pub max_pool_concurrency: usize,
    /// The maximum number of seconds a query is allowed to take across retries. Individual requests
    /// have lower timeouts. Defaults to 30 seconds.
    pub query_timeout: u64,
    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
    /// signal.
    pub shutdown_timeout: u64,
    /// Server keep-alive timeout in seconds.
    ///
    /// By default, keep-alive is set to 5 seconds.
    pub keepalive_timeout: u64,
    /// Server idle timeout in seconds.
    ///
    /// The idle timeout limits the amount of time a connection is kept open without activity.
    /// Setting this too short may abort connections before Relay is able to send a response.
    ///
    /// By default there is no idle timeout.
    pub idle_timeout: Option<u64>,
    /// Sets the maximum number of concurrent connections.
    ///
    /// Upon reaching the limit, the server will stop accepting connections.
    ///
    /// By default there is no limit.
    pub max_connections: Option<usize>,
    /// The TCP listen backlog.
    ///
    /// Configures the TCP listen backlog for the listening socket of Relay.
    /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
    /// for a more detailed description of the listen backlog.
    ///
    /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
    pub tcp_listen_backlog: u32,
}
694
695impl Default for Limits {
696    fn default() -> Self {
697        Limits {
698            max_concurrent_requests: 100,
699            max_concurrent_queries: 5,
700            max_event_size: ByteSize::mebibytes(1),
701            max_attachment_size: ByteSize::mebibytes(100),
702            max_attachments_size: ByteSize::mebibytes(100),
703            max_client_reports_size: ByteSize::kibibytes(4),
704            max_check_in_size: ByteSize::kibibytes(100),
705            max_envelope_size: ByteSize::mebibytes(100),
706            max_session_count: 100,
707            max_span_count: 1000,
708            max_log_count: 1000,
709            max_api_payload_size: ByteSize::mebibytes(20),
710            max_api_file_upload_size: ByteSize::mebibytes(40),
711            max_api_chunk_upload_size: ByteSize::mebibytes(100),
712            max_profile_size: ByteSize::mebibytes(50),
713            max_log_size: ByteSize::mebibytes(1),
714            max_span_size: ByteSize::mebibytes(1),
715            max_container_size: ByteSize::mebibytes(3),
716            max_statsd_size: ByteSize::mebibytes(1),
717            max_metric_buckets_size: ByteSize::mebibytes(1),
718            max_replay_compressed_size: ByteSize::mebibytes(10),
719            max_replay_uncompressed_size: ByteSize::mebibytes(100),
720            max_replay_message_size: ByteSize::mebibytes(15),
721            max_thread_count: num_cpus::get(),
722            max_pool_concurrency: 1,
723            query_timeout: 30,
724            shutdown_timeout: 10,
725            keepalive_timeout: 5,
726            idle_timeout: None,
727            max_connections: None,
728            tcp_listen_backlog: 1024,
729        }
730    }
731}
732
/// Controls traffic steering.
///
/// Fields missing during deserialization are taken from the derived `Default` implementation.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
    /// Accept and forward unknown Envelope items to the upstream.
    ///
    /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
    /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
    /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
    /// processed as usual.
    ///
    /// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
    /// is disabled by default since the item cannot be handled.
    ///
    /// The stored value is `None` unless configured explicitly; the mode-dependent default is
    /// not applied in this struct.
    pub accept_unknown_items: Option<bool>,
}
748
/// Http content encoding for both incoming and outgoing web requests.
///
/// Variants are (de)serialized in lowercase.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HttpEncoding {
    /// Identity function without any compression.
    ///
    /// This is the default encoding and does not require the presence of the `content-encoding`
    /// HTTP header.
    #[default]
    Identity,
    /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
    /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
    ///
    /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
    /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
    Deflate,
    /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
    /// (LZ77), with a 32-bit CRC.
    ///
    /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
    /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
    /// compatibility purposes.
    Gzip,
    /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
    Br,
    /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
    Zstd,
}
777
778impl HttpEncoding {
779    /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
780    pub fn parse(str: &str) -> Self {
781        let str = str.trim();
782        if str.eq_ignore_ascii_case("zstd") {
783            Self::Zstd
784        } else if str.eq_ignore_ascii_case("br") {
785            Self::Br
786        } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
787            Self::Gzip
788        } else if str.eq_ignore_ascii_case("deflate") {
789            Self::Deflate
790        } else {
791            Self::Identity
792        }
793    }
794
795    /// Returns the value for the `content-encoding` HTTP header.
796    ///
797    /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
798    pub fn name(&self) -> Option<&'static str> {
799        match self {
800            Self::Identity => None,
801            Self::Deflate => Some("deflate"),
802            Self::Gzip => Some("gzip"),
803            Self::Br => Some("br"),
804            Self::Zstd => Some("zstd"),
805        }
806    }
807}
808
/// Controls HTTP communication with the upstream: timeouts, retries, re-authentication and
/// request encoding.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Http {
    /// Timeout for upstream requests in seconds.
    ///
    /// This timeout covers the time from sending the request until receiving response headers.
    /// Neither the connection process and handshakes, nor reading the response body is covered in
    /// this timeout.
    ///
    /// Defaults to `5`.
    pub timeout: u32,
    /// Timeout for establishing connections with the upstream in seconds.
    ///
    /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
    /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
    ///
    /// Defaults to `3`.
    pub connection_timeout: u32,
    /// Maximum interval between failed request retries in seconds.
    ///
    /// Defaults to `60` (1 minute).
    pub max_retry_interval: u32,
    /// The custom HTTP Host header to send to the upstream.
    ///
    /// Not set by default.
    pub host_header: Option<String>,
    /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
    ///
    /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
    /// back into startup mode and tries to establish a connection. During this time, incoming
    /// envelopes will be buffered.
    ///
    /// Defaults to `600` (10 minutes).
    pub auth_interval: Option<u64>,
    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
    /// it has encountered a network outage in seconds.
    ///
    /// During a network outage relay will try to reconnect and will buffer all upstream messages
    /// until it manages to reconnect.
    ///
    /// Defaults to `10`.
    pub outage_grace_period: u64,
    /// The time Relay waits before retrying an upstream request, in seconds.
    ///
    /// This time is only used before going into a network outage mode.
    ///
    /// Defaults to `1`.
    pub retry_delay: u64,
    /// The interval in seconds for continued failed project fetches at which Relay will error.
    ///
    /// A successful fetch resets this interval. Relay does nothing during long
    /// times without emitting requests.
    ///
    /// Defaults to `90`.
    pub project_failure_interval: u64,
    /// Content encoding to apply to upstream store requests.
    ///
    /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
    /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
    ///
    /// This setting applies to all store requests of SDK data, including events, transactions,
    /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
    ///
    /// Available options are:
    ///
    ///  - `identity`: Disables compression.
    ///  - `deflate`: Compression using a zlib header with deflate encoding.
    ///  - `gzip`: Compression using gzip.
    ///  - `br`: Compression using the brotli algorithm.
    ///  - `zstd` (default): Compression using the zstd algorithm.
    pub encoding: HttpEncoding,
    /// Submit metrics globally through a shared endpoint.
    ///
    /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
    /// DSN, this submits metrics to the global endpoint with Relay authentication.
    ///
    /// This option does not have any effect on processing mode.
    ///
    /// Defaults to `false`.
    pub global_metrics: bool,
}
875
876impl Default for Http {
877    fn default() -> Self {
878        Http {
879            timeout: 5,
880            connection_timeout: 3,
881            max_retry_interval: 60, // 1 minute
882            host_header: None,
883            auth_interval: Some(600), // 10 minutes
884            outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
885            retry_delay: default_retry_delay(),
886            project_failure_interval: default_project_failure_interval(),
887            encoding: HttpEncoding::Zstd,
888            global_metrics: false,
889        }
890    }
891}
892
/// Returns the default delay before retrying a request to an unavailable upstream: 1 second.
fn default_retry_delay() -> u64 {
    const RETRY_DELAY_SECS: u64 = 1;
    RETRY_DELAY_SECS
}
897
/// Returns the default project failure interval: 90 seconds.
fn default_project_failure_interval() -> u64 {
    const FAILURE_INTERVAL_SECS: u64 = 90;
    FAILURE_INTERVAL_SECS
}
902
/// Default maximum on-disk size of the envelope spool: 500 MiB.
fn spool_envelopes_max_disk_size() -> ByteSize {
    ByteSize::mebibytes(500)
}
907
/// Default number of encoded envelope bytes to cache before writing to disk: 10 KiB.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
    ByteSize::kibibytes(10)
}
912
/// Default maximum time an envelope may stay in the spool, in seconds: 24 hours.
fn spool_envelopes_max_envelope_delay_secs() -> u64 {
    const SECS_PER_HOUR: u64 = 60 * 60;
    24 * SECS_PER_HOUR
}
916
/// Default interval, in milliseconds, at which the spool's disk usage is re-read: 100 ms.
fn spool_disk_usage_refresh_frequency_ms() -> u64 {
    const REFRESH_MS: u64 = 100;
    REFRESH_MS
}
921
/// Default size of the bounded buffer used for handling backpressure: 500 envelopes.
fn spool_max_backpressure_envelopes() -> usize {
    const MAX_ENVELOPES: usize = 500;
    MAX_ENVELOPES
}
926
/// Default relative memory usage limit for unspooling: `0.9` (90%).
fn spool_max_backpressure_memory_percent() -> f32 {
    const MEMORY_FRACTION: f32 = 0.9;
    MEMORY_FRACTION
}
931
/// Default number of buffer partitions: a single partition.
fn spool_envelopes_partitions() -> NonZeroU8 {
    // `NonZeroU8::MIN` is the constant `1`, so no fallible construction is needed.
    NonZeroU8::MIN
}
936
/// Persistent buffering configuration for incoming envelopes.
///
/// Every field except [`Self::path`] carries its own serde default, so any subset of fields can
/// be given in the config file.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
    /// The path of the SQLite database file(s) which persist the data.
    ///
    /// Based on the number of partitions, more database files will be created within the same path.
    ///
    /// If not set, the envelopes will be buffered in memory.
    pub path: Option<PathBuf>,
    /// The maximum size of the buffer to keep, in bytes.
    ///
    /// When the on-disk buffer reaches this size, new envelopes will be dropped.
    ///
    /// Defaults to 500 MiB.
    #[serde(default = "spool_envelopes_max_disk_size")]
    pub max_disk_size: ByteSize,
    /// Size of the batch of compressed envelopes that are spooled to disk at once.
    ///
    /// Note that this is the size after which spooling will be triggered but it does not guarantee
    /// that exactly this size will be spooled, it can be greater or equal.
    ///
    /// Defaults to 10 KiB.
    #[serde(default = "spool_envelopes_batch_size_bytes")]
    pub batch_size_bytes: ByteSize,
    /// Maximum time between receiving the envelope and processing it, in seconds.
    ///
    /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
    /// they are dropped.
    ///
    /// Defaults to 24h.
    #[serde(default = "spool_envelopes_max_envelope_delay_secs")]
    pub max_envelope_delay_secs: u64,
    /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
    /// internal page stats.
    ///
    /// Defaults to 100ms.
    #[serde(default = "spool_disk_usage_refresh_frequency_ms")]
    pub disk_usage_refresh_frequency_ms: u64,
    /// The amount of envelopes that the envelope buffer can push to its output queue.
    ///
    /// Defaults to 500.
    #[serde(default = "spool_max_backpressure_envelopes")]
    pub max_backpressure_envelopes: usize,
    /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
    ///
    /// Only applies when [`Self::path`] is set.
    ///
    /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
    ///
    /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
    /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
    ///
    /// This scenario arises when the buffer stops spooling due to reaching the
    /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
    /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
    /// continues to grow, and the system becomes deadlocked.
    ///
    /// ### Example
    /// Suppose the system has 1GB of available memory and is configured to spool only after
    /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
    /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
    ///
    /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
    /// no data will be offloaded. Memory usage keeps increasing until it hits the
    /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
    ///
    /// - No more envelopes are accepted.
    /// - The buffer remains stuck, as unspooling won't resume until memory drops below 900MB which
    ///   will not happen.
    /// - A deadlock occurs, with the system unable to recover without manual intervention.
    ///
    /// Defaults to 90% (5% less than max memory).
    #[serde(default = "spool_max_backpressure_memory_percent")]
    pub max_backpressure_memory_percent: f32,
    /// Number of partitions of the buffer.
    ///
    /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
    /// and other resources.
    ///
    /// Defaults to 1.
    #[serde(default = "spool_envelopes_partitions")]
    pub partitions: NonZeroU8,
}
1020
1021impl Default for EnvelopeSpool {
1022    fn default() -> Self {
1023        Self {
1024            path: None,
1025            max_disk_size: spool_envelopes_max_disk_size(),
1026            batch_size_bytes: spool_envelopes_batch_size_bytes(),
1027            max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
1028            disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
1029            max_backpressure_envelopes: spool_max_backpressure_envelopes(),
1030            max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
1031            partitions: spool_envelopes_partitions(),
1032        }
1033    }
1034}
1035
/// Persistent buffering configuration.
///
/// Currently only wraps the envelope spool settings.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
    /// Configuration for envelope spooling.
    ///
    /// Falls back to [`EnvelopeSpool::default`] when omitted.
    #[serde(default)]
    pub envelopes: EnvelopeSpool,
}
1043
/// Controls internal caching behavior.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cache {
    /// The full project state will be requested by this Relay if set to `true`.
    ///
    /// Defaults to `false`.
    pub project_request_full_config: bool,
    /// The cache timeout for project configurations in seconds.
    ///
    /// Defaults to `300` (5 minutes).
    pub project_expiry: u32,
    /// Continue using project state this many seconds after cache expiry while a new state is
    /// being fetched. This is added on top of `project_expiry`.
    ///
    /// Default is 2 minutes.
    pub project_grace_period: u32,
    /// Refresh a project after the specified seconds.
    ///
    /// The time must be between expiry time and the grace period.
    ///
    /// By default there are no refreshes enabled.
    pub project_refresh_interval: Option<u32>,
    /// The cache timeout for downstream relay info (public keys) in seconds.
    ///
    /// Defaults to `3600` (1 hour).
    pub relay_expiry: u32,
    /// Unused cache timeout for envelopes.
    ///
    /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
    /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
    /// at a later point.
    ///
    /// Also accepted under the legacy key `event_expiry`.
    #[serde(alias = "event_expiry")]
    envelope_expiry: u32,
    /// The maximum amount of envelopes to queue before dropping them.
    ///
    /// Also accepted under the legacy key `event_buffer_size`. Defaults to `1000`.
    #[serde(alias = "event_buffer_size")]
    envelope_buffer_size: u32,
    /// The cache timeout for non-existing entries, in seconds.
    ///
    /// Defaults to `60` (1 minute).
    pub miss_expiry: u32,
    /// The buffer timeout for batched project config queries before sending them upstream in ms.
    ///
    /// Defaults to 100ms.
    pub batch_interval: u32,
    /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
    pub downstream_relays_batch_interval: u32,
    /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
    ///
    /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
    pub batch_size: usize,
    /// Interval for watching local cache override files in seconds.
    ///
    /// Defaults to `10`.
    pub file_interval: u32,
    /// Interval for fetching new global configs from the upstream, in seconds.
    ///
    /// Defaults to `10`.
    pub global_config_fetch_interval: u32,
}
1090
1091impl Default for Cache {
1092    fn default() -> Self {
1093        Cache {
1094            project_request_full_config: false,
1095            project_expiry: 300,       // 5 minutes
1096            project_grace_period: 120, // 2 minutes
1097            project_refresh_interval: None,
1098            relay_expiry: 3600,   // 1 hour
1099            envelope_expiry: 600, // 10 minutes
1100            envelope_buffer_size: 1000,
1101            miss_expiry: 60,                       // 1 minute
1102            batch_interval: 100,                   // 100ms
1103            downstream_relays_batch_interval: 100, // 100ms
1104            batch_size: 500,
1105            file_interval: 10,                // 10 seconds
1106            global_config_fetch_interval: 10, // 10 seconds
1107        }
1108    }
1109}
1110
/// Default maximum future timestamp offset of ingested events, in seconds: 1 minute.
fn default_max_secs_in_future() -> u32 {
    const ONE_MINUTE_SECS: u32 = 60;
    ONE_MINUTE_SECS
}
1114
/// Default maximum age of ingested sessions, in seconds: 5 days.
fn default_max_session_secs_in_past() -> u32 {
    const SECS_PER_DAY: u32 = 24 * 3600;
    5 * SECS_PER_DAY
}
1118
/// Default chunk size, used as the serde default for `Processing::attachment_chunk_size`: 1 MiB.
fn default_chunk_size() -> ByteSize {
    ByteSize::mebibytes(1)
}
1122
/// Default prefix for project config lookups: `"relayconfig"`.
fn default_projectconfig_cache_prefix() -> String {
    String::from("relayconfig")
}
1126
/// Default maximum rate limit reported to clients, in seconds: 5 minutes.
// The `Option` wrapper is required because this is the serde default for an `Option<u32>` field.
#[allow(clippy::unnecessary_wraps)]
fn default_max_rate_limit() -> Option<u32> {
    const FIVE_MINUTES_SECS: u32 = 300;
    Some(FIVE_MINUTES_SECS)
}
1131
/// Controls Sentry-internal event processing.
///
/// NOTE: `enabled` and `kafka_config` carry no serde default, so both must be present whenever
/// this section is deserialized from a config file.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
    /// True if the Relay should do processing. Defaults to `false`.
    pub enabled: bool,
    /// GeoIp DB file source.
    #[serde(default)]
    pub geoip_path: Option<PathBuf>,
    /// Maximum future timestamp of ingested events, in seconds.
    ///
    /// Defaults to `60` (1 minute).
    #[serde(default = "default_max_secs_in_future")]
    pub max_secs_in_future: u32,
    /// Maximum age of ingested sessions, in seconds. Older sessions will be dropped.
    ///
    /// Defaults to 5 days.
    #[serde(default = "default_max_session_secs_in_past")]
    pub max_session_secs_in_past: u32,
    /// Kafka producer configurations.
    pub kafka_config: Vec<KafkaConfigParam>,
    /// Configure what span format to produce.
    #[serde(default)]
    pub span_producers: SpanProducers,
    /// Additional kafka producer configurations.
    ///
    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
    /// kafka config can be referenced in `topics:` like this:
    ///
    /// ```yaml
    /// secondary_kafka_configs:
    ///   mycustomcluster:
    ///     - name: 'bootstrap.servers'
    ///       value: 'sentry_kafka_metrics:9093'
    ///
    /// topics:
    ///   transactions: ingest-transactions
    ///   metrics:
    ///     name: ingest-metrics
    ///     config: mycustomcluster
    /// ```
    ///
    /// Then metrics will be produced to an entirely different Kafka cluster.
    #[serde(default)]
    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
    /// Kafka topic names.
    #[serde(default)]
    pub topics: TopicAssignments,
    /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
    ///
    /// Defaults to `false`.
    #[serde(default)]
    pub kafka_validate_topics: bool,
    /// Redis hosts to connect to for storing state for rate limits.
    #[serde(default)]
    pub redis: Option<RedisConfigs>,
    /// Maximum chunk size of attachments for Kafka.
    ///
    /// Defaults to 1 MiB.
    #[serde(default = "default_chunk_size")]
    pub attachment_chunk_size: ByteSize,
    /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
    #[serde(default = "default_projectconfig_cache_prefix")]
    pub projectconfig_cache_prefix: String,
    /// Maximum rate limit to report to clients, in seconds.
    ///
    /// Defaults to `300` (5 minutes).
    #[serde(default = "default_max_rate_limit")]
    pub max_rate_limit: Option<u32>,
}
1191
1192impl Default for Processing {
1193    /// Constructs a disabled processing configuration.
1194    fn default() -> Self {
1195        Self {
1196            enabled: false,
1197            geoip_path: None,
1198            max_secs_in_future: default_max_secs_in_future(),
1199            max_session_secs_in_past: default_max_session_secs_in_past(),
1200            kafka_config: Vec::new(),
1201            secondary_kafka_configs: BTreeMap::new(),
1202            topics: TopicAssignments::default(),
1203            kafka_validate_topics: false,
1204            redis: None,
1205            attachment_chunk_size: default_chunk_size(),
1206            projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
1207            max_rate_limit: default_max_rate_limit(),
1208            span_producers: Default::default(),
1209        }
1210    }
1211}
1212
/// Configuration for span producers.
#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct SpanProducers {
    /// Send JSON spans to `ingest-spans`.
    ///
    /// Defaults to `false`.
    pub produce_json: bool,
    /// Send Protobuf (TraceItem) to `snuba-items`.
    ///
    /// Defaults to `true`.
    pub produce_protobuf: bool,
}
1222
1223impl Default for SpanProducers {
1224    fn default() -> Self {
1225        Self {
1226            produce_json: false,
1227            produce_protobuf: true,
1228        }
1229    }
1230}
1231
/// Configuration for normalization in this Relay.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Normalization {
    /// Level of normalization for Relay to apply to incoming data.
    ///
    /// Defaults to [`NormalizationLevel::Default`].
    #[serde(default)]
    pub level: NormalizationLevel,
}
1240
/// Configuration for the level of normalization this Relay should do.
///
/// Serialized in lowercase, i.e. as `default` or `full`.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NormalizationLevel {
    /// Runs normalization, excluding steps that break future compatibility.
    ///
    /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
    #[default]
    Default,
    /// Run full normalization.
    ///
    /// It includes steps that break future compatibility and should only run in
    /// the last layer of relays.
    Full,
}
1256
/// Configuration values for the outcome aggregator.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct OutcomeAggregatorConfig {
    /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
    ///
    /// Defaults to `60`.
    pub bucket_interval: u64,
    /// Defines how often all buckets are flushed, in seconds.
    ///
    /// Defaults to `120`.
    pub flush_interval: u64,
}
1266
1267impl Default for OutcomeAggregatorConfig {
1268    fn default() -> Self {
1269        Self {
1270            bucket_interval: 60,
1271            flush_interval: 120,
1272        }
1273    }
1274}
1275
/// Determines how to emit outcomes.
///
/// For compatibility reasons, the configuration value is either a boolean (`true` or `false`) or
/// the string `"as_client_reports"`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]

pub enum EmitOutcomes {
    /// Do not emit any outcomes. Written as `false` in the config.
    None,
    /// Emit outcomes as client reports. Written as `"as_client_reports"` in the config.
    AsClientReports,
    /// Emit outcomes as outcomes. Written as `true` in the config.
    AsOutcomes,
}
1288
1289impl EmitOutcomes {
1290    /// Returns true of outcomes are emitted via http, kafka, or client reports.
1291    pub fn any(&self) -> bool {
1292        !matches!(self, EmitOutcomes::None)
1293    }
1294}
1295
1296impl Serialize for EmitOutcomes {
1297    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1298    where
1299        S: Serializer,
1300    {
1301        // For compatibility, serialize None and AsOutcomes as booleans.
1302        match self {
1303            Self::None => serializer.serialize_bool(false),
1304            Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1305            Self::AsOutcomes => serializer.serialize_bool(true),
1306        }
1307    }
1308}
1309
1310struct EmitOutcomesVisitor;
1311
1312impl Visitor<'_> for EmitOutcomesVisitor {
1313    type Value = EmitOutcomes;
1314
1315    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1316        formatter.write_str("true, false, or 'as_client_reports'")
1317    }
1318
1319    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1320    where
1321        E: serde::de::Error,
1322    {
1323        Ok(if v {
1324            EmitOutcomes::AsOutcomes
1325        } else {
1326            EmitOutcomes::None
1327        })
1328    }
1329
1330    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1331    where
1332        E: serde::de::Error,
1333    {
1334        if v == "as_client_reports" {
1335            Ok(EmitOutcomes::AsClientReports)
1336        } else {
1337            Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
1338        }
1339    }
1340}
1341
1342impl<'de> Deserialize<'de> for EmitOutcomes {
1343    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1344    where
1345        D: Deserializer<'de>,
1346    {
1347        deserializer.deserialize_any(EmitOutcomesVisitor)
1348    }
1349}
1350
/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
    /// Controls whether outcomes will be emitted when processing is disabled.
    /// Processing relays always emit outcomes (for backwards compatibility).
    /// Can take the following values: false, "as_client_reports", true
    ///
    /// Defaults to "as_client_reports".
    pub emit_outcomes: EmitOutcomes,
    /// Controls whether client reported outcomes should be emitted.
    ///
    /// Defaults to `true`.
    pub emit_client_outcomes: bool,
    /// The maximum number of outcomes that are batched before being sent
    /// via http to the upstream (only applies to non processing relays).
    ///
    /// Defaults to `1000`.
    pub batch_size: usize,
    /// The maximum time interval (in milliseconds) that an outcome may be batched
    /// via http to the upstream (only applies to non processing relays).
    ///
    /// Defaults to `500`.
    pub batch_interval: u64,
    /// Defines the source string registered in the outcomes originating from
    /// this Relay (typically something like the region or the layer).
    ///
    /// Not set by default.
    pub source: Option<String>,
    /// Configures the outcome aggregator.
    pub aggregator: OutcomeAggregatorConfig,
}
1373
1374impl Default for Outcomes {
1375    fn default() -> Self {
1376        Outcomes {
1377            emit_outcomes: EmitOutcomes::AsClientReports,
1378            emit_client_outcomes: true,
1379            batch_size: 1000,
1380            batch_interval: 500,
1381            source: None,
1382            aggregator: OutcomeAggregatorConfig::default(),
1383        }
1384    }
1385}
1386
/// Minimal version of a config for dumping out.
///
/// Contains only the `relay` section.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct MinimalConfig {
    /// The relay part of the config.
    pub relay: Relay,
}
1393
1394impl MinimalConfig {
1395    /// Saves the config in the given config folder as config.yml
1396    pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1397        let path = p.as_ref();
1398        if fs::metadata(path).is_err() {
1399            fs::create_dir_all(path)
1400                .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1401        }
1402        self.save(path)
1403    }
1404}
1405
impl ConfigObject for MinimalConfig {
    /// The minimal config is stored as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    /// Base name of the config file, without extension.
    fn name() -> &'static str {
        "config"
    }
}
1415
1416/// Alternative serialization of RelayInfo for config file using snake case.
1417mod config_relay_info {
1418    use serde::ser::SerializeMap;
1419
1420    use super::*;
1421
1422    // Uses snake_case as opposed to camelCase.
1423    #[derive(Debug, Serialize, Deserialize, Clone)]
1424    struct RelayInfoConfig {
1425        public_key: PublicKey,
1426        #[serde(default)]
1427        internal: bool,
1428    }
1429
1430    impl From<RelayInfoConfig> for RelayInfo {
1431        fn from(v: RelayInfoConfig) -> Self {
1432            RelayInfo {
1433                public_key: v.public_key,
1434                internal: v.internal,
1435            }
1436        }
1437    }
1438
1439    impl From<RelayInfo> for RelayInfoConfig {
1440        fn from(v: RelayInfo) -> Self {
1441            RelayInfoConfig {
1442                public_key: v.public_key,
1443                internal: v.internal,
1444            }
1445        }
1446    }
1447
1448    pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1449    where
1450        D: Deserializer<'de>,
1451    {
1452        let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1453        Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1454    }
1455
1456    pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1457    where
1458        S: Serializer,
1459    {
1460        let mut map = ser.serialize_map(Some(elm.len()))?;
1461
1462        for (k, v) in elm {
1463            map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1464        }
1465
1466        map.end()
1467    }
1468}
1469
1470/// Authentication options.
1471#[derive(Serialize, Deserialize, Debug, Default)]
1472pub struct AuthConfig {
1473    /// Controls responses from the readiness health check endpoint based on authentication.
1474    #[serde(default, skip_serializing_if = "is_default")]
1475    pub ready: ReadinessCondition,
1476
1477    /// Statically authenticated downstream relays.
1478    #[serde(default, with = "config_relay_info")]
1479    pub static_relays: HashMap<RelayId, RelayInfo>,
1480
1481    /// How old a signature can be before it is considered invalid, in seconds.
1482    ///
1483    /// Defaults to 5 minutes.
1484    #[serde(default = "default_max_age")]
1485    pub signature_max_age: u64,
1486}
1487
/// Default maximum signature age, in seconds: 5 minutes.
fn default_max_age() -> u64 {
    const FIVE_MINUTES_SECS: u64 = 5 * 60;
    FIVE_MINUTES_SECS
}
1491
/// GeoIp database configuration options.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
    /// The path to GeoIP database.
    ///
    /// Not set by default.
    pub path: Option<PathBuf>,
}
1498
/// Cardinality Limiter configuration options.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct CardinalityLimiter {
    /// Cache vacuum interval in seconds for the in memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    ///
    /// Defaults to 180 seconds (3 minutes).
    pub cache_vacuum_interval: u64,
}
1510
1511impl Default for CardinalityLimiter {
1512    fn default() -> Self {
1513        Self {
1514            cache_vacuum_interval: 180,
1515        }
1516    }
1517}
1518
/// Settings to control Relay's health checks.
///
/// After breaching one of the configured thresholds, Relay will
/// return an `unhealthy` status from its health endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Health {
    /// Interval to refresh internal health checks, in milliseconds.
    ///
    /// Shorter intervals will decrease the time it takes the health check endpoint to report
    /// issues, but can also increase sporadic unhealthy responses.
    ///
    /// Defaults to `3000` (3 seconds).
    pub refresh_interval_ms: u64,
    /// Maximum memory watermark in bytes.
    ///
    /// By default, there is no absolute limit set and the watermark
    /// is only controlled by setting [`Self::max_memory_percent`].
    pub max_memory_bytes: Option<ByteSize>,
    /// Maximum memory watermark as a percentage of maximum system memory.
    ///
    /// Defaults to `0.95` (95%).
    pub max_memory_percent: f32,
    /// Health check probe timeout in milliseconds.
    ///
    /// Any probe exceeding the timeout will be considered failed.
    /// This limits the max execution time of Relay health checks.
    ///
    /// Defaults to 900 milliseconds.
    pub probe_timeout_ms: u64,
    /// The refresh frequency of memory stats which are used to poll memory
    /// usage of Relay, in milliseconds.
    ///
    /// The implementation of memory stats guarantees that the refresh will happen at
    /// least every `x` ms since memory readings are lazy and are updated only if needed.
    ///
    /// Defaults to 100 milliseconds.
    pub memory_stat_refresh_frequency_ms: u64,
}
1556
1557impl Default for Health {
1558    fn default() -> Self {
1559        Self {
1560            refresh_interval_ms: 3000,
1561            max_memory_bytes: None,
1562            max_memory_percent: 0.95,
1563            probe_timeout_ms: 900,
1564            memory_stat_refresh_frequency_ms: 100,
1565        }
1566    }
1567}
1568
/// COGS configuration.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Cogs {
    /// Maximum amount of COGS measurements allowed to backlog.
    ///
    /// Any additional COGS measurements recorded will be dropped.
    ///
    /// Defaults to `10_000`.
    pub max_queue_size: u64,
    /// Relay COGS resource id.
    ///
    /// All Relay related COGS measurements are emitted with this resource id.
    ///
    /// Defaults to `relay_service`.
    pub relay_resource_id: String,
}
1586
1587impl Default for Cogs {
1588    fn default() -> Self {
1589        Self {
1590            max_queue_size: 10_000,
1591            relay_resource_id: "relay_service".to_owned(),
1592        }
1593    }
1594}
1595
/// All sections of the Relay config file.
///
/// Every section is marked `#[serde(default)]`, so a section that is missing from the file is
/// filled with that type's `Default` implementation.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
    #[serde(default)]
    relay: Relay,
    #[serde(default)]
    http: Http,
    #[serde(default)]
    cache: Cache,
    #[serde(default)]
    spool: Spool,
    #[serde(default)]
    limits: Limits,
    #[serde(default)]
    logging: relay_log::LogConfig,
    #[serde(default)]
    routing: Routing,
    #[serde(default)]
    metrics: Metrics,
    #[serde(default)]
    sentry_metrics: SentryMetrics,
    #[serde(default)]
    sentry: relay_log::SentryConfig,
    #[serde(default)]
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorServiceConfig,
    #[serde(default)]
    secondary_aggregators: Vec<ScopedAggregatorConfig>,
    #[serde(default)]
    auth: AuthConfig,
    #[serde(default)]
    geoip: GeoIpConfig,
    #[serde(default)]
    normalization: Normalization,
    #[serde(default)]
    cardinality_limiter: CardinalityLimiter,
    #[serde(default)]
    health: Health,
    #[serde(default)]
    cogs: Cogs,
}
1639
impl ConfigObject for ConfigValues {
    /// The main config is stored as YAML.
    fn format() -> ConfigFormat {
        ConfigFormat::Yaml
    }

    /// Base name of the config file, without extension.
    fn name() -> &'static str {
        "config"
    }
}
1649
/// Config struct.
///
/// Bundles the parsed config values with the optional credentials and the folder they were
/// loaded from.
pub struct Config {
    /// The parsed contents of the config file.
    values: ConfigValues,
    /// The credentials, if any were loaded; `None` for configs built from JSON values.
    credentials: Option<Credentials>,
    /// The config folder this config was loaded from; empty for configs built from JSON values.
    path: PathBuf,
}
1656
1657impl fmt::Debug for Config {
1658    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1659        f.debug_struct("Config")
1660            .field("path", &self.path)
1661            .field("values", &self.values)
1662            .finish()
1663    }
1664}
1665
1666impl Config {
1667    /// Loads a config from a given config folder.
1668    pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1669        let path = env::current_dir()
1670            .map(|x| x.join(path.as_ref()))
1671            .unwrap_or_else(|_| path.as_ref().to_path_buf());
1672
1673        let config = Config {
1674            values: ConfigValues::load(&path)?,
1675            credentials: if Credentials::path(&path).exists() {
1676                Some(Credentials::load(&path)?)
1677            } else {
1678                None
1679            },
1680            path: path.clone(),
1681        };
1682
1683        if cfg!(not(feature = "processing")) && config.processing_enabled() {
1684            return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1685        }
1686
1687        Ok(config)
1688    }
1689
1690    /// Creates a config from a JSON value.
1691    ///
1692    /// This is mostly useful for tests.
1693    pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1694        Ok(Config {
1695            values: serde_json::from_value(value)
1696                .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1697            credentials: None,
1698            path: PathBuf::new(),
1699        })
1700    }
1701
1702    /// Override configuration with values coming from other sources (e.g. env variables or
1703    /// command line parameters)
1704    pub fn apply_override(
1705        &mut self,
1706        mut overrides: OverridableConfig,
1707    ) -> anyhow::Result<&mut Self> {
1708        let relay = &mut self.values.relay;
1709
1710        if let Some(mode) = overrides.mode {
1711            relay.mode = mode
1712                .parse::<RelayMode>()
1713                .with_context(|| ConfigError::field("mode"))?;
1714        }
1715
1716        if let Some(deployment) = overrides.instance {
1717            relay.instance = deployment
1718                .parse::<RelayInstance>()
1719                .with_context(|| ConfigError::field("deployment"))?;
1720        }
1721
1722        if let Some(log_level) = overrides.log_level {
1723            self.values.logging.level = log_level.parse()?;
1724        }
1725
1726        if let Some(log_format) = overrides.log_format {
1727            self.values.logging.format = log_format.parse()?;
1728        }
1729
1730        if let Some(upstream) = overrides.upstream {
1731            relay.upstream = upstream
1732                .parse::<UpstreamDescriptor>()
1733                .with_context(|| ConfigError::field("upstream"))?;
1734        } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1735            relay.upstream = upstream_dsn
1736                .parse::<Dsn>()
1737                .map(|dsn| UpstreamDescriptor::from_dsn(&dsn).into_owned())
1738                .with_context(|| ConfigError::field("upstream_dsn"))?;
1739        }
1740
1741        if let Some(host) = overrides.host {
1742            relay.host = host
1743                .parse::<IpAddr>()
1744                .with_context(|| ConfigError::field("host"))?;
1745        }
1746
1747        if let Some(port) = overrides.port {
1748            relay.port = port
1749                .as_str()
1750                .parse()
1751                .with_context(|| ConfigError::field("port"))?;
1752        }
1753
1754        let processing = &mut self.values.processing;
1755        if let Some(enabled) = overrides.processing {
1756            match enabled.to_lowercase().as_str() {
1757                "true" | "1" => processing.enabled = true,
1758                "false" | "0" | "" => processing.enabled = false,
1759                _ => return Err(ConfigError::field("processing").into()),
1760            }
1761        }
1762
1763        if let Some(redis) = overrides.redis_url {
1764            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1765        }
1766
1767        if let Some(kafka_url) = overrides.kafka_url {
1768            let existing = processing
1769                .kafka_config
1770                .iter_mut()
1771                .find(|e| e.name == "bootstrap.servers");
1772
1773            if let Some(config_param) = existing {
1774                config_param.value = kafka_url;
1775            } else {
1776                processing.kafka_config.push(KafkaConfigParam {
1777                    name: "bootstrap.servers".to_owned(),
1778                    value: kafka_url,
1779                })
1780            }
1781        }
1782        // credentials overrides
1783        let id = if let Some(id) = overrides.id {
1784            let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1785            Some(id)
1786        } else {
1787            None
1788        };
1789        let public_key = if let Some(public_key) = overrides.public_key {
1790            let public_key = public_key
1791                .parse::<PublicKey>()
1792                .with_context(|| ConfigError::field("public_key"))?;
1793            Some(public_key)
1794        } else {
1795            None
1796        };
1797
1798        let secret_key = if let Some(secret_key) = overrides.secret_key {
1799            let secret_key = secret_key
1800                .parse::<SecretKey>()
1801                .with_context(|| ConfigError::field("secret_key"))?;
1802            Some(secret_key)
1803        } else {
1804            None
1805        };
1806        let outcomes = &mut self.values.outcomes;
1807        if overrides.outcome_source.is_some() {
1808            outcomes.source = overrides.outcome_source.take();
1809        }
1810
1811        if let Some(credentials) = &mut self.credentials {
1812            //we have existing credentials we may override some entries
1813            if let Some(id) = id {
1814                credentials.id = id;
1815            }
1816            if let Some(public_key) = public_key {
1817                credentials.public_key = public_key;
1818            }
1819            if let Some(secret_key) = secret_key {
1820                credentials.secret_key = secret_key
1821            }
1822        } else {
1823            //no existing credentials we may only create the full credentials
1824            match (id, public_key, secret_key) {
1825                (Some(id), Some(public_key), Some(secret_key)) => {
1826                    self.credentials = Some(Credentials {
1827                        secret_key,
1828                        public_key,
1829                        id,
1830                    })
1831                }
1832                (None, None, None) => {
1833                    // nothing provided, we'll just leave the credentials None, maybe we
1834                    // don't need them in the current command or we'll override them later
1835                }
1836                _ => {
1837                    return Err(ConfigError::field("incomplete credentials").into());
1838                }
1839            }
1840        }
1841
1842        let limits = &mut self.values.limits;
1843        if let Some(shutdown_timeout) = overrides.shutdown_timeout
1844            && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
1845        {
1846            limits.shutdown_timeout = shutdown_timeout;
1847        }
1848
1849        if let Some(server_name) = overrides.server_name {
1850            self.values.sentry.server_name = Some(server_name.into());
1851        }
1852
1853        Ok(self)
1854    }
1855
1856    /// Checks if the config is already initialized.
1857    pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1858        fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1859    }
1860
1861    /// Returns the filename of the config file.
1862    pub fn path(&self) -> &Path {
1863        &self.path
1864    }
1865
1866    /// Dumps out a YAML string of the values.
1867    pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1868        serde_yaml::to_string(&self.values)
1869            .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1870    }
1871
1872    /// Regenerates the relay credentials.
1873    ///
1874    /// This also writes the credentials back to the file.
1875    pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1876        let creds = Credentials::generate();
1877        if save {
1878            creds.save(&self.path)?;
1879        }
1880        self.credentials = Some(creds);
1881        Ok(())
1882    }
1883
    /// Returns the current credentials, or `None` if no credentials are set.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
1888
1889    /// Set new credentials.
1890    ///
1891    /// This also writes the credentials back to the file.
1892    pub fn replace_credentials(
1893        &mut self,
1894        credentials: Option<Credentials>,
1895    ) -> anyhow::Result<bool> {
1896        if self.credentials == credentials {
1897            return Ok(false);
1898        }
1899
1900        match credentials {
1901            Some(ref creds) => {
1902                creds.save(&self.path)?;
1903            }
1904            None => {
1905                let path = Credentials::path(&self.path);
1906                if fs::metadata(&path).is_ok() {
1907                    fs::remove_file(&path).with_context(|| {
1908                        ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1909                    })?;
1910                }
1911            }
1912        }
1913
1914        self.credentials = credentials;
1915        Ok(true)
1916    }
1917
1918    /// Returns `true` if the config is ready to use.
1919    pub fn has_credentials(&self) -> bool {
1920        self.credentials.is_some()
1921    }
1922
1923    /// Returns the secret key if set.
1924    pub fn secret_key(&self) -> Option<&SecretKey> {
1925        self.credentials.as_ref().map(|x| &x.secret_key)
1926    }
1927
1928    /// Returns the public key if set.
1929    pub fn public_key(&self) -> Option<&PublicKey> {
1930        self.credentials.as_ref().map(|x| &x.public_key)
1931    }
1932
1933    /// Returns the relay ID.
1934    pub fn relay_id(&self) -> Option<&RelayId> {
1935        self.credentials.as_ref().map(|x| &x.id)
1936    }
1937
1938    /// Returns the relay mode.
1939    pub fn relay_mode(&self) -> RelayMode {
1940        self.values.relay.mode
1941    }
1942
1943    /// Returns the instance type of relay.
1944    pub fn relay_instance(&self) -> RelayInstance {
1945        self.values.relay.instance
1946    }
1947
1948    /// Returns the upstream target as descriptor.
1949    pub fn upstream_descriptor(&self) -> &UpstreamDescriptor<'_> {
1950        &self.values.relay.upstream
1951    }
1952
1953    /// Returns the custom HTTP "Host" header.
1954    pub fn http_host_header(&self) -> Option<&str> {
1955        self.values.http.host_header.as_deref()
1956    }
1957
1958    /// Returns the listen address.
1959    pub fn listen_addr(&self) -> SocketAddr {
1960        (self.values.relay.host, self.values.relay.port).into()
1961    }
1962
1963    /// Returns the TLS listen address.
1964    pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
1965        if self.values.relay.tls_identity_path.is_some() {
1966            let port = self.values.relay.tls_port.unwrap_or(3443);
1967            Some((self.values.relay.host, port).into())
1968        } else {
1969            None
1970        }
1971    }
1972
1973    /// Returns the path to the identity bundle
1974    pub fn tls_identity_path(&self) -> Option<&Path> {
1975        self.values.relay.tls_identity_path.as_deref()
1976    }
1977
1978    /// Returns the password for the identity bundle
1979    pub fn tls_identity_password(&self) -> Option<&str> {
1980        self.values.relay.tls_identity_password.as_deref()
1981    }
1982
1983    /// Returns `true` when project IDs should be overriden rather than validated.
1984    ///
1985    /// Defaults to `false`, which requires project ID validation.
1986    pub fn override_project_ids(&self) -> bool {
1987        self.values.relay.override_project_ids
1988    }
1989
1990    /// Returns `true` if Relay requires authentication for readiness.
1991    ///
1992    /// See [`ReadinessCondition`] for more information.
1993    pub fn requires_auth(&self) -> bool {
1994        match self.values.auth.ready {
1995            ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
1996            ReadinessCondition::Always => false,
1997        }
1998    }
1999
2000    /// Returns the interval at which Realy should try to re-authenticate with the upstream.
2001    ///
2002    /// Always disabled in processing mode.
2003    pub fn http_auth_interval(&self) -> Option<Duration> {
2004        if self.processing_enabled() {
2005            return None;
2006        }
2007
2008        match self.values.http.auth_interval {
2009            None | Some(0) => None,
2010            Some(secs) => Some(Duration::from_secs(secs)),
2011        }
2012    }
2013
2014    /// The maximum time of experiencing uninterrupted network failures until Relay considers that
2015    /// it has encountered a network outage.
2016    pub fn http_outage_grace_period(&self) -> Duration {
2017        Duration::from_secs(self.values.http.outage_grace_period)
2018    }
2019
2020    /// Time Relay waits before retrying an upstream request.
2021    ///
2022    /// Before going into a network outage, Relay may fail to make upstream
2023    /// requests. This is the time Relay waits before retrying the same request.
2024    pub fn http_retry_delay(&self) -> Duration {
2025        Duration::from_secs(self.values.http.retry_delay)
2026    }
2027
2028    /// Time of continued project request failures before Relay emits an error.
2029    pub fn http_project_failure_interval(&self) -> Duration {
2030        Duration::from_secs(self.values.http.project_failure_interval)
2031    }
2032
2033    /// Content encoding of upstream requests.
2034    pub fn http_encoding(&self) -> HttpEncoding {
2035        self.values.http.encoding
2036    }
2037
2038    /// Returns whether metrics should be sent globally through a shared endpoint.
2039    pub fn http_global_metrics(&self) -> bool {
2040        self.values.http.global_metrics
2041    }
2042
2043    /// Returns whether this Relay should emit outcomes.
2044    ///
2045    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
2046    /// in processing mode.
2047    pub fn emit_outcomes(&self) -> EmitOutcomes {
2048        if self.processing_enabled() {
2049            return EmitOutcomes::AsOutcomes;
2050        }
2051        self.values.outcomes.emit_outcomes
2052    }
2053
2054    /// Returns whether this Relay should emit client outcomes
2055    ///
2056    /// Relays that do not emit client outcomes will forward client recieved outcomes
2057    /// directly to the next relay in the chain as client report envelope.  This is only done
2058    /// if this relay emits outcomes at all. A relay that will not emit outcomes
2059    /// will forward the envelope unchanged.
2060    ///
2061    /// This flag can be explicitly disabled on processing relays as well to prevent the
2062    /// emitting of client outcomes to the kafka topic.
2063    pub fn emit_client_outcomes(&self) -> bool {
2064        self.values.outcomes.emit_client_outcomes
2065    }
2066
2067    /// Returns the maximum number of outcomes that are batched before being sent
2068    pub fn outcome_batch_size(&self) -> usize {
2069        self.values.outcomes.batch_size
2070    }
2071
2072    /// Returns the maximum interval that an outcome may be batched
2073    pub fn outcome_batch_interval(&self) -> Duration {
2074        Duration::from_millis(self.values.outcomes.batch_interval)
2075    }
2076
2077    /// The originating source of the outcome
2078    pub fn outcome_source(&self) -> Option<&str> {
2079        self.values.outcomes.source.as_deref()
2080    }
2081
2082    /// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2083    pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2084        &self.values.outcomes.aggregator
2085    }
2086
2087    /// Returns logging configuration.
2088    pub fn logging(&self) -> &relay_log::LogConfig {
2089        &self.values.logging
2090    }
2091
2092    /// Returns logging configuration.
2093    pub fn sentry(&self) -> &relay_log::SentryConfig {
2094        &self.values.sentry
2095    }
2096
2097    /// Returns the socket addresses for statsd.
2098    ///
2099    /// If stats is disabled an empty vector is returned.
2100    pub fn statsd_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
2101        if let Some(ref addr) = self.values.metrics.statsd {
2102            let addrs = addr
2103                .as_str()
2104                .to_socket_addrs()
2105                .with_context(|| ConfigError::file(ConfigErrorKind::InvalidValue, &self.path))?
2106                .collect();
2107            Ok(addrs)
2108        } else {
2109            Ok(vec![])
2110        }
2111    }
2112
2113    /// Return the prefix for statsd metrics.
2114    pub fn metrics_prefix(&self) -> &str {
2115        &self.values.metrics.prefix
2116    }
2117
2118    /// Returns the default tags for statsd metrics.
2119    pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2120        &self.values.metrics.default_tags
2121    }
2122
2123    /// Returns the name of the hostname tag that should be attached to each outgoing metric.
2124    pub fn metrics_hostname_tag(&self) -> Option<&str> {
2125        self.values.metrics.hostname_tag.as_deref()
2126    }
2127
2128    /// Returns the global sample rate for all metrics.
2129    pub fn metrics_sample_rate(&self) -> f32 {
2130        self.values.metrics.sample_rate
2131    }
2132
2133    /// Returns whether local metric aggregation should be enabled.
2134    pub fn metrics_aggregate(&self) -> bool {
2135        self.values.metrics.aggregate
2136    }
2137
2138    /// Returns whether high cardinality tags should be removed before sending metrics.
2139    pub fn metrics_allow_high_cardinality_tags(&self) -> bool {
2140        self.values.metrics.allow_high_cardinality_tags
2141    }
2142
2143    /// Returns the interval for periodic metrics emitted from Relay.
2144    ///
2145    /// `None` if periodic metrics are disabled.
2146    pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2147        match self.values.metrics.periodic_secs {
2148            0 => None,
2149            secs => Some(Duration::from_secs(secs)),
2150        }
2151    }
2152
2153    /// Returns the default timeout for all upstream HTTP requests.
2154    pub fn http_timeout(&self) -> Duration {
2155        Duration::from_secs(self.values.http.timeout.into())
2156    }
2157
2158    /// Returns the connection timeout for all upstream HTTP requests.
2159    pub fn http_connection_timeout(&self) -> Duration {
2160        Duration::from_secs(self.values.http.connection_timeout.into())
2161    }
2162
2163    /// Returns the failed upstream request retry interval.
2164    pub fn http_max_retry_interval(&self) -> Duration {
2165        Duration::from_secs(self.values.http.max_retry_interval.into())
2166    }
2167
2168    /// Returns the expiry timeout for cached projects.
2169    pub fn project_cache_expiry(&self) -> Duration {
2170        Duration::from_secs(self.values.cache.project_expiry.into())
2171    }
2172
2173    /// Returns `true` if the full project state should be requested from upstream.
2174    pub fn request_full_project_config(&self) -> bool {
2175        self.values.cache.project_request_full_config
2176    }
2177
2178    /// Returns the expiry timeout for cached relay infos (public keys).
2179    pub fn relay_cache_expiry(&self) -> Duration {
2180        Duration::from_secs(self.values.cache.relay_expiry.into())
2181    }
2182
2183    /// Returns the maximum number of buffered envelopes
2184    pub fn envelope_buffer_size(&self) -> usize {
2185        self.values
2186            .cache
2187            .envelope_buffer_size
2188            .try_into()
2189            .unwrap_or(usize::MAX)
2190    }
2191
2192    /// Returns the expiry timeout for cached misses before trying to refetch.
2193    pub fn cache_miss_expiry(&self) -> Duration {
2194        Duration::from_secs(self.values.cache.miss_expiry.into())
2195    }
2196
2197    /// Returns the grace period for project caches.
2198    pub fn project_grace_period(&self) -> Duration {
2199        Duration::from_secs(self.values.cache.project_grace_period.into())
2200    }
2201
2202    /// Returns the refresh interval for a project.
2203    ///
2204    /// Validates the refresh time to be between the grace period and expiry.
2205    pub fn project_refresh_interval(&self) -> Option<Duration> {
2206        self.values
2207            .cache
2208            .project_refresh_interval
2209            .map(Into::into)
2210            .map(Duration::from_secs)
2211    }
2212
2213    /// Returns the duration in which batchable project config queries are
2214    /// collected before sending them in a single request.
2215    pub fn query_batch_interval(&self) -> Duration {
2216        Duration::from_millis(self.values.cache.batch_interval.into())
2217    }
2218
2219    /// Returns the duration in which downstream relays are requested from upstream.
2220    pub fn downstream_relays_batch_interval(&self) -> Duration {
2221        Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2222    }
2223
2224    /// Returns the interval in seconds in which local project configurations should be reloaded.
2225    pub fn local_cache_interval(&self) -> Duration {
2226        Duration::from_secs(self.values.cache.file_interval.into())
2227    }
2228
2229    /// Returns the interval in seconds in which fresh global configs should be
2230    /// fetched from  upstream.
2231    pub fn global_config_fetch_interval(&self) -> Duration {
2232        Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2233    }
2234
2235    /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2236    ///
2237    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2238    /// suffixed with `.{partition_id}`.
2239    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2240        let mut path = self
2241            .values
2242            .spool
2243            .envelopes
2244            .path
2245            .as_ref()
2246            .map(|path| path.to_owned())?;
2247
2248        if partition_id == 0 {
2249            return Some(path);
2250        }
2251
2252        let file_name = path.file_name().and_then(|f| f.to_str())?;
2253        let new_file_name = format!("{file_name}.{partition_id}");
2254        path.set_file_name(new_file_name);
2255
2256        Some(path)
2257    }
2258
2259    /// The maximum size of the buffer, in bytes.
2260    pub fn spool_envelopes_max_disk_size(&self) -> usize {
2261        self.values.spool.envelopes.max_disk_size.as_bytes()
2262    }
2263
2264    /// Number of encoded envelope bytes that need to be accumulated before
2265    /// flushing one batch to disk.
2266    pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2267        self.values.spool.envelopes.batch_size_bytes.as_bytes()
2268    }
2269
2270    /// Returns the time after which we drop envelopes as a [`Duration`] object.
2271    pub fn spool_envelopes_max_age(&self) -> Duration {
2272        Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2273    }
2274
2275    /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
2276    pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2277        Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2278    }
2279
2280    /// Returns the maximum number of envelopes that can be put in the bounded buffer.
2281    pub fn spool_max_backpressure_envelopes(&self) -> usize {
2282        self.values.spool.envelopes.max_backpressure_envelopes
2283    }
2284
2285    /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
2286    pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2287        self.values.spool.envelopes.max_backpressure_memory_percent
2288    }
2289
2290    /// Returns the number of partitions for the buffer.
2291    pub fn spool_partitions(&self) -> NonZeroU8 {
2292        self.values.spool.envelopes.partitions
2293    }
2294
2295    /// Returns the maximum size of an event payload in bytes.
2296    pub fn max_event_size(&self) -> usize {
2297        self.values.limits.max_event_size.as_bytes()
2298    }
2299
2300    /// Returns the maximum size of each attachment.
2301    pub fn max_attachment_size(&self) -> usize {
2302        self.values.limits.max_attachment_size.as_bytes()
2303    }
2304
2305    /// Returns the maximum combined size of attachments or payloads containing attachments
2306    /// (minidump, unreal, standalone attachments) in bytes.
2307    pub fn max_attachments_size(&self) -> usize {
2308        self.values.limits.max_attachments_size.as_bytes()
2309    }
2310
2311    /// Returns the maximum combined size of client reports in bytes.
2312    pub fn max_client_reports_size(&self) -> usize {
2313        self.values.limits.max_client_reports_size.as_bytes()
2314    }
2315
2316    /// Returns the maximum payload size of a monitor check-in in bytes.
2317    pub fn max_check_in_size(&self) -> usize {
2318        self.values.limits.max_check_in_size.as_bytes()
2319    }
2320
2321    /// Returns the maximum payload size of a log in bytes.
2322    pub fn max_log_size(&self) -> usize {
2323        self.values.limits.max_log_size.as_bytes()
2324    }
2325
2326    /// Returns the maximum payload size of a span in bytes.
2327    pub fn max_span_size(&self) -> usize {
2328        self.values.limits.max_span_size.as_bytes()
2329    }
2330
2331    /// Returns the maximum payload size of an item container in bytes.
2332    pub fn max_container_size(&self) -> usize {
2333        self.values.limits.max_container_size.as_bytes()
2334    }
2335
2336    /// Returns the maximum size of an envelope payload in bytes.
2337    ///
2338    /// Individual item size limits still apply.
2339    pub fn max_envelope_size(&self) -> usize {
2340        self.values.limits.max_envelope_size.as_bytes()
2341    }
2342
2343    /// Returns the maximum number of sessions per envelope.
2344    pub fn max_session_count(&self) -> usize {
2345        self.values.limits.max_session_count
2346    }
2347
2348    /// Returns the maximum number of standalone spans per envelope.
2349    pub fn max_span_count(&self) -> usize {
2350        self.values.limits.max_span_count
2351    }
2352
2353    /// Returns the maximum number of logs per envelope.
2354    pub fn max_log_count(&self) -> usize {
2355        self.values.limits.max_log_count
2356    }
2357
2358    /// Returns the maximum payload size of a statsd metric in bytes.
2359    pub fn max_statsd_size(&self) -> usize {
2360        self.values.limits.max_statsd_size.as_bytes()
2361    }
2362
2363    /// Returns the maximum payload size of metric buckets in bytes.
2364    pub fn max_metric_buckets_size(&self) -> usize {
2365        self.values.limits.max_metric_buckets_size.as_bytes()
2366    }
2367
2368    /// Whether metric stats are collected and emitted.
2369    ///
2370    /// Metric stats are always collected and emitted when processing
2371    /// is enabled.
2372    pub fn metric_stats_enabled(&self) -> bool {
2373        self.values.sentry_metrics.metric_stats_enabled || self.values.processing.enabled
2374    }
2375
2376    /// Returns the maximum payload size for general API requests.
2377    pub fn max_api_payload_size(&self) -> usize {
2378        self.values.limits.max_api_payload_size.as_bytes()
2379    }
2380
2381    /// Returns the maximum payload size for file uploads and chunks.
2382    pub fn max_api_file_upload_size(&self) -> usize {
2383        self.values.limits.max_api_file_upload_size.as_bytes()
2384    }
2385
2386    /// Returns the maximum payload size for chunks
2387    pub fn max_api_chunk_upload_size(&self) -> usize {
2388        self.values.limits.max_api_chunk_upload_size.as_bytes()
2389    }
2390
2391    /// Returns the maximum payload size for a profile
2392    pub fn max_profile_size(&self) -> usize {
2393        self.values.limits.max_profile_size.as_bytes()
2394    }
2395
2396    /// Returns the maximum payload size for a compressed replay.
2397    pub fn max_replay_compressed_size(&self) -> usize {
2398        self.values.limits.max_replay_compressed_size.as_bytes()
2399    }
2400
2401    /// Returns the maximum payload size for an uncompressed replay.
2402    pub fn max_replay_uncompressed_size(&self) -> usize {
2403        self.values.limits.max_replay_uncompressed_size.as_bytes()
2404    }
2405
2406    /// Returns the maximum message size for an uncompressed replay.
2407    ///
2408    /// This is greater than max_replay_compressed_size because
2409    /// it can include additional metadata about the replay in
2410    /// addition to the recording.
2411    pub fn max_replay_message_size(&self) -> usize {
2412        self.values.limits.max_replay_message_size.as_bytes()
2413    }
2414
2415    /// Returns the maximum number of active requests
2416    pub fn max_concurrent_requests(&self) -> usize {
2417        self.values.limits.max_concurrent_requests
2418    }
2419
2420    /// Returns the maximum number of active queries
2421    pub fn max_concurrent_queries(&self) -> usize {
2422        self.values.limits.max_concurrent_queries
2423    }
2424
2425    /// The maximum number of seconds a query is allowed to take across retries.
2426    pub fn query_timeout(&self) -> Duration {
2427        Duration::from_secs(self.values.limits.query_timeout)
2428    }
2429
2430    /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
2431    /// signal.
2432    pub fn shutdown_timeout(&self) -> Duration {
2433        Duration::from_secs(self.values.limits.shutdown_timeout)
2434    }
2435
2436    /// Returns the server keep-alive timeout in seconds.
2437    ///
2438    /// By default keep alive is set to a 5 seconds.
2439    pub fn keepalive_timeout(&self) -> Duration {
2440        Duration::from_secs(self.values.limits.keepalive_timeout)
2441    }
2442
2443    /// Returns the server idle timeout in seconds.
2444    pub fn idle_timeout(&self) -> Option<Duration> {
2445        self.values.limits.idle_timeout.map(Duration::from_secs)
2446    }
2447
2448    /// Returns the maximum connections.
2449    pub fn max_connections(&self) -> Option<usize> {
2450        self.values.limits.max_connections
2451    }
2452
2453    /// TCP listen backlog to configure on Relay's listening socket.
2454    pub fn tcp_listen_backlog(&self) -> u32 {
2455        self.values.limits.tcp_listen_backlog
2456    }
2457
2458    /// Returns the number of cores to use for thread pools.
2459    pub fn cpu_concurrency(&self) -> usize {
2460        self.values.limits.max_thread_count
2461    }
2462
2463    /// Returns the number of tasks that can run concurrently in the worker pool.
2464    pub fn pool_concurrency(&self) -> usize {
2465        self.values.limits.max_pool_concurrency
2466    }
2467
2468    /// Returns the maximum size of a project config query.
2469    pub fn query_batch_size(&self) -> usize {
2470        self.values.cache.batch_size
2471    }
2472
2473    /// Get filename for static project config.
2474    pub fn project_configs_path(&self) -> PathBuf {
2475        self.path.join("projects")
2476    }
2477
2478    /// True if the Relay should do processing.
2479    pub fn processing_enabled(&self) -> bool {
2480        self.values.processing.enabled
2481    }
2482
2483    /// Level of normalization for Relay to apply to incoming data.
2484    pub fn normalization_level(&self) -> NormalizationLevel {
2485        self.values.normalization.level
2486    }
2487
2488    /// The path to the GeoIp database required for event processing.
2489    pub fn geoip_path(&self) -> Option<&Path> {
2490        self.values
2491            .geoip
2492            .path
2493            .as_deref()
2494            .or(self.values.processing.geoip_path.as_deref())
2495    }
2496
2497    /// Maximum future timestamp of ingested data.
2498    ///
2499    /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
2500    pub fn max_secs_in_future(&self) -> i64 {
2501        self.values.processing.max_secs_in_future.into()
2502    }
2503
2504    /// Maximum age of ingested sessions. Older sessions will be dropped.
2505    pub fn max_session_secs_in_past(&self) -> i64 {
2506        self.values.processing.max_session_secs_in_past.into()
2507    }
2508
2509    /// Configuration name and list of Kafka configuration parameters for a given topic.
2510    pub fn kafka_configs(
2511        &self,
2512        topic: KafkaTopic,
2513    ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2514        self.values.processing.topics.get(topic).kafka_configs(
2515            &self.values.processing.kafka_config,
2516            &self.values.processing.secondary_kafka_configs,
2517        )
2518    }
2519
2520    /// Whether to validate the topics against Kafka.
2521    pub fn kafka_validate_topics(&self) -> bool {
2522        self.values.processing.kafka_validate_topics
2523    }
2524
2525    /// All unused but configured topic assignments.
2526    pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2527        &self.values.processing.topics.unused
2528    }
2529
2530    /// Redis servers to connect to for project configs, cardinality limits,
2531    /// rate limiting, and metrics metadata.
2532    pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2533        let redis_configs = self.values.processing.redis.as_ref()?;
2534
2535        Some(build_redis_configs(
2536            redis_configs,
2537            self.cpu_concurrency() as u32,
2538        ))
2539    }
2540
2541    /// Chunk size of attachments in bytes.
2542    pub fn attachment_chunk_size(&self) -> usize {
2543        self.values.processing.attachment_chunk_size.as_bytes()
2544    }
2545
2546    /// Maximum metrics batch size in bytes.
2547    pub fn metrics_max_batch_size_bytes(&self) -> usize {
2548        self.values.aggregator.max_flush_bytes
2549    }
2550
2551    /// Default prefix to use when looking up project configs in Redis. This is only done when
2552    /// Relay is in processing mode.
2553    pub fn projectconfig_cache_prefix(&self) -> &str {
2554        &self.values.processing.projectconfig_cache_prefix
2555    }
2556
2557    /// Maximum rate limit to report to clients in seconds.
2558    pub fn max_rate_limit(&self) -> Option<u64> {
2559        self.values.processing.max_rate_limit.map(u32::into)
2560    }
2561
2562    /// Cache vacuum interval for the cardinality limiter in memory cache.
2563    ///
2564    /// The cache will scan for expired values based on this interval.
2565    pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2566        Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2567    }
2568
2569    /// Interval to refresh internal health checks.
2570    pub fn health_refresh_interval(&self) -> Duration {
2571        Duration::from_millis(self.values.health.refresh_interval_ms)
2572    }
2573
2574    /// Maximum memory watermark in bytes.
2575    pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2576        self.values
2577            .health
2578            .max_memory_bytes
2579            .as_ref()
2580            .map_or(u64::MAX, |b| b.as_bytes() as u64)
2581    }
2582
2583    /// Maximum memory watermark as a percentage of maximum system memory.
2584    pub fn health_max_memory_watermark_percent(&self) -> f32 {
2585        self.values.health.max_memory_percent
2586    }
2587
2588    /// Health check probe timeout.
2589    pub fn health_probe_timeout(&self) -> Duration {
2590        Duration::from_millis(self.values.health.probe_timeout_ms)
2591    }
2592
2593    /// Refresh frequency for polling new memory stats.
2594    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2595        self.values.health.memory_stat_refresh_frequency_ms
2596    }
2597
2598    /// Maximum amount of COGS measurements buffered in memory.
2599    pub fn cogs_max_queue_size(&self) -> u64 {
2600        self.values.cogs.max_queue_size
2601    }
2602
2603    /// Resource ID to use for Relay COGS measurements.
2604    pub fn cogs_relay_resource_id(&self) -> &str {
2605        &self.values.cogs.relay_resource_id
2606    }
2607
2608    /// Returns configuration for the default metrics aggregator.
2609    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2610        &self.values.aggregator
2611    }
2612
2613    /// Returns configuration for non-default metrics aggregator.
2614    pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2615        &self.values.secondary_aggregators
2616    }
2617
2618    /// Returns aggregator config for a given metrics namespace.
2619    pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2620        for entry in &self.values.secondary_aggregators {
2621            if entry.condition.matches(Some(namespace)) {
2622                return &entry.config;
2623            }
2624        }
2625        &self.values.aggregator
2626    }
2627
2628    /// Return the statically configured Relays.
2629    pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2630        &self.values.auth.static_relays
2631    }
2632
2633    /// Returns the max age a signature is considered valid, in seconds.
2634    pub fn signature_max_age(&self) -> Duration {
2635        Duration::from_secs(self.values.auth.signature_max_age)
2636    }
2637
2638    /// Returns `true` if unknown items should be accepted and forwarded.
2639    pub fn accept_unknown_items(&self) -> bool {
2640        let forward = self.values.routing.accept_unknown_items;
2641        forward.unwrap_or_else(|| !self.processing_enabled())
2642    }
2643
2644    /// Returns `true` if we should produce TraceItem spans on `snuba-items`.
2645    pub fn produce_protobuf_spans(&self) -> bool {
2646        self.values.processing.span_producers.produce_protobuf
2647    }
2648
2649    /// Returns `true` if we should produce JSON spans on `ingest-spans`.
2650    pub fn produce_json_spans(&self) -> bool {
2651        self.values.processing.span_producers.produce_json
2652    }
2653}
2654
2655impl Default for Config {
2656    fn default() -> Self {
2657        Self {
2658            values: ConfigValues::default(),
2659            credentials: None,
2660            path: PathBuf::new(),
2661        }
2662    }
2663}
2664
#[cfg(test)]
mod tests {
    use super::*;

    /// Regression test for renaming the envelope buffer flags.
    ///
    /// The legacy `event_*` keys must still populate the `envelope_*` cache fields.
    #[test]
    fn test_event_buffer_size() {
        let yaml = r###"
cache:
    event_buffer_size: 1000000
    event_expiry: 1800
"###;

        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
        assert_eq!(values.cache.envelope_expiry, 1800);
    }

    /// Every supported `EmitOutcomes` representation must round-trip through JSON unchanged.
    #[test]
    fn test_emit_outcomes() {
        for (serialized, deserialized) in &[
            ("true", EmitOutcomes::AsOutcomes),
            ("false", EmitOutcomes::None),
            ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
        ] {
            let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
            assert_eq!(value, *deserialized);
            assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
        }
    }

    /// Inputs that are not a supported `EmitOutcomes` representation must fail to deserialize.
    #[test]
    fn test_emit_outcomes_invalid() {
        // Bare `asdf` is not valid JSON, so this input is rejected at the syntax level.
        assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
        // A well-formed JSON string with an unknown value exercises the value validation.
        assert!(serde_json::from_str::<EmitOutcomes>("\"asdf\"").is_err());
    }
}