relay_config/config.rs
1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::io::Write;
4use std::net::{IpAddr, SocketAddr};
5use std::num::{NonZeroU8, NonZeroU16};
6use std::path::{Path, PathBuf};
7use std::str::FromStr;
8use std::time::Duration;
9use std::{env, fmt, fs, io};
10
11use anyhow::Context;
12use relay_auth::{PublicKey, RelayId, SecretKey, generate_key_pair, generate_relay_id};
13use relay_common::Dsn;
14use relay_kafka::{
15 ConfigError as KafkaConfigError, KafkaConfigParam, KafkaTopic, KafkaTopicConfig,
16 TopicAssignments,
17};
18use relay_metrics::MetricNamespace;
19use serde::de::{DeserializeOwned, Unexpected, Visitor};
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use uuid::Uuid;
22
23use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
24use crate::byte_size::ByteSize;
25use crate::upstream::UpstreamDescriptor;
26use crate::{RedisConfig, RedisConfigs, RedisConfigsRef, build_redis_configs};
27
28const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
29
30static CONFIG_YAML_HEADER: &str = r###"# Please see the relevant documentation.
31# Performance tuning: https://docs.sentry.io/product/relay/operating-guidelines/
32# All config options: https://docs.sentry.io/product/relay/options/
33"###;
34
35/// Indicates config related errors.
36#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum ConfigErrorKind {
39 /// Failed to open the file.
40 CouldNotOpenFile,
41 /// Failed to save a file.
42 CouldNotWriteFile,
43 /// Parsing YAML failed.
44 BadYaml,
45 /// Parsing JSON failed.
46 BadJson,
47 /// Invalid config value
48 InvalidValue,
49 /// The user attempted to run Relay with processing enabled, but uses a binary that was
50 /// compiled without the processing feature.
51 ProcessingNotAvailable,
52}
53
54impl fmt::Display for ConfigErrorKind {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::CouldNotOpenFile => write!(f, "could not open config file"),
58 Self::CouldNotWriteFile => write!(f, "could not write config file"),
59 Self::BadYaml => write!(f, "could not parse yaml config file"),
60 Self::BadJson => write!(f, "could not parse json config file"),
61 Self::InvalidValue => write!(f, "invalid config value"),
62 Self::ProcessingNotAvailable => write!(
63 f,
64 "was not compiled with processing, cannot enable processing"
65 ),
66 }
67 }
68}
69
70/// Defines the source of a config error
71#[derive(Debug, Default)]
72enum ConfigErrorSource {
73 /// An error occurring independently.
74 #[default]
75 None,
76 /// An error originating from a configuration file.
77 File(PathBuf),
78 /// An error originating in a field override (an env var, or a CLI parameter).
79 FieldOverride(String),
80}
81
82impl fmt::Display for ConfigErrorSource {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 ConfigErrorSource::None => Ok(()),
86 ConfigErrorSource::File(file_name) => {
87 write!(f, " (file {})", file_name.display())
88 }
89 ConfigErrorSource::FieldOverride(name) => write!(f, " (field {name})"),
90 }
91 }
92}
93
94/// Indicates config related errors.
95#[derive(Debug)]
96pub struct ConfigError {
97 source: ConfigErrorSource,
98 kind: ConfigErrorKind,
99}
100
101impl ConfigError {
102 #[inline]
103 fn new(kind: ConfigErrorKind) -> Self {
104 Self {
105 source: ConfigErrorSource::None,
106 kind,
107 }
108 }
109
110 #[inline]
111 fn field(field: &'static str) -> Self {
112 Self {
113 source: ConfigErrorSource::FieldOverride(field.to_owned()),
114 kind: ConfigErrorKind::InvalidValue,
115 }
116 }
117
118 #[inline]
119 fn file(kind: ConfigErrorKind, p: impl AsRef<Path>) -> Self {
120 Self {
121 source: ConfigErrorSource::File(p.as_ref().to_path_buf()),
122 kind,
123 }
124 }
125
126 /// Returns the error kind of the error.
127 pub fn kind(&self) -> ConfigErrorKind {
128 self.kind
129 }
130}
131
132impl fmt::Display for ConfigError {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}{}", self.kind(), self.source)
135 }
136}
137
138impl Error for ConfigError {}
139
140enum ConfigFormat {
141 Yaml,
142 Json,
143}
144
145impl ConfigFormat {
146 pub fn extension(&self) -> &'static str {
147 match self {
148 ConfigFormat::Yaml => "yml",
149 ConfigFormat::Json => "json",
150 }
151 }
152}
153
154trait ConfigObject: DeserializeOwned + Serialize {
155 /// The format in which to serialize this configuration.
156 fn format() -> ConfigFormat;
157
158 /// The basename of the config file.
159 fn name() -> &'static str;
160
161 /// The full filename of the config file, including the file extension.
162 fn path(base: &Path) -> PathBuf {
163 base.join(format!("{}.{}", Self::name(), Self::format().extension()))
164 }
165
166 /// Loads the config file from a file within the given directory location.
167 fn load(base: &Path) -> anyhow::Result<Self> {
168 let path = Self::path(base);
169
170 let f = fs::File::open(&path)
171 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, &path))?;
172 let f = io::BufReader::new(f);
173
174 let mut source = {
175 let file = serde_vars::FileSource::default()
176 .with_variable_prefix("${file:")
177 .with_variable_suffix("}")
178 .with_base_path(base);
179 let env = serde_vars::EnvSource::default()
180 .with_variable_prefix("${")
181 .with_variable_suffix("}");
182 (file, env)
183 };
184 match Self::format() {
185 ConfigFormat::Yaml => {
186 serde_vars::deserialize(serde_yaml::Deserializer::from_reader(f), &mut source)
187 .with_context(|| ConfigError::file(ConfigErrorKind::BadYaml, &path))
188 }
189 ConfigFormat::Json => {
190 serde_vars::deserialize(&mut serde_json::Deserializer::from_reader(f), &mut source)
191 .with_context(|| ConfigError::file(ConfigErrorKind::BadJson, &path))
192 }
193 }
194 }
195
196 /// Writes the configuration to a file within the given directory location.
197 fn save(&self, base: &Path) -> anyhow::Result<()> {
198 let path = Self::path(base);
199 let mut options = fs::OpenOptions::new();
200 options.write(true).truncate(true).create(true);
201
202 // Remove all non-user permissions for the newly created file
203 #[cfg(unix)]
204 {
205 use std::os::unix::fs::OpenOptionsExt;
206 options.mode(0o600);
207 }
208
209 let mut f = options
210 .open(&path)
211 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?;
212
213 match Self::format() {
214 ConfigFormat::Yaml => {
215 f.write_all(CONFIG_YAML_HEADER.as_bytes())?;
216 serde_yaml::to_writer(&mut f, self)
217 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?
218 }
219 ConfigFormat::Json => serde_json::to_writer_pretty(&mut f, self)
220 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path))?,
221 }
222
223 f.write_all(b"\n").ok();
224
225 Ok(())
226 }
227}
228
229/// Structure used to hold information about configuration overrides via
230/// CLI parameters or environment variables
231#[derive(Debug, Default)]
232pub struct OverridableConfig {
233 /// The operation mode of this relay.
234 pub mode: Option<String>,
235 /// The instance type of this relay.
236 pub instance: Option<String>,
237 /// The log level of this relay.
238 pub log_level: Option<String>,
239 /// The log format of this relay.
240 pub log_format: Option<String>,
241 /// The upstream relay or sentry instance.
242 pub upstream: Option<String>,
243 /// Alternate upstream provided through a Sentry DSN. Key and project will be ignored.
244 pub upstream_dsn: Option<String>,
245 /// The host the relay should bind to (network interface).
246 pub host: Option<String>,
247 /// The port to bind for the unencrypted relay HTTP server.
248 pub port: Option<String>,
249 /// "true" if processing is enabled "false" otherwise
250 pub processing: Option<String>,
251 /// the kafka bootstrap.servers configuration string
252 pub kafka_url: Option<String>,
253 /// the redis server url
254 pub redis_url: Option<String>,
255 /// The globally unique ID of the relay.
256 pub id: Option<String>,
257 /// The secret key of the relay
258 pub secret_key: Option<String>,
259 /// The public key of the relay
260 pub public_key: Option<String>,
261 /// Outcome source
262 pub outcome_source: Option<String>,
263 /// shutdown timeout
264 pub shutdown_timeout: Option<String>,
265 /// Server name reported in the Sentry SDK.
266 pub server_name: Option<String>,
267}
268
269/// The relay credentials
270#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
271pub struct Credentials {
272 /// The secret key of the relay
273 pub secret_key: SecretKey,
274 /// The public key of the relay
275 pub public_key: PublicKey,
276 /// The globally unique ID of the relay.
277 pub id: RelayId,
278}
279
280impl Credentials {
281 /// Generates new random credentials.
282 pub fn generate() -> Self {
283 relay_log::info!("generating new relay credentials");
284 let (sk, pk) = generate_key_pair();
285 Self {
286 secret_key: sk,
287 public_key: pk,
288 id: generate_relay_id(),
289 }
290 }
291
292 /// Serializes this configuration to JSON.
293 pub fn to_json_string(&self) -> anyhow::Result<String> {
294 serde_json::to_string(self)
295 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
296 }
297}
298
299impl ConfigObject for Credentials {
300 fn format() -> ConfigFormat {
301 ConfigFormat::Json
302 }
303 fn name() -> &'static str {
304 "credentials"
305 }
306}
307
308/// Information on a downstream Relay.
309#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
310#[serde(rename_all = "camelCase")]
311pub struct RelayInfo {
312 /// The public key that this Relay uses to authenticate and sign requests.
313 pub public_key: PublicKey,
314
315 /// Marks an internal relay that has privileged access to more project configuration.
316 #[serde(default)]
317 pub internal: bool,
318}
319
320impl RelayInfo {
321 /// Creates a new RelayInfo
322 pub fn new(public_key: PublicKey) -> Self {
323 Self {
324 public_key,
325 internal: false,
326 }
327 }
328}
329
330/// The operation mode of a relay.
331#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
332#[serde(rename_all = "camelCase")]
333pub enum RelayMode {
334 /// This relay acts as a proxy for all requests and events.
335 ///
336 /// Events are normalized and rate limits from the upstream are enforced, but the relay will not
337 /// fetch project configurations from the upstream or perform PII stripping. All events are
338 /// accepted unless overridden on the file system.
339 Proxy,
340
341 /// Project configurations are managed by the upstream.
342 ///
343 /// Project configurations are always fetched from the upstream, unless they are statically
344 /// overridden in the file system. This relay must be allowed in the upstream Sentry. This is
345 /// only possible, if the upstream is Sentry directly, or another managed Relay.
346 Managed,
347}
348
349impl<'de> Deserialize<'de> for RelayMode {
350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351 where
352 D: Deserializer<'de>,
353 {
354 let s = String::deserialize(deserializer)?;
355 match s.as_str() {
356 "proxy" => Ok(RelayMode::Proxy),
357 "managed" => Ok(RelayMode::Managed),
358 "static" => Err(serde::de::Error::custom(
359 "Relay mode 'static' has been removed. Please use 'managed' or 'proxy' instead.",
360 )),
361 other => Err(serde::de::Error::unknown_variant(
362 other,
363 &["proxy", "managed"],
364 )),
365 }
366 }
367}
368
369impl fmt::Display for RelayMode {
370 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371 match self {
372 RelayMode::Proxy => write!(f, "proxy"),
373 RelayMode::Managed => write!(f, "managed"),
374 }
375 }
376}
377
378/// The instance type of Relay.
379#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
380#[serde(rename_all = "camelCase")]
381pub enum RelayInstance {
382 /// This Relay is run as a default instance.
383 Default,
384
385 /// This Relay is run as a canary instance where experiments can be run.
386 Canary,
387}
388
389impl RelayInstance {
390 /// Returns `true` if the [`RelayInstance`] is of type [`RelayInstance::Canary`].
391 pub fn is_canary(&self) -> bool {
392 matches!(self, RelayInstance::Canary)
393 }
394}
395
396impl fmt::Display for RelayInstance {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 match self {
399 RelayInstance::Default => write!(f, "default"),
400 RelayInstance::Canary => write!(f, "canary"),
401 }
402 }
403}
404
405impl FromStr for RelayInstance {
406 type Err = fmt::Error;
407
408 fn from_str(s: &str) -> Result<Self, Self::Err> {
409 match s {
410 "canary" => Ok(RelayInstance::Canary),
411 _ => Ok(RelayInstance::Default),
412 }
413 }
414}
415
416/// Error returned when parsing an invalid [`RelayMode`].
417#[derive(Clone, Copy, Debug, Eq, PartialEq)]
418pub struct ParseRelayModeError;
419
420impl fmt::Display for ParseRelayModeError {
421 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422 write!(f, "Relay mode must be one of: managed or proxy")
423 }
424}
425
426impl Error for ParseRelayModeError {}
427
428impl FromStr for RelayMode {
429 type Err = ParseRelayModeError;
430
431 fn from_str(s: &str) -> Result<Self, Self::Err> {
432 match s {
433 "proxy" => Ok(RelayMode::Proxy),
434 "managed" => Ok(RelayMode::Managed),
435 _ => Err(ParseRelayModeError),
436 }
437 }
438}
439
440/// Returns `true` if this value is equal to `Default::default()`.
441fn is_default<T: Default + PartialEq>(t: &T) -> bool {
442 *t == T::default()
443}
444
445/// Checks if we are running in docker.
446fn is_docker() -> bool {
447 if fs::metadata("/.dockerenv").is_ok() {
448 return true;
449 }
450
451 fs::read_to_string("/proc/self/cgroup").is_ok_and(|s| s.contains("/docker"))
452}
453
454/// Default value for the "bind" configuration.
455fn default_host() -> IpAddr {
456 if is_docker() {
457 // Docker images rely on this service being exposed
458 "0.0.0.0".parse().unwrap()
459 } else {
460 "127.0.0.1".parse().unwrap()
461 }
462}
463
464/// Controls responses from the readiness health check endpoint based on authentication.
465///
466/// Independent of the the readiness condition, shutdown always switches Relay into unready state.
467#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
468#[serde(rename_all = "lowercase")]
469#[derive(Default)]
470pub enum ReadinessCondition {
471 /// (default) Relay is ready when authenticated and connected to the upstream.
472 ///
473 /// Before authentication has succeeded and during network outages, Relay responds as not ready.
474 /// Relay reauthenticates based on the `http.auth_interval` parameter. During reauthentication,
475 /// Relay remains ready until authentication fails.
476 ///
477 /// Authentication is only required for Relays in managed mode. Other Relays will only check for
478 /// network outages.
479 #[default]
480 Authenticated,
481 /// Relay reports readiness regardless of the authentication and networking state.
482 Always,
483}
484
485/// Relay specific configuration values.
486#[derive(Serialize, Deserialize, Debug)]
487#[serde(default)]
488pub struct Relay {
489 /// The operation mode of this Relay.
490 pub mode: RelayMode,
491 /// The instance type of this Relay.
492 pub instance: RelayInstance,
493 /// The upstream Relay or Sentry instance.
494 pub upstream: UpstreamDescriptor,
495 /// The upstream advertised to downstream Relay instances.
496 ///
497 /// This value will be advertised to downstream Relays as the upstream to use when forwarding
498 /// data. It can be used for traffic routing and balancing, it must not redirect to a different
499 /// Sentry instance.
500 ///
501 /// Downstream Relays will treat the advertised upstream as the same logical component as this instance
502 /// and re-use already established authentication keys.
503 pub advertised_upstream: Option<UpstreamDescriptor>,
504 /// The host the relay should bind to (network interface).
505 pub host: IpAddr,
506 /// The port to bind for the unencrypted relay HTTP server.
507 pub port: u16,
508 /// The host the relay should bind to (network interface) for internally exposed APIs, like
509 /// health checks.
510 ///
511 /// If not configured, internal routes are exposed on the main HTTP server.
512 ///
513 /// Note: configuring the internal http server on an address which overlaps with the main
514 /// server (e.g. main on `0.0.0.0:3000` and internal on `127.0.0.1:3000`) is a misconfiguration
515 /// resulting in approximately half of the requests sent to `127.0.0.1:3000` to fail, as the handling
516 /// http server is chosen by the operating system 'at random'.
517 ///
518 /// As a best practice you should always choose different ports to avoid this issue.
519 ///
520 /// Defaults to [`Self::host`].
521 pub internal_host: Option<IpAddr>,
522 /// The port to bind for internally exposed APIs.
523 ///
524 /// Defaults to [`Self::port`].
525 pub internal_port: Option<u16>,
526 /// Optional port to bind for the encrypted relay HTTPS server.
527 #[serde(skip_serializing)]
528 pub tls_port: Option<u16>,
529 /// The path to the identity (DER-encoded PKCS12) to use for TLS.
530 #[serde(skip_serializing)]
531 pub tls_identity_path: Option<PathBuf>,
532 /// Password for the PKCS12 archive.
533 #[serde(skip_serializing)]
534 pub tls_identity_password: Option<String>,
535 /// Always override project IDs from the URL and DSN with the identifier used at the upstream.
536 ///
537 /// Enable this setting for Relays used to redirect traffic to a migrated Sentry instance.
538 /// Validation of project identifiers can be safely skipped in these cases.
539 #[serde(skip_serializing_if = "is_default")]
540 pub override_project_ids: bool,
541}
542
543impl Default for Relay {
544 fn default() -> Self {
545 Relay {
546 mode: RelayMode::Managed,
547 instance: RelayInstance::Default,
548 upstream: "https://sentry.io/".parse().unwrap(),
549 advertised_upstream: None,
550 host: default_host(),
551 port: 3000,
552 internal_host: None,
553 internal_port: None,
554 tls_port: None,
555 tls_identity_path: None,
556 tls_identity_password: None,
557 override_project_ids: false,
558 }
559 }
560}
561
562/// Control the metrics.
563#[derive(Serialize, Deserialize, Debug)]
564#[serde(default)]
565pub struct Metrics {
566 /// Hostname and port of the statsd server.
567 ///
568 /// Defaults to `None`.
569 pub statsd: Option<String>,
570 /// Buffer size used for metrics sent to the statsd socket.
571 ///
572 /// Defaults to `None`.
573 pub statsd_buffer_size: Option<usize>,
574 /// Common prefix that should be added to all metrics.
575 ///
576 /// Defaults to `"sentry.relay"`.
577 pub prefix: String,
578 /// Default tags to apply to all metrics.
579 pub default_tags: BTreeMap<String, String>,
580 /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag.
581 pub hostname_tag: Option<String>,
582 /// Interval for periodic metrics emitted from Relay.
583 ///
584 /// Setting it to `0` seconds disables the periodic metrics.
585 /// Defaults to 5 seconds.
586 pub periodic_secs: u64,
587}
588
589impl Default for Metrics {
590 fn default() -> Self {
591 Metrics {
592 statsd: None,
593 statsd_buffer_size: None,
594 prefix: "sentry.relay".into(),
595 default_tags: BTreeMap::new(),
596 hostname_tag: None,
597 periodic_secs: 5,
598 }
599 }
600}
601
602/// Controls various limits
603#[derive(Serialize, Deserialize, Debug)]
604#[serde(default)]
605pub struct Limits {
606 /// How many requests can be sent concurrently from Relay to the upstream before Relay starts
607 /// buffering.
608 pub max_concurrent_requests: usize,
609 /// How many queries can be sent concurrently from Relay to the upstream before Relay starts
610 /// buffering.
611 ///
612 /// The concurrency of queries is additionally constrained by `max_concurrent_requests`.
613 pub max_concurrent_queries: usize,
614 /// The maximum payload size for events.
615 pub max_event_size: ByteSize,
616 /// The maximum size for each attachment.
617 pub max_attachment_size: ByteSize,
618 /// The maximum size for a TUS upload request body.
619 pub max_upload_size: ByteSize,
620 /// The maximum combined size for all attachments in an envelope or request.
621 pub max_attachments_size: ByteSize,
622 /// The maximum combined size for all client reports in an envelope or request.
623 pub max_client_reports_size: ByteSize,
624 /// The maximum payload size for a monitor check-in.
625 pub max_check_in_size: ByteSize,
626 /// The maximum payload size for an entire envelopes. Individual limits still apply.
627 pub max_envelope_size: ByteSize,
628 /// The maximum number of session items per envelope.
629 pub max_session_count: usize,
630 /// The maximum payload size for general API requests.
631 pub max_api_payload_size: ByteSize,
632 /// The maximum payload size for file uploads and chunks.
633 pub max_api_file_upload_size: ByteSize,
634 /// The maximum payload size for chunks
635 pub max_api_chunk_upload_size: ByteSize,
636 /// The maximum payload size for a profile
637 pub max_profile_size: ByteSize,
638 /// The maximum payload size for a trace metric.
639 pub max_trace_metric_size: ByteSize,
640 /// The maximum payload size for a log.
641 pub max_log_size: ByteSize,
642 /// The maximum payload size for a span.
643 pub max_span_size: ByteSize,
644 /// The maximum payload size for an item container.
645 pub max_container_size: ByteSize,
646 /// The maximum payload size for a statsd metric.
647 pub max_statsd_size: ByteSize,
648 /// The maximum payload size for metric buckets.
649 pub max_metric_buckets_size: ByteSize,
650 /// The maximum payload size for a compressed replay.
651 pub max_replay_compressed_size: ByteSize,
652 /// The maximum payload size for an uncompressed replay.
653 #[serde(alias = "max_replay_size")]
654 max_replay_uncompressed_size: ByteSize,
655 /// The maximum size for a replay recording Kafka message.
656 pub max_replay_message_size: ByteSize,
657 /// The byte size limit up to which Relay will retain
658 /// keys of invalid/removed attributes.
659 ///
660 /// This is only relevant for EAP items (spans, logs, …).
661 /// In principle, we want to record all deletions of attributes,
662 /// but we have to institute some limit to protect our infrastructure
663 /// against excessive metadata sizes.
664 ///
665 /// Defaults to 10KiB.
666 pub max_removed_attribute_key_size: ByteSize,
667 /// The maximum number of threads to spawn for CPU and web work, each.
668 ///
669 /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
670 /// the number of logical CPU cores on the host.
671 pub max_thread_count: usize,
672 /// Controls the maximum concurrency of each worker thread.
673 ///
674 /// Increasing the concurrency, can lead to a better utilization of worker threads by
675 /// increasing the amount of I/O done concurrently.
676 //
677 /// Currently has no effect on defaults to `1`.
678 pub max_pool_concurrency: usize,
679 /// The maximum number of seconds a query is allowed to take across retries. Individual requests
680 /// have lower timeouts. Defaults to 30 seconds.
681 pub query_timeout: u64,
682 /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
683 /// signal.
684 pub shutdown_timeout: u64,
685 /// Server keep-alive timeout in seconds.
686 ///
687 /// By default, keep-alive is set to 5 seconds.
688 pub keepalive_timeout: u64,
689 /// Server idle timeout in seconds.
690 ///
691 /// The idle timeout limits the amount of time a connection is kept open without activity.
692 /// Setting this too short may abort connections before Relay is able to send a response.
693 ///
694 /// By default there is no idle timeout.
695 pub idle_timeout: Option<u64>,
696 /// Sets the maximum number of concurrent connections.
697 ///
698 /// Upon reaching the limit, the server will stop accepting connections.
699 ///
700 /// By default there is no limit.
701 pub max_connections: Option<usize>,
702 /// The TCP listen backlog.
703 ///
704 /// Configures the TCP listen backlog for the listening socket of Relay.
705 /// See [`man listen(2)`](https://man7.org/linux/man-pages/man2/listen.2.html)
706 /// for a more detailed description of the listen backlog.
707 ///
708 /// Defaults to `1024`, a value [google has been using for a long time](https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f92a030ca6d772ab44b22ee6a01378a8cb32d4).
709 pub tcp_listen_backlog: u32,
710}
711
712impl Default for Limits {
713 fn default() -> Self {
714 Limits {
715 max_concurrent_requests: 100,
716 max_concurrent_queries: 5,
717 max_event_size: ByteSize::mebibytes(1),
718 max_attachment_size: ByteSize::mebibytes(200),
719 max_upload_size: ByteSize::mebibytes(1024),
720 max_attachments_size: ByteSize::mebibytes(200),
721 max_client_reports_size: ByteSize::kibibytes(4),
722 max_check_in_size: ByteSize::kibibytes(100),
723 max_envelope_size: ByteSize::mebibytes(200),
724 max_session_count: 100,
725 max_api_payload_size: ByteSize::mebibytes(20),
726 max_api_file_upload_size: ByteSize::mebibytes(40),
727 max_api_chunk_upload_size: ByteSize::mebibytes(100),
728 max_profile_size: ByteSize::mebibytes(50),
729 max_trace_metric_size: ByteSize::mebibytes(1),
730 max_log_size: ByteSize::mebibytes(1),
731 max_span_size: ByteSize::mebibytes(10),
732 max_container_size: ByteSize::mebibytes(12),
733 max_statsd_size: ByteSize::mebibytes(1),
734 max_metric_buckets_size: ByteSize::mebibytes(1),
735 max_replay_compressed_size: ByteSize::mebibytes(10),
736 max_replay_uncompressed_size: ByteSize::mebibytes(100),
737 max_replay_message_size: ByteSize::mebibytes(15),
738 max_thread_count: num_cpus::get(),
739 max_pool_concurrency: 1,
740 query_timeout: 30,
741 shutdown_timeout: 10,
742 keepalive_timeout: 5,
743 idle_timeout: None,
744 max_connections: None,
745 tcp_listen_backlog: 1024,
746 max_removed_attribute_key_size: ByteSize::kibibytes(10),
747 }
748 }
749}
750
751/// Controls traffic steering.
752#[derive(Debug, Default, Deserialize, Serialize)]
753#[serde(default)]
754pub struct Routing {
755 /// Accept and forward unknown Envelope items to the upstream.
756 ///
757 /// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
758 /// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
759 /// this is disabled, just the unknown items are removed from Envelopes, and the rest is
760 /// processed as usual.
761 ///
762 /// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
763 /// is disabled by default since the item cannot be handled.
764 pub accept_unknown_items: Option<bool>,
765}
766
767/// Http content encoding for both incoming and outgoing web requests.
768#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
769#[serde(rename_all = "lowercase")]
770pub enum HttpEncoding {
771 /// Identity function without no compression.
772 ///
773 /// This is the default encoding and does not require the presence of the `content-encoding`
774 /// HTTP header.
775 #[default]
776 Identity,
777 /// Compression using a [zlib](https://en.wikipedia.org/wiki/Zlib) structure with
778 /// [deflate](https://en.wikipedia.org/wiki/DEFLATE) encoding.
779 ///
780 /// These structures are defined in [RFC 1950](https://datatracker.ietf.org/doc/html/rfc1950)
781 /// and [RFC 1951](https://datatracker.ietf.org/doc/html/rfc1951).
782 Deflate,
783 /// A format using the [Lempel-Ziv coding](https://en.wikipedia.org/wiki/LZ77_and_LZ78#LZ77)
784 /// (LZ77), with a 32-bit CRC.
785 ///
786 /// This is the original format of the UNIX gzip program. The HTTP/1.1 standard also recommends
787 /// that the servers supporting this content-encoding should recognize `x-gzip` as an alias, for
788 /// compatibility purposes.
789 Gzip,
790 /// A format using the [Brotli](https://en.wikipedia.org/wiki/Brotli) algorithm.
791 Br,
792 /// A format using the [Zstd](https://en.wikipedia.org/wiki/Zstd) compression algorithm.
793 Zstd,
794}
795
796impl HttpEncoding {
797 /// Parses a [`HttpEncoding`] from its `content-encoding` header value.
798 pub fn parse(str: &str) -> Self {
799 let str = str.trim();
800 if str.eq_ignore_ascii_case("zstd") {
801 Self::Zstd
802 } else if str.eq_ignore_ascii_case("br") {
803 Self::Br
804 } else if str.eq_ignore_ascii_case("gzip") || str.eq_ignore_ascii_case("x-gzip") {
805 Self::Gzip
806 } else if str.eq_ignore_ascii_case("deflate") {
807 Self::Deflate
808 } else {
809 Self::Identity
810 }
811 }
812
813 /// Returns the value for the `content-encoding` HTTP header.
814 ///
815 /// Returns `None` for [`Identity`](Self::Identity), and `Some` for other encodings.
816 pub fn name(&self) -> Option<&'static str> {
817 match self {
818 Self::Identity => None,
819 Self::Deflate => Some("deflate"),
820 Self::Gzip => Some("gzip"),
821 Self::Br => Some("br"),
822 Self::Zstd => Some("zstd"),
823 }
824 }
825}
826
827/// Controls authentication with upstream.
828#[derive(Serialize, Deserialize, Debug)]
829#[serde(default)]
830pub struct Http {
831 /// Timeout for upstream requests in seconds.
832 ///
833 /// This timeout covers the time from sending the request until receiving response headers.
834 /// Neither the connection process and handshakes, nor reading the response body is covered in
835 /// this timeout.
836 pub timeout: u32,
837 /// Timeout for establishing connections with the upstream in seconds.
838 ///
839 /// This includes SSL handshakes. Relay reuses connections when the upstream supports connection
840 /// keep-alive. Connections are retained for a maximum 75 seconds, or 15 seconds of inactivity.
841 pub connection_timeout: u32,
842 /// Maximum interval between failed request retries in seconds.
843 pub max_retry_interval: u32,
844 /// The custom HTTP Host header to send to the upstream.
845 pub host_header: Option<String>,
846 /// The interval in seconds at which Relay attempts to reauthenticate with the upstream server.
847 ///
848 /// Re-authentication happens even when Relay is idle. If authentication fails, Relay reverts
849 /// back into startup mode and tries to establish a connection. During this time, incoming
850 /// envelopes will be buffered.
851 ///
852 /// Defaults to `600` (10 minutes).
853 pub auth_interval: Option<u64>,
854 /// The maximum time of experiencing uninterrupted network failures until Relay considers that
855 /// it has encountered a network outage in seconds.
856 ///
857 /// During a network outage relay will try to reconnect and will buffer all upstream messages
858 /// until it manages to reconnect.
859 pub outage_grace_period: u64,
860 /// The time Relay waits before retrying an upstream request, in seconds.
861 ///
862 /// This time is only used before going into a network outage mode.
863 pub retry_delay: u64,
864 /// The interval in seconds for continued failed project fetches at which Relay will error.
865 ///
866 /// A successful fetch resets this interval. Relay does nothing during long
867 /// times without emitting requests.
868 pub project_failure_interval: u64,
869 /// Content encoding to apply to upstream store requests.
870 ///
871 /// By default, Relay applies `zstd` content encoding to compress upstream requests. Compression
872 /// can be disabled to reduce CPU consumption, but at the expense of increased network traffic.
873 ///
874 /// This setting applies to all store requests of SDK data, including events, transactions,
875 /// envelopes and sessions. At the moment, this does not apply to Relay's internal queries.
876 ///
877 /// Available options are:
878 ///
879 /// - `identity`: Disables compression.
880 /// - `deflate`: Compression using a zlib header with deflate encoding.
881 /// - `gzip` (default): Compression using gzip.
882 /// - `br`: Compression using the brotli algorithm.
883 /// - `zstd`: Compression using the zstd algorithm.
884 pub encoding: HttpEncoding,
885 /// Submit metrics globally through a shared endpoint.
886 ///
887 /// As opposed to regular envelopes which are sent to an endpoint inferred from the project's
888 /// DSN, this submits metrics to the global endpoint with Relay authentication.
889 ///
890 /// This option does not have any effect on processing mode.
891 pub global_metrics: bool,
892 /// Controls whether the forward endpoint is enabled.
893 ///
894 /// The forward endpoint forwards unknown API requests to the upstream.
895 ///
896 /// Relay instances with processing enabled are expected to support the latest API and do never
897 /// support forwarding requests to Sentry.
898 pub forward: bool,
899 /// Enables an async DNS resolver through the `hickory-dns` crate, which uses an LRU cache for
900 /// the resolved entries. This helps to limit the amount of requests made to the upstream DNS
901 /// server (important for K8s infrastructure).
902 pub dns_cache: bool,
903}
904
905impl Default for Http {
906 fn default() -> Self {
907 Http {
908 timeout: 5,
909 connection_timeout: 3,
910 max_retry_interval: 60, // 1 minute
911 host_header: None,
912 auth_interval: Some(600), // 10 minutes
913 outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
914 retry_delay: 1,
915 project_failure_interval: 90,
916 encoding: HttpEncoding::Zstd,
917 global_metrics: false,
918 forward: true,
919 dns_cache: true,
920 }
921 }
922}
923
924/// Strategy used to assign envelopes to buffer partitions.
925#[derive(Clone, Copy, Debug, Eq, PartialEq, Default, Deserialize, Serialize)]
926#[serde(rename_all = "snake_case")]
927pub enum EnvelopeSpoolPartitioning {
928 /// Envelopes with the same project key pair land on the same partition.
929 ///
930 /// Keeps per-project state, disk files, and event ordering co-located on one partition.
931 ProjectKeyPair,
932 /// Envelopes are distributed across partitions in a round-robin fashion (default).
933 ///
934 /// This prevents "hot" partitions when a single project pair dominates traffic, but has
935 /// trade-offs:
936 /// - Per-project LIFO ordering is no longer preserved across partitions.
937 /// - Per-partition memory footprint grows since every partition sees every project.
938 #[default]
939 RoundRobin,
940}
941
942/// Persistent buffering configuration for incoming envelopes.
943#[derive(Debug, Serialize, Deserialize)]
944#[serde(default)]
945pub struct EnvelopeSpool {
946 /// The path of the SQLite database file(s) which persist the data.
947 ///
948 /// Based on the number of partitions, more database files will be created within the same path.
949 ///
950 /// If not set, the envelopes will be buffered in memory.
951 pub path: Option<PathBuf>,
952 /// The maximum size of the buffer to keep, in bytes.
953 ///
954 /// When the on-disk buffer reaches this size, new envelopes will be dropped.
955 ///
956 /// Defaults to 500MB.
957 pub max_disk_size: ByteSize,
958 /// Size of the batch of compressed envelopes that are spooled to disk at once.
959 ///
960 /// Note that this is the size after which spooling will be triggered but it does not guarantee
961 /// that exactly this size will be spooled, it can be greater or equal.
962 ///
963 /// Defaults to 10 KiB.
964 pub batch_size_bytes: ByteSize,
965 /// Maximum time between receiving the envelope and processing it.
966 ///
967 /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
968 /// they are dropped.
969 ///
970 /// Defaults to 24h.
971 pub max_envelope_delay_secs: u64,
972 /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
973 /// internal page stats.
974 ///
975 /// Defaults to 100ms.
976 pub disk_usage_refresh_frequency_ms: u64,
977 /// The relative memory usage above which the buffer service will stop dequeueing envelopes.
978 ///
979 /// Only applies when [`Self::path`] is set.
980 ///
981 /// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
982 ///
983 /// Warning: This threshold can cause the buffer service to deadlock when the buffer consumes
984 /// excessive memory (as influenced by [`Self::batch_size_bytes`]).
985 ///
986 /// This scenario arises when the buffer stops spooling due to reaching the
987 /// [`Self::max_backpressure_memory_percent`] limit, but the batch threshold for spooling
988 /// ([`Self::batch_size_bytes`]) is never reached. As a result, no data is spooled, memory usage
989 /// continues to grow, and the system becomes deadlocked.
990 ///
991 /// ### Example
992 /// Suppose the system has 1GB of available memory and is configured to spool only after
993 /// accumulating 10GB worth of envelopes. If Relay consumes 900MB of memory, it will stop
994 /// unspooling due to reaching the [`Self::max_backpressure_memory_percent`] threshold.
995 ///
996 /// However, because the buffer hasn't accumulated the 10GB needed to trigger spooling,
997 /// no data will be offloaded. Memory usage keeps increasing until it hits the
998 /// [`Health::max_memory_percent`] threshold, e.g., at 950MB. At this point:
999 ///
1000 /// - No more envelopes are accepted.
1001 /// - The buffer remains stuck, as unspooling won’t resume until memory drops below 900MB which
1002 /// will not happen.
1003 /// - A deadlock occurs, with the system unable to recover without manual intervention.
1004 ///
1005 /// Defaults to 90% (5% less than max memory).
1006 pub max_backpressure_memory_percent: f32,
1007 /// Number of partitions of the buffer.
1008 ///
1009 /// A partition is a separate instance of the buffer which has its own isolated queue, stacks
1010 /// and other resources.
1011 ///
1012 /// Defaults to 1.
1013 pub partitions: NonZeroU8,
1014 /// Strategy used to assign envelopes to buffer partitions.
1015 ///
1016 /// Defaults to partitioning by `ProjectKeyPair`, which keeps all envelopes of a given project
1017 /// pair on the same partition. See [`EnvelopeSpoolPartitioning`] for alternatives and
1018 /// trade-offs.
1019 pub partitioning: EnvelopeSpoolPartitioning,
1020 /// Whether the database defined in `path` is on an ephemeral storage disk.
1021 ///
1022 /// With `ephemeral: true`, Relay does not spool in-flight data to disk
1023 /// during graceful shutdown. Instead, it attempts to process all data before it terminates.
1024 ///
1025 /// Defaults to `false`.
1026 pub ephemeral: bool,
1027}
1028
1029impl Default for EnvelopeSpool {
1030 fn default() -> Self {
1031 Self {
1032 path: None,
1033 max_disk_size: ByteSize::mebibytes(500),
1034 batch_size_bytes: ByteSize::kibibytes(10),
1035 max_envelope_delay_secs: 24 * 60 * 60,
1036 disk_usage_refresh_frequency_ms: 100,
1037 max_backpressure_memory_percent: 0.8,
1038 partitions: NonZeroU8::new(1).unwrap(),
1039 partitioning: EnvelopeSpoolPartitioning::default(),
1040 ephemeral: false,
1041 }
1042 }
1043}
1044
1045/// Persistent buffering configuration.
1046#[derive(Debug, Serialize, Deserialize, Default)]
1047#[serde(default)]
1048pub struct Spool {
1049 /// Configuration for envelope spooling.
1050 pub envelopes: EnvelopeSpool,
1051}
1052
1053/// Controls internal caching behavior.
1054#[derive(Serialize, Deserialize, Debug)]
1055#[serde(default)]
1056pub struct Cache {
1057 /// The full project state will be requested by this Relay if set to `true`.
1058 pub project_request_full_config: bool,
1059 /// The cache timeout for project configurations in seconds.
1060 pub project_expiry: u32,
1061 /// Continue using project state this many seconds after cache expiry while a new state is
1062 /// being fetched. This is added on top of `project_expiry`.
1063 ///
1064 /// Default is 2 minutes.
1065 pub project_grace_period: u32,
1066 /// Refresh a project after the specified seconds.
1067 ///
1068 /// The time must be between expiry time and the grace period.
1069 ///
1070 /// By default there are no refreshes enabled.
1071 pub project_refresh_interval: Option<u32>,
1072 /// The cache timeout for downstream relay info (public keys) in seconds.
1073 pub relay_expiry: u32,
1074 /// Unused cache timeout for envelopes.
1075 ///
1076 /// The envelope buffer is instead controlled by `envelope_buffer_size`, which controls the
1077 /// maximum number of envelopes in the buffer. A time based configuration may be re-introduced
1078 /// at a later point.
1079 #[serde(alias = "event_expiry")]
1080 envelope_expiry: u32,
1081 /// The maximum amount of envelopes to queue before dropping them.
1082 #[serde(alias = "event_buffer_size")]
1083 envelope_buffer_size: u32,
1084 /// The cache timeout for non-existing entries.
1085 pub miss_expiry: u32,
1086 /// The buffer timeout for batched project config queries before sending them upstream in ms.
1087 pub batch_interval: u32,
1088 /// The buffer timeout for batched queries of downstream relays in ms. Defaults to 100ms.
1089 pub downstream_relays_batch_interval: u32,
1090 /// The maximum number of project configs to fetch from Sentry at once. Defaults to 500.
1091 ///
1092 /// `cache.batch_interval` controls how quickly batches are sent, this controls the batch size.
1093 pub batch_size: usize,
1094 /// Interval for watching local cache override files in seconds.
1095 pub file_interval: u32,
1096 /// Interval for fetching new global configs from the upstream, in seconds.
1097 pub global_config_fetch_interval: u32,
1098}
1099
1100impl Default for Cache {
1101 fn default() -> Self {
1102 Cache {
1103 project_request_full_config: false,
1104 project_expiry: 300, // 5 minutes
1105 project_grace_period: 120, // 2 minutes
1106 project_refresh_interval: None,
1107 relay_expiry: 3600, // 1 hour
1108 envelope_expiry: 600, // 10 minutes
1109 envelope_buffer_size: 1000,
1110 miss_expiry: 60, // 1 minute
1111 batch_interval: 100, // 100ms
1112 downstream_relays_batch_interval: 100, // 100ms
1113 batch_size: 500,
1114 file_interval: 10, // 10 seconds
1115 global_config_fetch_interval: 10, // 10 seconds
1116 }
1117 }
1118}
1119
1120/// Controls Sentry-internal event processing.
1121#[derive(Serialize, Deserialize, Debug)]
1122#[serde(default)]
1123pub struct Processing {
1124 /// True if the Relay should do processing. Defaults to `false`.
1125 pub enabled: bool,
1126 /// GeoIp DB file source.
1127 pub geoip_path: Option<PathBuf>,
1128 /// Maximum future timestamp of ingested events.
1129 pub max_secs_in_future: u32,
1130 /// Maximum age of ingested sessions. Older sessions will be dropped.
1131 pub max_session_secs_in_past: u32,
1132 /// Kafka producer configurations.
1133 pub kafka_config: Vec<KafkaConfigParam>,
1134 /// Additional kafka producer configurations.
1135 ///
1136 /// The `kafka_config` is the default producer configuration used for all topics. A secondary
1137 /// kafka config can be referenced in `topics:` like this:
1138 ///
1139 /// ```yaml
1140 /// secondary_kafka_configs:
1141 /// mycustomcluster:
1142 /// - name: 'bootstrap.servers'
1143 /// value: 'sentry_kafka_metrics:9093'
1144 ///
1145 /// topics:
1146 /// transactions: ingest-transactions
1147 /// metrics:
1148 /// name: ingest-metrics
1149 /// config: mycustomcluster
1150 /// ```
1151 ///
1152 /// Then metrics will be produced to an entirely different Kafka cluster.
1153 pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
1154 /// Kafka topic names.
1155 pub topics: TopicAssignments,
1156 /// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
1157 pub kafka_validate_topics: bool,
1158 /// Redis hosts to connect to for storing state for rate limits.
1159 pub redis: Option<RedisConfigs>,
1160 /// Maximum chunk size of attachments for Kafka.
1161 pub attachment_chunk_size: ByteSize,
1162 /// Prefix to use when looking up project configs in Redis. Defaults to "relayconfig".
1163 pub projectconfig_cache_prefix: String,
1164 /// Maximum rate limit to report to clients.
1165 pub max_rate_limit: Option<u32>,
1166 /// Configures the quota cache ratio between `0.0` and `1.0`.
1167 ///
1168 /// The quota cache, caches the specified ratio of remaining quota in memory to reduce the
1169 /// amount of synchronizations required with Redis.
1170 ///
1171 /// The ratio is applied to the (per second) rate of the quota, not the total limit.
1172 /// For example a quota with limit 100 with a 10 second window is treated equally to a quota of
1173 /// 10 with a 1 second window.
1174 ///
1175 /// By default quota caching is disabled.
1176 pub quota_cache_ratio: Option<f32>,
1177 /// Relative amount of the total quota limit to which quota caching is applied.
1178 ///
1179 /// If exceeded, the rate limiter will no longer cache the quota and sync with Redis on every call instead.
1180 /// Lowering this value reduces the probability of incorrectly over-accepting.
1181 ///
1182 /// Must be between `0.0` and `1.0`, by default there is no limit configured.
1183 pub quota_cache_max: Option<f32>,
1184 /// Configuration for the objectstore service.
1185 #[serde(alias = "upload")]
1186 pub objectstore: ObjectstoreServiceConfig,
1187}
1188
1189impl Default for Processing {
1190 /// Constructs a disabled processing configuration.
1191 fn default() -> Self {
1192 Self {
1193 enabled: false,
1194 geoip_path: None,
1195 max_secs_in_future: 60, // 1 minute
1196 max_session_secs_in_past: 5 * 24 * 3600, // 5 days
1197 kafka_config: Vec::new(),
1198 secondary_kafka_configs: BTreeMap::new(),
1199 topics: TopicAssignments::default(),
1200 kafka_validate_topics: false,
1201 redis: None,
1202 attachment_chunk_size: ByteSize::mebibytes(1),
1203 projectconfig_cache_prefix: "relayconfig".to_owned(),
1204 max_rate_limit: Some(300), // 5 minutes
1205 quota_cache_ratio: None,
1206 quota_cache_max: None,
1207 objectstore: ObjectstoreServiceConfig::default(),
1208 }
1209 }
1210}
1211
1212/// Configuration for normalization in this Relay.
1213#[derive(Debug, Default, Serialize, Deserialize)]
1214#[serde(default)]
1215pub struct Normalization {
1216 /// Level of normalization for Relay to apply to incoming data.
1217 pub level: NormalizationLevel,
1218}
1219
1220/// Configuration for the level of normalization this Relay should do.
1221#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
1222#[serde(rename_all = "lowercase")]
1223pub enum NormalizationLevel {
1224 /// Runs normalization, excluding steps that break future compatibility.
1225 ///
1226 /// Processing Relays run [`NormalizationLevel::Full`] if this option is set.
1227 #[default]
1228 Default,
1229 /// Run full normalization.
1230 ///
1231 /// It includes steps that break future compatibility and should only run in
1232 /// the last layer of relays.
1233 Full,
1234}
1235
1236/// Configuration values for the outcome aggregator
1237#[derive(Serialize, Deserialize, Debug)]
1238#[serde(default)]
1239pub struct OutcomeAggregatorConfig {
1240 /// Defines the width of the buckets into which outcomes are aggregated, in seconds.
1241 pub bucket_interval: u64,
1242 /// Defines how often all buckets are flushed, in seconds.
1243 pub flush_interval: u64,
1244}
1245
1246impl Default for OutcomeAggregatorConfig {
1247 fn default() -> Self {
1248 Self {
1249 bucket_interval: 60,
1250 flush_interval: 120,
1251 }
1252 }
1253}
1254
1255/// Configuration options for objectstore's auth scheme.
1256#[derive(Serialize, Deserialize)]
1257pub struct ObjectstoreAuthConfig {
1258 /// Identifier for the private key used to sign objectstore's tokens. Must correspond to a
1259 /// public key configured in objectstore.
1260 pub key_id: String,
1261
1262 /// EdDSA private key used to sign Objectstore's tokens, in PEM format.
1263 pub signing_key: String,
1264}
1265
1266impl fmt::Debug for ObjectstoreAuthConfig {
1267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1268 f.debug_struct("ObjectstoreAuthConfig")
1269 .field("key_id", &self.key_id)
1270 .field("signing_key", &"[redacted]")
1271 .finish()
1272 }
1273}
1274
1275/// Configuration values for the objectstore service.
1276#[derive(Serialize, Deserialize, Debug)]
1277#[serde(default)]
1278pub struct ObjectstoreServiceConfig {
1279 /// The base URL for the objectstore service.
1280 ///
1281 /// This defaults to [`None`], which means that the service will be disabled,
1282 /// unless a proper configuration is provided.
1283 pub objectstore_url: Option<String>,
1284
1285 /// Maximum concurrency of uploads.
1286 pub max_concurrent_requests: usize,
1287
1288 /// Maximum size of the service input queue when `max_concurrent_requests` is saturated.
1289 ///
1290 /// The service will loadshed if this threshold is reached.
1291 pub max_backlog: usize,
1292
1293 /// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
1294 ///
1295 /// NOTE: This timeout applies to attachments that are already in-memory. Streaming uploads
1296 /// might take longer and are restricted independently by [`Self::stream_timeout`].
1297 pub timeout: u64,
1298
1299 /// Maximum duration of an upload stream.
1300 ///
1301 /// Streams get a larger default timeout because their duration depends on the client
1302 /// as well as the server.
1303 pub stream_timeout: u64,
1304
1305 /// Time between upload attempts.
1306 pub retry_delay: f64,
1307
1308 /// Maximum number of attempts made to upload.
1309 pub max_attempts: NonZeroU16,
1310
1311 /// Configuration values for objectstore's auth scheme.
1312 pub auth: Option<ObjectstoreAuthConfig>,
1313}
1314
1315impl Default for ObjectstoreServiceConfig {
1316 fn default() -> Self {
1317 Self {
1318 objectstore_url: None,
1319 max_concurrent_requests: 10,
1320 max_backlog: 20,
1321 timeout: 60,
1322 stream_timeout: 5 * 60, // synced with `Upload::timeout`
1323 retry_delay: 1.0,
1324 max_attempts: NonZeroU16::new(5).unwrap(),
1325 auth: None,
1326 }
1327 }
1328}
1329
1330/// Determines how to emit outcomes.
1331/// For compatibility reasons, this can either be true, false or AsClientReports
1332#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1333
1334pub enum EmitOutcomes {
1335 /// Do not emit any outcomes.
1336 None,
1337 /// Emit outcomes as client reports.
1338 AsClientReports,
1339 /// Emit outcomes as outcomes.
1340 AsOutcomes,
1341}
1342
1343impl EmitOutcomes {
1344 /// Returns true of outcomes are emitted via http, kafka, or client reports.
1345 pub fn any(&self) -> bool {
1346 !matches!(self, EmitOutcomes::None)
1347 }
1348}
1349
1350impl Serialize for EmitOutcomes {
1351 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1352 where
1353 S: Serializer,
1354 {
1355 // For compatibility, serialize None and AsOutcomes as booleans.
1356 match self {
1357 Self::None => serializer.serialize_bool(false),
1358 Self::AsClientReports => serializer.serialize_str("as_client_reports"),
1359 Self::AsOutcomes => serializer.serialize_bool(true),
1360 }
1361 }
1362}
1363
1364struct EmitOutcomesVisitor;
1365
1366impl Visitor<'_> for EmitOutcomesVisitor {
1367 type Value = EmitOutcomes;
1368
1369 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1370 formatter.write_str("true, false, 'as_client_reports'")
1371 }
1372
1373 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
1374 where
1375 E: serde::de::Error,
1376 {
1377 Ok(if v {
1378 EmitOutcomes::AsOutcomes
1379 } else {
1380 EmitOutcomes::None
1381 })
1382 }
1383
1384 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1385 where
1386 E: serde::de::Error,
1387 {
1388 match v {
1389 "as_client_reports" => Ok(EmitOutcomes::AsClientReports),
1390 _ => Err(E::invalid_value(Unexpected::Str(v), &self)),
1391 }
1392 }
1393}
1394
1395impl<'de> Deserialize<'de> for EmitOutcomes {
1396 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1397 where
1398 D: Deserializer<'de>,
1399 {
1400 deserializer.deserialize_any(EmitOutcomesVisitor)
1401 }
1402}
1403
1404/// Outcome generation specific configuration values.
1405#[derive(Serialize, Deserialize, Debug)]
1406#[serde(default)]
1407pub struct Outcomes {
1408 /// Controls whether outcomes will be emitted when processing is disabled.
1409 /// Processing relays always emit outcomes (for backwards compatibility).
1410 /// Can take the following values: false, "as_client_reports", true
1411 pub emit_outcomes: EmitOutcomes,
1412 /// The maximum number of outcomes that are batched before being sent
1413 /// via http to the upstream (only applies to non processing relays).
1414 pub batch_size: usize,
1415 /// The maximum time interval (in milliseconds) that an outcome may be batched
1416 /// via http to the upstream (only applies to non processing relays).
1417 pub batch_interval: u64,
1418 /// Defines the source string registered in the outcomes originating from
1419 /// this Relay (typically something like the region or the layer).
1420 pub source: Option<String>,
1421 /// Configures the outcome aggregator.
1422 pub aggregator: OutcomeAggregatorConfig,
1423}
1424
1425impl Default for Outcomes {
1426 fn default() -> Self {
1427 Outcomes {
1428 emit_outcomes: EmitOutcomes::AsClientReports,
1429 batch_size: 1000,
1430 batch_interval: 500,
1431 source: None,
1432 aggregator: OutcomeAggregatorConfig::default(),
1433 }
1434 }
1435}
1436
1437/// Minimal version of a config for dumping out.
1438#[derive(Serialize, Deserialize, Debug, Default)]
1439pub struct MinimalConfig {
1440 /// The relay part of the config.
1441 pub relay: Relay,
1442}
1443
1444impl MinimalConfig {
1445 /// Saves the config in the given config folder as config.yml
1446 pub fn save_in_folder<P: AsRef<Path>>(&self, p: P) -> anyhow::Result<()> {
1447 let path = p.as_ref();
1448 if fs::metadata(path).is_err() {
1449 fs::create_dir_all(path)
1450 .with_context(|| ConfigError::file(ConfigErrorKind::CouldNotOpenFile, path))?;
1451 }
1452 self.save(path)
1453 }
1454}
1455
1456impl ConfigObject for MinimalConfig {
1457 fn format() -> ConfigFormat {
1458 ConfigFormat::Yaml
1459 }
1460
1461 fn name() -> &'static str {
1462 "config"
1463 }
1464}
1465
1466/// Alternative serialization of RelayInfo for config file using snake case.
1467mod config_relay_info {
1468 use serde::ser::SerializeMap;
1469
1470 use super::*;
1471
1472 // Uses snake_case as opposed to camelCase.
1473 #[derive(Debug, Serialize, Deserialize, Clone)]
1474 struct RelayInfoConfig {
1475 public_key: PublicKey,
1476 #[serde(default)]
1477 internal: bool,
1478 }
1479
1480 impl From<RelayInfoConfig> for RelayInfo {
1481 fn from(v: RelayInfoConfig) -> Self {
1482 RelayInfo {
1483 public_key: v.public_key,
1484 internal: v.internal,
1485 }
1486 }
1487 }
1488
1489 impl From<RelayInfo> for RelayInfoConfig {
1490 fn from(v: RelayInfo) -> Self {
1491 RelayInfoConfig {
1492 public_key: v.public_key,
1493 internal: v.internal,
1494 }
1495 }
1496 }
1497
1498 pub(super) fn deserialize<'de, D>(des: D) -> Result<HashMap<RelayId, RelayInfo>, D::Error>
1499 where
1500 D: Deserializer<'de>,
1501 {
1502 let map = HashMap::<RelayId, RelayInfoConfig>::deserialize(des)?;
1503 Ok(map.into_iter().map(|(k, v)| (k, v.into())).collect())
1504 }
1505
1506 pub(super) fn serialize<S>(elm: &HashMap<RelayId, RelayInfo>, ser: S) -> Result<S::Ok, S::Error>
1507 where
1508 S: Serializer,
1509 {
1510 let mut map = ser.serialize_map(Some(elm.len()))?;
1511
1512 for (k, v) in elm {
1513 map.serialize_entry(k, &RelayInfoConfig::from(v.clone()))?;
1514 }
1515
1516 map.end()
1517 }
1518}
1519
1520/// Authentication options.
1521#[derive(Serialize, Deserialize, Debug)]
1522#[serde(default)]
1523pub struct AuthConfig {
1524 /// Controls responses from the readiness health check endpoint based on authentication.
1525 #[serde(skip_serializing_if = "is_default")]
1526 pub ready: ReadinessCondition,
1527
1528 /// Statically authenticated downstream relays.
1529 #[serde(with = "config_relay_info")]
1530 pub static_relays: HashMap<RelayId, RelayInfo>,
1531
1532 /// How old a signature can be before it is considered invalid, in seconds.
1533 ///
1534 /// Defaults to 5 minutes.
1535 pub signature_max_age: u64,
1536}
1537
1538impl Default for AuthConfig {
1539 fn default() -> Self {
1540 Self {
1541 ready: ReadinessCondition::default(),
1542 static_relays: HashMap::new(),
1543 signature_max_age: 300, // 5 minutes
1544 }
1545 }
1546}
1547
1548/// GeoIp database configuration options.
1549#[derive(Serialize, Deserialize, Debug, Default)]
1550pub struct GeoIpConfig {
1551 /// The path to GeoIP database.
1552 pub path: Option<PathBuf>,
1553}
1554
1555/// Cardinality Limiter configuration options.
1556#[derive(Serialize, Deserialize, Debug)]
1557#[serde(default)]
1558pub struct CardinalityLimiter {
1559 /// Cache vacuum interval in seconds for the in memory cache.
1560 ///
1561 /// The cache will scan for expired values based on this interval.
1562 ///
1563 /// Defaults to 180 seconds, 3 minutes.
1564 pub cache_vacuum_interval: u64,
1565}
1566
1567impl Default for CardinalityLimiter {
1568 fn default() -> Self {
1569 Self {
1570 cache_vacuum_interval: 180,
1571 }
1572 }
1573}
1574
1575/// Settings to control Relay's health checks.
1576///
1577/// After breaching one of the configured thresholds, Relay will
1578/// return an `unhealthy` status from its health endpoint.
1579#[derive(Serialize, Deserialize, Debug)]
1580#[serde(default)]
1581pub struct Health {
1582 /// Interval to refresh internal health checks.
1583 ///
1584 /// Shorter intervals will decrease the time it takes the health check endpoint to report
1585 /// issues, but can also increase sporadic unhealthy responses.
1586 ///
1587 /// Defaults to `3000`` (3 seconds).
1588 pub refresh_interval_ms: u64,
1589 /// Maximum memory watermark in bytes.
1590 ///
1591 /// By default, there is no absolute limit set and the watermark
1592 /// is only controlled by setting [`Self::max_memory_percent`].
1593 pub max_memory_bytes: Option<ByteSize>,
1594 /// Maximum memory watermark as a percentage of maximum system memory.
1595 ///
1596 /// Defaults to `0.95` (95%).
1597 pub max_memory_percent: f32,
1598 /// Health check probe timeout in milliseconds.
1599 ///
1600 /// Any probe exceeding the timeout will be considered failed.
1601 /// This limits the max execution time of Relay health checks.
1602 ///
1603 /// Defaults to 900 milliseconds.
1604 pub probe_timeout_ms: u64,
1605 /// The refresh frequency of memory stats which are used to poll memory
1606 /// usage of Relay.
1607 ///
1608 /// The implementation of memory stats guarantees that the refresh will happen at
1609 /// least every `x` ms since memory readings are lazy and are updated only if needed.
1610 pub memory_stat_refresh_frequency_ms: u64,
1611}
1612
1613impl Default for Health {
1614 fn default() -> Self {
1615 Self {
1616 refresh_interval_ms: 3000,
1617 max_memory_bytes: None,
1618 max_memory_percent: 0.95,
1619 probe_timeout_ms: 900,
1620 memory_stat_refresh_frequency_ms: 100,
1621 }
1622 }
1623}
1624
1625/// COGS configuration.
1626#[derive(Serialize, Deserialize, Debug)]
1627#[serde(default)]
1628pub struct Cogs {
1629 /// Maximium amount of COGS measurements allowed to backlog.
1630 ///
1631 /// Any additional COGS measurements recorded will be dropped.
1632 ///
1633 /// Defaults to `10_000`.
1634 pub max_queue_size: u64,
1635 /// Relay COGS resource id.
1636 ///
1637 /// All Relay related COGS measurements are emitted with this resource id.
1638 ///
1639 /// Defaults to `relay_service`.
1640 pub relay_resource_id: String,
1641}
1642
1643impl Default for Cogs {
1644 fn default() -> Self {
1645 Self {
1646 max_queue_size: 10_000,
1647 relay_resource_id: "relay_service".to_owned(),
1648 }
1649 }
1650}
1651
1652/// Configuration for the upload service.
1653#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1654#[serde(default)]
1655pub struct Upload {
1656 /// Maximum number of uploads that the service accepts.
1657 ///
1658 /// Additional uploads will be rejected.
1659 pub max_concurrent_requests: usize,
1660 /// Maximum time spent trying to upload, in seconds.
1661 pub timeout: u64,
1662 /// The maximum time between creating the upload and uploading the data / the attachment placeholder.
1663 ///
1664 /// In seconds.
1665 pub max_age: i64,
1666}
1667
1668impl Default for Upload {
1669 fn default() -> Self {
1670 Self {
1671 max_concurrent_requests: 100,
1672 timeout: 5 * 60, // five minutes
1673 max_age: 60 * 60, // 1h
1674 }
1675 }
1676}
1677
1678/// All configuration values that can be deserialized from `config.yml`.
1679#[derive(Serialize, Deserialize, Debug, Default)]
1680#[serde(default)]
1681#[allow(missing_docs)]
1682pub struct ConfigValues {
1683 pub relay: Relay,
1684 pub http: Http,
1685 pub cache: Cache,
1686 pub spool: Spool,
1687 pub limits: Limits,
1688 pub logging: relay_log::LogConfig,
1689 pub routing: Routing,
1690 pub metrics: Metrics,
1691 pub sentry: relay_log::SentryConfig,
1692 pub processing: Processing,
1693 pub outcomes: Outcomes,
1694 pub aggregator: AggregatorServiceConfig,
1695 pub secondary_aggregators: Vec<ScopedAggregatorConfig>,
1696 pub auth: AuthConfig,
1697 pub geoip: GeoIpConfig,
1698 pub normalization: Normalization,
1699 pub cardinality_limiter: CardinalityLimiter,
1700 pub health: Health,
1701 pub cogs: Cogs,
1702 pub upload: Upload,
1703}
1704
1705impl ConfigObject for ConfigValues {
1706 fn format() -> ConfigFormat {
1707 ConfigFormat::Yaml
1708 }
1709
1710 fn name() -> &'static str {
1711 "config"
1712 }
1713}
1714
1715/// Config struct.
1716pub struct Config {
1717 values: ConfigValues,
1718 credentials: Option<Credentials>,
1719 path: PathBuf,
1720}
1721
1722impl fmt::Debug for Config {
1723 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1724 f.debug_struct("Config")
1725 .field("path", &self.path)
1726 .field("values", &self.values)
1727 .finish()
1728 }
1729}
1730
1731impl Config {
1732 /// Loads a config from a given config folder.
1733 pub fn from_path<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
1734 let path = env::current_dir()
1735 .map(|x| x.join(path.as_ref()))
1736 .unwrap_or_else(|_| path.as_ref().to_path_buf());
1737
1738 let config = Config {
1739 values: ConfigValues::load(&path)?,
1740 credentials: if Credentials::path(&path).exists() {
1741 Some(Credentials::load(&path)?)
1742 } else {
1743 None
1744 },
1745 path: path.clone(),
1746 };
1747
1748 if cfg!(not(feature = "processing")) && config.processing_enabled() {
1749 return Err(ConfigError::file(ConfigErrorKind::ProcessingNotAvailable, &path).into());
1750 }
1751
1752 Ok(config)
1753 }
1754
1755 /// Creates a config from a JSON value.
1756 ///
1757 /// This is mostly useful for tests.
1758 pub fn from_json_value(value: serde_json::Value) -> anyhow::Result<Config> {
1759 Ok(Config {
1760 values: serde_json::from_value(value)
1761 .with_context(|| ConfigError::new(ConfigErrorKind::BadJson))?,
1762 credentials: None,
1763 path: PathBuf::new(),
1764 })
1765 }
1766
1767 /// Override configuration with values coming from other sources (e.g. env variables or
1768 /// command line parameters)
1769 pub fn apply_override(
1770 &mut self,
1771 mut overrides: OverridableConfig,
1772 ) -> anyhow::Result<&mut Self> {
1773 let relay = &mut self.values.relay;
1774
1775 if let Some(mode) = overrides.mode {
1776 relay.mode = mode
1777 .parse::<RelayMode>()
1778 .with_context(|| ConfigError::field("mode"))?;
1779 }
1780
1781 if let Some(deployment) = overrides.instance {
1782 relay.instance = deployment
1783 .parse::<RelayInstance>()
1784 .with_context(|| ConfigError::field("deployment"))?;
1785 }
1786
1787 if let Some(log_level) = overrides.log_level {
1788 self.values.logging.level = log_level.parse()?;
1789 }
1790
1791 if let Some(log_format) = overrides.log_format {
1792 self.values.logging.format = log_format.parse()?;
1793 }
1794
1795 if let Some(upstream) = overrides.upstream {
1796 relay.upstream = upstream
1797 .parse::<UpstreamDescriptor>()
1798 .with_context(|| ConfigError::field("upstream"))?;
1799 } else if let Some(upstream_dsn) = overrides.upstream_dsn {
1800 relay.upstream = upstream_dsn
1801 .parse::<Dsn>()
1802 .map(|dsn| UpstreamDescriptor::from_dsn(&dsn))
1803 .with_context(|| ConfigError::field("upstream_dsn"))?;
1804 }
1805
1806 if let Some(host) = overrides.host {
1807 relay.host = host
1808 .parse::<IpAddr>()
1809 .with_context(|| ConfigError::field("host"))?;
1810 }
1811
1812 if let Some(port) = overrides.port {
1813 relay.port = port
1814 .as_str()
1815 .parse()
1816 .with_context(|| ConfigError::field("port"))?;
1817 }
1818
1819 let processing = &mut self.values.processing;
1820 if let Some(enabled) = overrides.processing {
1821 match enabled.to_lowercase().as_str() {
1822 "true" | "1" => processing.enabled = true,
1823 "false" | "0" | "" => processing.enabled = false,
1824 _ => return Err(ConfigError::field("processing").into()),
1825 }
1826 }
1827
1828 if let Some(redis) = overrides.redis_url {
1829 processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
1830 }
1831
1832 if let Some(kafka_url) = overrides.kafka_url {
1833 let existing = processing
1834 .kafka_config
1835 .iter_mut()
1836 .find(|e| e.name == "bootstrap.servers");
1837
1838 if let Some(config_param) = existing {
1839 config_param.value = kafka_url;
1840 } else {
1841 processing.kafka_config.push(KafkaConfigParam {
1842 name: "bootstrap.servers".to_owned(),
1843 value: kafka_url,
1844 })
1845 }
1846 }
1847 // credentials overrides
1848 let id = if let Some(id) = overrides.id {
1849 let id = Uuid::parse_str(&id).with_context(|| ConfigError::field("id"))?;
1850 Some(id)
1851 } else {
1852 None
1853 };
1854 let public_key = if let Some(public_key) = overrides.public_key {
1855 let public_key = public_key
1856 .parse::<PublicKey>()
1857 .with_context(|| ConfigError::field("public_key"))?;
1858 Some(public_key)
1859 } else {
1860 None
1861 };
1862
1863 let secret_key = if let Some(secret_key) = overrides.secret_key {
1864 let secret_key = secret_key
1865 .parse::<SecretKey>()
1866 .with_context(|| ConfigError::field("secret_key"))?;
1867 Some(secret_key)
1868 } else {
1869 None
1870 };
1871 let outcomes = &mut self.values.outcomes;
1872 if overrides.outcome_source.is_some() {
1873 outcomes.source = overrides.outcome_source.take();
1874 }
1875
1876 if let Some(credentials) = &mut self.credentials {
1877 //we have existing credentials we may override some entries
1878 if let Some(id) = id {
1879 credentials.id = id;
1880 }
1881 if let Some(public_key) = public_key {
1882 credentials.public_key = public_key;
1883 }
1884 if let Some(secret_key) = secret_key {
1885 credentials.secret_key = secret_key
1886 }
1887 } else {
1888 //no existing credentials we may only create the full credentials
1889 match (id, public_key, secret_key) {
1890 (Some(id), Some(public_key), Some(secret_key)) => {
1891 self.credentials = Some(Credentials {
1892 secret_key,
1893 public_key,
1894 id,
1895 })
1896 }
1897 (None, None, None) => {
1898 // nothing provided, we'll just leave the credentials None, maybe we
1899 // don't need them in the current command or we'll override them later
1900 }
1901 _ => {
1902 return Err(ConfigError::field("incomplete credentials").into());
1903 }
1904 }
1905 }
1906
1907 let limits = &mut self.values.limits;
1908 if let Some(shutdown_timeout) = overrides.shutdown_timeout
1909 && let Ok(shutdown_timeout) = shutdown_timeout.parse::<u64>()
1910 {
1911 limits.shutdown_timeout = shutdown_timeout;
1912 }
1913
1914 if let Some(server_name) = overrides.server_name {
1915 self.values.sentry.server_name = Some(server_name.into());
1916 }
1917
1918 Ok(self)
1919 }
1920
1921 /// Checks if the config is already initialized.
1922 pub fn config_exists<P: AsRef<Path>>(path: P) -> bool {
1923 fs::metadata(ConfigValues::path(path.as_ref())).is_ok()
1924 }
1925
1926 /// Returns the filename of the config file.
1927 pub fn path(&self) -> &Path {
1928 &self.path
1929 }
1930
1931 /// Dumps out a YAML string of the values.
1932 pub fn to_yaml_string(&self) -> anyhow::Result<String> {
1933 serde_yaml::to_string(&self.values)
1934 .with_context(|| ConfigError::new(ConfigErrorKind::CouldNotWriteFile))
1935 }
1936
1937 /// Regenerates the relay credentials.
1938 ///
1939 /// This also writes the credentials back to the file.
1940 pub fn regenerate_credentials(&mut self, save: bool) -> anyhow::Result<()> {
1941 let creds = Credentials::generate();
1942 if save {
1943 creds.save(&self.path)?;
1944 }
1945 self.credentials = Some(creds);
1946 Ok(())
1947 }
1948
1949 /// Return the current credentials
1950 pub fn credentials(&self) -> Option<&Credentials> {
1951 self.credentials.as_ref()
1952 }
1953
1954 /// Set new credentials.
1955 ///
1956 /// This also writes the credentials back to the file.
1957 pub fn replace_credentials(
1958 &mut self,
1959 credentials: Option<Credentials>,
1960 ) -> anyhow::Result<bool> {
1961 if self.credentials == credentials {
1962 return Ok(false);
1963 }
1964
1965 match credentials {
1966 Some(ref creds) => {
1967 creds.save(&self.path)?;
1968 }
1969 None => {
1970 let path = Credentials::path(&self.path);
1971 if fs::metadata(&path).is_ok() {
1972 fs::remove_file(&path).with_context(|| {
1973 ConfigError::file(ConfigErrorKind::CouldNotWriteFile, &path)
1974 })?;
1975 }
1976 }
1977 }
1978
1979 self.credentials = credentials;
1980 Ok(true)
1981 }
1982
1983 /// Returns `true` if the config is ready to use.
1984 pub fn has_credentials(&self) -> bool {
1985 self.credentials.is_some()
1986 }
1987
1988 /// Returns the secret key if set.
1989 pub fn secret_key(&self) -> Option<&SecretKey> {
1990 self.credentials.as_ref().map(|x| &x.secret_key)
1991 }
1992
1993 /// Returns the public key if set.
1994 pub fn public_key(&self) -> Option<&PublicKey> {
1995 self.credentials.as_ref().map(|x| &x.public_key)
1996 }
1997
1998 /// Returns the relay ID.
1999 pub fn relay_id(&self) -> Option<&RelayId> {
2000 self.credentials.as_ref().map(|x| &x.id)
2001 }
2002
2003 /// Returns the relay mode.
2004 pub fn relay_mode(&self) -> RelayMode {
2005 self.values.relay.mode
2006 }
2007
2008 /// Returns the instance type of relay.
2009 pub fn relay_instance(&self) -> RelayInstance {
2010 self.values.relay.instance
2011 }
2012
2013 /// Returns the upstream target as descriptor.
2014 pub fn upstream(&self) -> &UpstreamDescriptor {
2015 &self.values.relay.upstream
2016 }
2017
2018 /// Returns the advertised upstream for downstream instances as descriptor.
2019 pub fn advertised_upstream(&self) -> Option<&UpstreamDescriptor> {
2020 self.values.relay.advertised_upstream.as_ref()
2021 }
2022
2023 /// Returns the custom HTTP "Host" header.
2024 pub fn http_host_header(&self) -> Option<&str> {
2025 self.values.http.host_header.as_deref()
2026 }
2027
2028 /// Returns the listen address.
2029 pub fn listen_addr(&self) -> SocketAddr {
2030 (self.values.relay.host, self.values.relay.port).into()
2031 }
2032
2033 /// Returns the listen address for internal APIs.
2034 ///
2035 /// Internal APIs are APIs which do not need to be publicly exposed,
2036 /// like health checks.
2037 ///
2038 /// Returns `None` when there is no explicit address configured for internal APIs,
2039 /// and they should instead be exposed on the main [`Self::listen_addr`].
2040 pub fn listen_addr_internal(&self) -> Option<SocketAddr> {
2041 match (
2042 self.values.relay.internal_host,
2043 self.values.relay.internal_port,
2044 ) {
2045 (Some(host), None) => Some((host, self.values.relay.port).into()),
2046 (None, Some(port)) => Some((self.values.relay.host, port).into()),
2047 (Some(host), Some(port)) => Some((host, port).into()),
2048 (None, None) => None,
2049 }
2050 }
2051
2052 /// Returns the TLS listen address.
2053 pub fn tls_listen_addr(&self) -> Option<SocketAddr> {
2054 if self.values.relay.tls_identity_path.is_some() {
2055 let port = self.values.relay.tls_port.unwrap_or(3443);
2056 Some((self.values.relay.host, port).into())
2057 } else {
2058 None
2059 }
2060 }
2061
2062 /// Returns the path to the identity bundle
2063 pub fn tls_identity_path(&self) -> Option<&Path> {
2064 self.values.relay.tls_identity_path.as_deref()
2065 }
2066
2067 /// Returns the password for the identity bundle
2068 pub fn tls_identity_password(&self) -> Option<&str> {
2069 self.values.relay.tls_identity_password.as_deref()
2070 }
2071
2072 /// Returns `true` when project IDs should be overriden rather than validated.
2073 ///
2074 /// Defaults to `false`, which requires project ID validation.
2075 pub fn override_project_ids(&self) -> bool {
2076 self.values.relay.override_project_ids
2077 }
2078
2079 /// Returns `true` if Relay requires authentication for readiness.
2080 ///
2081 /// See [`ReadinessCondition`] for more information.
2082 pub fn requires_auth(&self) -> bool {
2083 match self.values.auth.ready {
2084 ReadinessCondition::Authenticated => self.relay_mode() == RelayMode::Managed,
2085 ReadinessCondition::Always => false,
2086 }
2087 }
2088
2089 /// Returns the interval at which Realy should try to re-authenticate with the upstream.
2090 ///
2091 /// Always disabled in processing mode.
2092 pub fn http_auth_interval(&self) -> Option<Duration> {
2093 if self.processing_enabled() {
2094 return None;
2095 }
2096
2097 match self.values.http.auth_interval {
2098 None | Some(0) => None,
2099 Some(secs) => Some(Duration::from_secs(secs)),
2100 }
2101 }
2102
2103 /// The maximum time of experiencing uninterrupted network failures until Relay considers that
2104 /// it has encountered a network outage.
2105 pub fn http_outage_grace_period(&self) -> Duration {
2106 Duration::from_secs(self.values.http.outage_grace_period)
2107 }
2108
2109 /// Time Relay waits before retrying an upstream request.
2110 ///
2111 /// Before going into a network outage, Relay may fail to make upstream
2112 /// requests. This is the time Relay waits before retrying the same request.
2113 pub fn http_retry_delay(&self) -> Duration {
2114 Duration::from_secs(self.values.http.retry_delay)
2115 }
2116
2117 /// Time of continued project request failures before Relay emits an error.
2118 pub fn http_project_failure_interval(&self) -> Duration {
2119 Duration::from_secs(self.values.http.project_failure_interval)
2120 }
2121
2122 /// Content encoding of upstream requests.
2123 pub fn http_encoding(&self) -> HttpEncoding {
2124 self.values.http.encoding
2125 }
2126
2127 /// Returns whether metrics should be sent globally through a shared endpoint.
2128 pub fn http_global_metrics(&self) -> bool {
2129 self.values.http.global_metrics
2130 }
2131
2132 /// Returns `true` if Relay supports forwarding unknown API requests.
2133 ///
2134 /// Relay instances with processing enabled are expected to support the latest API and do never
2135 /// support forwarding requests to Sentry.
2136 pub fn http_forward(&self) -> bool {
2137 self.values.http.forward && !self.processing_enabled()
2138 }
2139
2140 /// Returns whether this Relay should emit outcomes.
2141 ///
2142 /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
2143 /// in processing mode.
2144 pub fn emit_outcomes(&self) -> EmitOutcomes {
2145 if self.processing_enabled() {
2146 return EmitOutcomes::AsOutcomes;
2147 }
2148 self.values.outcomes.emit_outcomes
2149 }
2150
2151 /// Returns the maximum number of outcomes that are batched before being sent
2152 pub fn outcome_batch_size(&self) -> usize {
2153 self.values.outcomes.batch_size
2154 }
2155
2156 /// Returns the maximum interval that an outcome may be batched
2157 pub fn outcome_batch_interval(&self) -> Duration {
2158 Duration::from_millis(self.values.outcomes.batch_interval)
2159 }
2160
2161 /// The originating source of the outcome
2162 pub fn outcome_source(&self) -> Option<&str> {
2163 self.values.outcomes.source.as_deref()
2164 }
2165
2166 /// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2167 pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2168 &self.values.outcomes.aggregator
2169 }
2170
2171 /// Returns logging configuration.
2172 pub fn logging(&self) -> &relay_log::LogConfig {
2173 &self.values.logging
2174 }
2175
2176 /// Returns logging configuration.
2177 pub fn sentry(&self) -> &relay_log::SentryConfig {
2178 &self.values.sentry
2179 }
2180
2181 /// Returns the addresses for statsd metrics.
2182 pub fn statsd_addr(&self) -> Option<&str> {
2183 self.values.metrics.statsd.as_deref()
2184 }
2185
2186 /// Returns the addresses for statsd metrics.
2187 pub fn statsd_buffer_size(&self) -> Option<usize> {
2188 self.values.metrics.statsd_buffer_size
2189 }
2190
2191 /// Return the prefix for statsd metrics.
2192 pub fn metrics_prefix(&self) -> &str {
2193 &self.values.metrics.prefix
2194 }
2195
2196 /// Returns the default tags for statsd metrics.
2197 pub fn metrics_default_tags(&self) -> &BTreeMap<String, String> {
2198 &self.values.metrics.default_tags
2199 }
2200
2201 /// Returns the name of the hostname tag that should be attached to each outgoing metric.
2202 pub fn metrics_hostname_tag(&self) -> Option<&str> {
2203 self.values.metrics.hostname_tag.as_deref()
2204 }
2205
2206 /// Returns the interval for periodic metrics emitted from Relay.
2207 ///
2208 /// `None` if periodic metrics are disabled.
2209 pub fn metrics_periodic_interval(&self) -> Option<Duration> {
2210 match self.values.metrics.periodic_secs {
2211 0 => None,
2212 secs => Some(Duration::from_secs(secs)),
2213 }
2214 }
2215
2216 /// Returns the default timeout for all upstream HTTP requests.
2217 pub fn http_timeout(&self) -> Duration {
2218 Duration::from_secs(self.values.http.timeout.into())
2219 }
2220
2221 /// Returns the connection timeout for all upstream HTTP requests.
2222 pub fn http_connection_timeout(&self) -> Duration {
2223 Duration::from_secs(self.values.http.connection_timeout.into())
2224 }
2225
2226 /// Returns the failed upstream request retry interval.
2227 pub fn http_max_retry_interval(&self) -> Duration {
2228 Duration::from_secs(self.values.http.max_retry_interval.into())
2229 }
2230
2231 /// Returns `true` if relay should use an in-process cache for DNS lookups.
2232 pub fn http_dns_cache(&self) -> bool {
2233 self.values.http.dns_cache
2234 }
2235
2236 /// Returns the expiry timeout for cached projects.
2237 pub fn project_cache_expiry(&self) -> Duration {
2238 Duration::from_secs(self.values.cache.project_expiry.into())
2239 }
2240
2241 /// Returns `true` if the full project state should be requested from upstream.
2242 pub fn request_full_project_config(&self) -> bool {
2243 self.values.cache.project_request_full_config
2244 }
2245
2246 /// Returns the expiry timeout for cached relay infos (public keys).
2247 pub fn relay_cache_expiry(&self) -> Duration {
2248 Duration::from_secs(self.values.cache.relay_expiry.into())
2249 }
2250
2251 /// Returns the maximum number of buffered envelopes
2252 pub fn envelope_buffer_size(&self) -> usize {
2253 self.values
2254 .cache
2255 .envelope_buffer_size
2256 .try_into()
2257 .unwrap_or(usize::MAX)
2258 }
2259
2260 /// Returns the expiry timeout for cached misses before trying to refetch.
2261 pub fn cache_miss_expiry(&self) -> Duration {
2262 Duration::from_secs(self.values.cache.miss_expiry.into())
2263 }
2264
2265 /// Returns the grace period for project caches.
2266 pub fn project_grace_period(&self) -> Duration {
2267 Duration::from_secs(self.values.cache.project_grace_period.into())
2268 }
2269
2270 /// Returns the refresh interval for a project.
2271 ///
2272 /// Validates the refresh time to be between the grace period and expiry.
2273 pub fn project_refresh_interval(&self) -> Option<Duration> {
2274 self.values
2275 .cache
2276 .project_refresh_interval
2277 .map(Into::into)
2278 .map(Duration::from_secs)
2279 }
2280
2281 /// Returns the duration in which batchable project config queries are
2282 /// collected before sending them in a single request.
2283 pub fn query_batch_interval(&self) -> Duration {
2284 Duration::from_millis(self.values.cache.batch_interval.into())
2285 }
2286
2287 /// Returns the duration in which downstream relays are requested from upstream.
2288 pub fn downstream_relays_batch_interval(&self) -> Duration {
2289 Duration::from_millis(self.values.cache.downstream_relays_batch_interval.into())
2290 }
2291
2292 /// Returns the interval in seconds in which local project configurations should be reloaded.
2293 pub fn local_cache_interval(&self) -> Duration {
2294 Duration::from_secs(self.values.cache.file_interval.into())
2295 }
2296
2297 /// Returns the interval in seconds in which fresh global configs should be
2298 /// fetched from upstream.
2299 pub fn global_config_fetch_interval(&self) -> Duration {
2300 Duration::from_secs(self.values.cache.global_config_fetch_interval.into())
2301 }
2302
2303 /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
2304 ///
2305 /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
2306 /// suffixed with `.{partition_id}`.
2307 pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
2308 let mut path = self
2309 .values
2310 .spool
2311 .envelopes
2312 .path
2313 .as_ref()
2314 .map(|path| path.to_owned())?;
2315
2316 if partition_id == 0 {
2317 return Some(path);
2318 }
2319
2320 let file_name = path.file_name().and_then(|f| f.to_str())?;
2321 let new_file_name = format!("{file_name}.{partition_id}");
2322 path.set_file_name(new_file_name);
2323
2324 Some(path)
2325 }
2326
2327 /// The maximum size of the buffer, in bytes.
2328 pub fn spool_envelopes_max_disk_size(&self) -> usize {
2329 self.values.spool.envelopes.max_disk_size.as_bytes()
2330 }
2331
2332 /// Number of encoded envelope bytes that need to be accumulated before
2333 /// flushing one batch to disk.
2334 pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
2335 self.values.spool.envelopes.batch_size_bytes.as_bytes()
2336 }
2337
2338 /// Returns the time after which we drop envelopes as a [`Duration`] object.
2339 pub fn spool_envelopes_max_age(&self) -> Duration {
2340 Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
2341 }
2342
2343 /// Returns the refresh frequency for disk usage monitoring as a [`Duration`] object.
2344 pub fn spool_disk_usage_refresh_frequency_ms(&self) -> Duration {
2345 Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
2346 }
2347
2348 /// Returns the relative memory usage up to which the disk buffer will unspool envelopes.
2349 pub fn spool_max_backpressure_memory_percent(&self) -> f32 {
2350 self.values.spool.envelopes.max_backpressure_memory_percent
2351 }
2352
2353 /// Returns the number of partitions for the buffer.
2354 pub fn spool_partitions(&self) -> NonZeroU8 {
2355 self.values.spool.envelopes.partitions
2356 }
2357
2358 /// Returns the strategy used to assign envelopes to buffer partitions.
2359 pub fn spool_partitioning(&self) -> EnvelopeSpoolPartitioning {
2360 self.values.spool.envelopes.partitioning
2361 }
2362
2363 /// Returns `true` if the data is stored on ephemeral disks.
2364 pub fn spool_ephemeral(&self) -> bool {
2365 self.values.spool.envelopes.ephemeral
2366 }
2367
2368 /// Returns the maximum size of an event payload in bytes.
2369 pub fn max_event_size(&self) -> usize {
2370 self.values.limits.max_event_size.as_bytes()
2371 }
2372
2373 /// Returns the maximum size of each attachment.
2374 pub fn max_attachment_size(&self) -> usize {
2375 self.values.limits.max_attachment_size.as_bytes()
2376 }
2377
2378 /// Returns the maximum size of a TUS upload request body.
2379 pub fn max_upload_size(&self) -> usize {
2380 self.values.limits.max_upload_size.as_bytes()
2381 }
2382
2383 /// Returns the maximum combined size of attachments or payloads containing attachments
2384 /// (minidump, unreal, standalone attachments) in bytes.
2385 pub fn max_attachments_size(&self) -> usize {
2386 self.values.limits.max_attachments_size.as_bytes()
2387 }
2388
2389 /// Returns the maximum combined size of client reports in bytes.
2390 pub fn max_client_reports_size(&self) -> usize {
2391 self.values.limits.max_client_reports_size.as_bytes()
2392 }
2393
2394 /// Returns the maximum payload size of a monitor check-in in bytes.
2395 pub fn max_check_in_size(&self) -> usize {
2396 self.values.limits.max_check_in_size.as_bytes()
2397 }
2398
2399 /// Returns the maximum payload size of a log in bytes.
2400 pub fn max_log_size(&self) -> usize {
2401 self.values.limits.max_log_size.as_bytes()
2402 }
2403
2404 /// Returns the maximum payload size of a span in bytes.
2405 pub fn max_span_size(&self) -> usize {
2406 self.values.limits.max_span_size.as_bytes()
2407 }
2408
2409 /// Returns the maximum payload size of an item container in bytes.
2410 pub fn max_container_size(&self) -> usize {
2411 self.values.limits.max_container_size.as_bytes()
2412 }
2413
2414 /// Returns the maximum payload size for logs integration items in bytes.
2415 pub fn max_logs_integration_size(&self) -> usize {
2416 // Not explicitly configured, inherited from the maximum size of a log container.
2417 self.max_container_size()
2418 }
2419
2420 /// Returns the maximum payload size for spans integration items in bytes.
2421 pub fn max_spans_integration_size(&self) -> usize {
2422 // Not explicitly configured, inherited from the maximum size of a span container.
2423 self.max_container_size()
2424 }
2425
2426 /// Returns the maximum size of an envelope payload in bytes.
2427 ///
2428 /// Individual item size limits still apply.
2429 pub fn max_envelope_size(&self) -> usize {
2430 self.values.limits.max_envelope_size.as_bytes()
2431 }
2432
2433 /// Returns the maximum number of sessions per envelope.
2434 pub fn max_session_count(&self) -> usize {
2435 self.values.limits.max_session_count
2436 }
2437
2438 /// Returns the maximum payload size of a statsd metric in bytes.
2439 pub fn max_statsd_size(&self) -> usize {
2440 self.values.limits.max_statsd_size.as_bytes()
2441 }
2442
2443 /// Returns the maximum payload size of metric buckets in bytes.
2444 pub fn max_metric_buckets_size(&self) -> usize {
2445 self.values.limits.max_metric_buckets_size.as_bytes()
2446 }
2447
2448 /// Returns the maximum payload size for general API requests.
2449 pub fn max_api_payload_size(&self) -> usize {
2450 self.values.limits.max_api_payload_size.as_bytes()
2451 }
2452
2453 /// Returns the maximum payload size for file uploads and chunks.
2454 pub fn max_api_file_upload_size(&self) -> usize {
2455 self.values.limits.max_api_file_upload_size.as_bytes()
2456 }
2457
2458 /// Returns the maximum payload size for chunks
2459 pub fn max_api_chunk_upload_size(&self) -> usize {
2460 self.values.limits.max_api_chunk_upload_size.as_bytes()
2461 }
2462
2463 /// Returns the maximum payload size for a profile
2464 pub fn max_profile_size(&self) -> usize {
2465 self.values.limits.max_profile_size.as_bytes()
2466 }
2467
2468 /// Returns the maximum payload size for a trace metric.
2469 pub fn max_trace_metric_size(&self) -> usize {
2470 self.values.limits.max_trace_metric_size.as_bytes()
2471 }
2472
2473 /// Returns the maximum payload size for a compressed replay.
2474 pub fn max_replay_compressed_size(&self) -> usize {
2475 self.values.limits.max_replay_compressed_size.as_bytes()
2476 }
2477
2478 /// Returns the maximum payload size for an uncompressed replay.
2479 pub fn max_replay_uncompressed_size(&self) -> usize {
2480 self.values.limits.max_replay_uncompressed_size.as_bytes()
2481 }
2482
2483 /// Returns the maximum message size for an uncompressed replay.
2484 ///
2485 /// This is greater than max_replay_compressed_size because
2486 /// it can include additional metadata about the replay in
2487 /// addition to the recording.
2488 pub fn max_replay_message_size(&self) -> usize {
2489 self.values.limits.max_replay_message_size.as_bytes()
2490 }
2491
2492 /// Returns the maximum number of active requests
2493 pub fn max_concurrent_requests(&self) -> usize {
2494 self.values.limits.max_concurrent_requests
2495 }
2496
2497 /// Returns the maximum number of active queries
2498 pub fn max_concurrent_queries(&self) -> usize {
2499 self.values.limits.max_concurrent_queries
2500 }
2501
2502 /// Returns the maximum combined size of keys of invalid attributes.
2503 pub fn max_removed_attribute_key_size(&self) -> usize {
2504 self.values.limits.max_removed_attribute_key_size.as_bytes()
2505 }
2506
2507 /// The maximum number of seconds a query is allowed to take across retries.
2508 pub fn query_timeout(&self) -> Duration {
2509 Duration::from_secs(self.values.limits.query_timeout)
2510 }
2511
2512 /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown
2513 /// signal.
2514 pub fn shutdown_timeout(&self) -> Duration {
2515 Duration::from_secs(self.values.limits.shutdown_timeout)
2516 }
2517
2518 /// Returns the server keep-alive timeout in seconds.
2519 ///
2520 /// By default keep alive is set to a 5 seconds.
2521 pub fn keepalive_timeout(&self) -> Duration {
2522 Duration::from_secs(self.values.limits.keepalive_timeout)
2523 }
2524
2525 /// Returns the server idle timeout in seconds.
2526 pub fn idle_timeout(&self) -> Option<Duration> {
2527 self.values.limits.idle_timeout.map(Duration::from_secs)
2528 }
2529
2530 /// Returns the maximum connections.
2531 pub fn max_connections(&self) -> Option<usize> {
2532 self.values.limits.max_connections
2533 }
2534
2535 /// TCP listen backlog to configure on Relay's listening socket.
2536 pub fn tcp_listen_backlog(&self) -> u32 {
2537 self.values.limits.tcp_listen_backlog
2538 }
2539
2540 /// Returns the number of cores to use for thread pools.
2541 pub fn cpu_concurrency(&self) -> usize {
2542 self.values.limits.max_thread_count
2543 }
2544
2545 /// Returns the number of tasks that can run concurrently in the worker pool.
2546 pub fn pool_concurrency(&self) -> usize {
2547 self.values.limits.max_pool_concurrency
2548 }
2549
2550 /// Returns the maximum size of a project config query.
2551 pub fn query_batch_size(&self) -> usize {
2552 self.values.cache.batch_size
2553 }
2554
2555 /// Get filename for static project config.
2556 pub fn project_configs_path(&self) -> PathBuf {
2557 self.path.join("projects")
2558 }
2559
2560 /// True if the Relay should do processing.
2561 pub fn processing_enabled(&self) -> bool {
2562 self.values.processing.enabled
2563 }
2564
2565 /// Level of normalization for Relay to apply to incoming data.
2566 pub fn normalization_level(&self) -> NormalizationLevel {
2567 self.values.normalization.level
2568 }
2569
2570 /// The path to the GeoIp database required for event processing.
2571 pub fn geoip_path(&self) -> Option<&Path> {
2572 self.values
2573 .geoip
2574 .path
2575 .as_deref()
2576 .or(self.values.processing.geoip_path.as_deref())
2577 }
2578
2579 /// Maximum future timestamp of ingested data.
2580 ///
2581 /// Events past this timestamp will be adjusted to `now()`. Sessions will be dropped.
2582 pub fn max_secs_in_future(&self) -> i64 {
2583 self.values.processing.max_secs_in_future.into()
2584 }
2585
2586 /// Maximum age of ingested sessions. Older sessions will be dropped.
2587 pub fn max_session_secs_in_past(&self) -> i64 {
2588 self.values.processing.max_session_secs_in_past.into()
2589 }
2590
2591 /// Configuration name and list of Kafka configuration parameters for a given topic.
2592 pub fn kafka_configs(
2593 &self,
2594 topic: KafkaTopic,
2595 ) -> Result<KafkaTopicConfig<'_>, KafkaConfigError> {
2596 self.values.processing.topics.get(topic).kafka_configs(
2597 &self.values.processing.kafka_config,
2598 &self.values.processing.secondary_kafka_configs,
2599 )
2600 }
2601
2602 /// Whether to validate the topics against Kafka.
2603 pub fn kafka_validate_topics(&self) -> bool {
2604 self.values.processing.kafka_validate_topics
2605 }
2606
2607 /// All unused but configured topic assignments.
2608 pub fn unused_topic_assignments(&self) -> &relay_kafka::Unused {
2609 &self.values.processing.topics.unused
2610 }
2611
2612 /// Configuration of the objectstore service.
2613 pub fn objectstore(&self) -> &ObjectstoreServiceConfig {
2614 &self.values.processing.objectstore
2615 }
2616
2617 /// Configuration of the upload service.
2618 pub fn upload(&self) -> &Upload {
2619 &self.values.upload
2620 }
2621
2622 /// Redis servers to connect to for project configs, cardinality limits,
2623 /// rate limiting, and metrics metadata.
2624 pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
2625 let redis_configs = self.values.processing.redis.as_ref()?;
2626
2627 Some(build_redis_configs(
2628 redis_configs,
2629 self.cpu_concurrency() as u32,
2630 self.pool_concurrency() as u32,
2631 ))
2632 }
2633
2634 /// Chunk size of attachments in bytes.
2635 pub fn attachment_chunk_size(&self) -> usize {
2636 self.values.processing.attachment_chunk_size.as_bytes()
2637 }
2638
2639 /// Maximum metrics batch size in bytes.
2640 pub fn metrics_max_batch_size_bytes(&self) -> usize {
2641 self.values.aggregator.max_flush_bytes
2642 }
2643
2644 /// Default prefix to use when looking up project configs in Redis. This is only done when
2645 /// Relay is in processing mode.
2646 pub fn projectconfig_cache_prefix(&self) -> &str {
2647 &self.values.processing.projectconfig_cache_prefix
2648 }
2649
2650 /// Maximum rate limit to report to clients in seconds.
2651 pub fn max_rate_limit(&self) -> Option<u64> {
2652 self.values.processing.max_rate_limit.map(u32::into)
2653 }
2654
2655 /// Amount of remaining quota which is cached in memory.
2656 pub fn quota_cache_ratio(&self) -> Option<f32> {
2657 self.values.processing.quota_cache_ratio
2658 }
2659
2660 /// Maximum limit (ratio) for the in memory quota cache.
2661 pub fn quota_cache_max(&self) -> Option<f32> {
2662 self.values.processing.quota_cache_max
2663 }
2664
2665 /// Cache vacuum interval for the cardinality limiter in memory cache.
2666 ///
2667 /// The cache will scan for expired values based on this interval.
2668 pub fn cardinality_limiter_cache_vacuum_interval(&self) -> Duration {
2669 Duration::from_secs(self.values.cardinality_limiter.cache_vacuum_interval)
2670 }
2671
2672 /// Interval to refresh internal health checks.
2673 pub fn health_refresh_interval(&self) -> Duration {
2674 Duration::from_millis(self.values.health.refresh_interval_ms)
2675 }
2676
2677 /// Maximum memory watermark in bytes.
2678 pub fn health_max_memory_watermark_bytes(&self) -> u64 {
2679 self.values
2680 .health
2681 .max_memory_bytes
2682 .as_ref()
2683 .map_or(u64::MAX, |b| b.as_bytes() as u64)
2684 }
2685
2686 /// Maximum memory watermark as a percentage of maximum system memory.
2687 pub fn health_max_memory_watermark_percent(&self) -> f32 {
2688 self.values.health.max_memory_percent
2689 }
2690
2691 /// Health check probe timeout.
2692 pub fn health_probe_timeout(&self) -> Duration {
2693 Duration::from_millis(self.values.health.probe_timeout_ms)
2694 }
2695
2696 /// Refresh frequency for polling new memory stats.
2697 pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
2698 self.values.health.memory_stat_refresh_frequency_ms
2699 }
2700
2701 /// Maximum amount of COGS measurements buffered in memory.
2702 pub fn cogs_max_queue_size(&self) -> u64 {
2703 self.values.cogs.max_queue_size
2704 }
2705
2706 /// Resource ID to use for Relay COGS measurements.
2707 pub fn cogs_relay_resource_id(&self) -> &str {
2708 &self.values.cogs.relay_resource_id
2709 }
2710
2711 /// Returns configuration for the default metrics aggregator.
2712 pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
2713 &self.values.aggregator
2714 }
2715
2716 /// Returns configuration for non-default metrics aggregator.
2717 pub fn secondary_aggregator_configs(&self) -> &Vec<ScopedAggregatorConfig> {
2718 &self.values.secondary_aggregators
2719 }
2720
2721 /// Returns aggregator config for a given metrics namespace.
2722 pub fn aggregator_config_for(&self, namespace: MetricNamespace) -> &AggregatorServiceConfig {
2723 for entry in &self.values.secondary_aggregators {
2724 if entry.condition.matches(Some(namespace)) {
2725 return &entry.config;
2726 }
2727 }
2728 &self.values.aggregator
2729 }
2730
2731 /// Return the statically configured Relays.
2732 pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
2733 &self.values.auth.static_relays
2734 }
2735
2736 /// Returns the max age a signature is considered valid, in seconds.
2737 pub fn signature_max_age(&self) -> Duration {
2738 Duration::from_secs(self.values.auth.signature_max_age)
2739 }
2740
2741 /// Returns `true` if unknown items should be accepted and forwarded.
2742 pub fn accept_unknown_items(&self) -> bool {
2743 let forward = self.values.routing.accept_unknown_items;
2744 forward.unwrap_or_else(|| !self.processing_enabled())
2745 }
2746}
2747
2748impl Default for Config {
2749 fn default() -> Self {
2750 Self {
2751 values: ConfigValues::default(),
2752 credentials: None,
2753 path: PathBuf::new(),
2754 }
2755 }
2756}
2757
2758#[cfg(test)]
2759mod tests {
2760
2761 use super::*;
2762
2763 /// Regression test for renaming the envelope buffer flags.
2764 #[test]
2765 fn test_event_buffer_size() {
2766 let yaml = r###"
2767cache:
2768 event_buffer_size: 1000000
2769 event_expiry: 1800
2770"###;
2771
2772 let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
2773 assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
2774 assert_eq!(values.cache.envelope_expiry, 1800);
2775 }
2776
2777 #[test]
2778 fn test_emit_outcomes() {
2779 for (serialized, deserialized) in &[
2780 ("true", EmitOutcomes::AsOutcomes),
2781 ("false", EmitOutcomes::None),
2782 ("\"as_client_reports\"", EmitOutcomes::AsClientReports),
2783 ] {
2784 let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
2785 assert_eq!(value, *deserialized);
2786 assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
2787 }
2788 }
2789
2790 #[test]
2791 fn test_emit_outcomes_invalid() {
2792 assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
2793 }
2794}