relay_config/
redis.rs

1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Lower bound for the default maximum number of connections of a Redis pool.
///
/// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
/// In this case, we fall back to the old default.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
/// Additional configuration options for a redis client.
///
/// Every field may be omitted in the configuration: because of `#[serde(default)]`, missing
/// fields take their value from the [`Default`] implementation of this struct.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
    /// Maximum number of connections managed by the pool.
    ///
    /// Defaults to 2x `limits.max_thread_count` or a minimum of 24.
    ///
    /// The field is left out of the serialized output when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<u32>,
    /// Sets the idle timeout used by the pool, in seconds.
    ///
    /// The idle timeout defines the maximum time a connection will be kept in the pool if unused.
    ///
    /// Defaults to `60`.
    pub idle_timeout: u64,
    /// Sets the maximum time in seconds to wait when establishing a new Redis connection.
    ///
    /// If a connection cannot be established within this duration, it is considered a failure.
    /// Applies when the pool needs to grow or create fresh connections.
    ///
    /// Defaults to `3`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub create_timeout: Option<u64>,
    /// Sets the maximum time in seconds to validate an existing connection when it is recycled.
    ///
    /// Recycling involves checking whether an idle connection is still alive before reuse.
    /// If validation exceeds this timeout, the connection is discarded and a new fetch from the pool
    /// is attempted.
    ///
    /// Defaults to `2`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recycle_timeout: Option<u64>,
    /// Sets the maximum time, in seconds, that a caller is allowed to wait
    /// when requesting a connection from the pool.
    ///
    /// If a connection does not become available within this period, the attempt
    /// will fail with a timeout error. This setting helps prevent indefinite
    /// blocking when the pool is exhausted.
    ///
    /// Unset (`None`) by default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wait_timeout: Option<u64>,
    /// Sets the number of times after which the connection will check whether it is active when
    /// being recycled.
    ///
    /// A frequency of 1 means that the connection will check whether it is active every time it
    /// is recycled.
    ///
    /// Defaults to `100`.
    pub recycle_check_frequency: usize,
}
49
50impl Default for PartialRedisConfigOptions {
51    fn default() -> Self {
52        Self {
53            max_connections: None,
54            idle_timeout: 60,
55            create_timeout: Some(3),
56            recycle_timeout: Some(2),
57            wait_timeout: None,
58            recycle_check_frequency: 100,
59        }
60    }
61}
62
/// Configuration for connecting a redis client.
///
/// The enum is `untagged`: the variant is selected by the shape of the input, and serde tries
/// the variants in declaration order, taking the first one that deserializes successfully.
/// A value of this type is turned into a [`RedisConfig`] through the `From` implementation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
    /// Connect to a Redis cluster.
    Cluster {
        /// List of `redis://` urls to use in cluster mode.
        ///
        /// This can also be a single node which is configured in cluster mode.
        cluster_nodes: Vec<String>,

        /// Additional configuration options for the redis client and a connections pool.
        ///
        /// Flattened: the options live next to `cluster_nodes` in the same mapping.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },

    /// Connect to a single Redis instance.
    ///
    /// Contains the `redis://` url to the node.
    Single(String),

    /// Connect to a single Redis instance.
    ///
    /// Allows providing more configuration options, e.g. `max_connections`.
    SingleWithOpts {
        /// Contains the `redis://` url to the node.
        server: String,

        /// Additional configuration options for the redis client and a connections pool.
        ///
        /// Flattened: the options live next to `server` in the same mapping.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
96
/// Redis configuration.
///
/// The enum is `untagged`: a mapping containing `cluster_nodes` is read as [`Self::Cluster`],
/// everything else is attempted as [`Self::Single`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfig {
    /// Connect to a Redis Cluster.
    Cluster {
        /// Redis nodes urls of the cluster.
        cluster_nodes: Vec<String>,
        /// Options of the Redis config.
        ///
        /// Flattened: the options live next to `cluster_nodes` in the same mapping.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// Connect to a single Redis instance.
    Single(SingleRedisConfig),
}
112
/// Configuration of a single Redis instance, given either as a plain url string or as a mapping
/// with a `server` key and optional pool options.
///
/// This enum is needed for backward compatibility with configs that only provide the url string.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum SingleRedisConfig {
    /// Plain form: only the url of the Redis node.
    #[doc(hidden)]
    Simple(String),
    /// Detailed form: the url of the Redis node plus pool options.
    #[doc(hidden)]
    Detailed {
        /// Url of the Redis node.
        #[doc(hidden)]
        server: String,
        /// Pool options, flattened into the same mapping as `server`.
        #[doc(hidden)]
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
130
131impl RedisConfig {
132    /// Creates a new Redis config for a single Redis instance with default settings.
133    pub fn single(server: String) -> Self {
134        RedisConfig::Single(SingleRedisConfig::Detailed {
135            server,
136            options: Default::default(),
137        })
138    }
139}
140
141impl From<RedisConfigFromFile> for RedisConfig {
142    fn from(value: RedisConfigFromFile) -> Self {
143        match value {
144            RedisConfigFromFile::Cluster {
145                cluster_nodes,
146                options,
147            } => Self::Cluster {
148                cluster_nodes,
149                options,
150            },
151            RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
152                server,
153                options: Default::default(),
154            }),
155            RedisConfigFromFile::SingleWithOpts { server, options } => {
156                Self::Single(SingleRedisConfig::Detailed { server, options })
157            }
158        }
159    }
160}
161
/// Configurations for the various Redis pools used by Relay.
///
/// The enum is `untagged`: the input is first attempted as a single [`RedisConfig`] shared by
/// all pools, and otherwise as a mapping with one entry per pool.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
    /// All pools should be configured the same way.
    Unified(RedisConfig),
    /// Individual configurations for each pool.
    Individual {
        /// Configuration for the `project_configs` pool.
        project_configs: Box<RedisConfig>,
        /// Configuration for the `cardinality` pool.
        cardinality: Box<RedisConfig>,
        /// Configuration for the `quotas` pool.
        quotas: Box<RedisConfig>,
    },
}
178
/// Reference to the [`RedisConfig`] with the final [`RedisConfigOptions`].
///
/// The urls are borrowed from the originating config for the lifetime `'a`, while the options
/// are owned values.
#[derive(Clone, Debug)]
pub enum RedisConfigRef<'a> {
    /// Connect to a Redis Cluster.
    Cluster {
        /// Reference to the Redis nodes urls of the cluster.
        cluster_nodes: &'a Vec<String>,
        /// Options of the Redis config.
        options: RedisConfigOptions,
    },
    /// Connect to a single Redis instance.
    Single {
        /// Reference to the Redis node url.
        server: &'a String,
        /// Options of the Redis config.
        options: RedisConfigOptions,
    },
}
197
/// Helper enum bundling connections and options for the various Redis pools.
// `Individual` carries three `RedisConfigRef` values while `Unified` carries one, so the variants
// differ in size; the lint is silenced instead of boxing the larger variant.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum RedisConfigsRef<'a> {
    /// Use one pool for everything.
    Unified(RedisConfigRef<'a>),
    /// Use an individual pool for each use case.
    Individual {
        /// Configuration for the `project_configs` pool.
        project_configs: RedisConfigRef<'a>,
        /// Configuration for the `cardinality` pool.
        cardinality: RedisConfigRef<'a>,
        /// Configuration for the `quotas` pool.
        quotas: RedisConfigRef<'a>,
    },
}
214
215fn build_redis_config_options(
216    options: &PartialRedisConfigOptions,
217    default_connections: u32,
218) -> RedisConfigOptions {
219    let max_connections = options.max_connections.unwrap_or(default_connections);
220
221    RedisConfigOptions {
222        max_connections,
223        idle_timeout: options.idle_timeout,
224        create_timeout: options.create_timeout,
225        recycle_timeout: options.recycle_timeout,
226        wait_timeout: options.wait_timeout,
227        recycle_check_frequency: options.recycle_check_frequency,
228    }
229}
230
231/// Builds a [`RedisConfigsRef`] given a [`RedisConfig`].
232///
233/// The returned config contains more options for setting up Redis.
234pub(super) fn build_redis_config(
235    config: &RedisConfig,
236    default_connections: u32,
237) -> RedisConfigRef<'_> {
238    match config {
239        RedisConfig::Cluster {
240            cluster_nodes,
241            options,
242        } => RedisConfigRef::Cluster {
243            cluster_nodes,
244            options: build_redis_config_options(options, default_connections),
245        },
246        RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
247            RedisConfigRef::Single {
248                server,
249                options: build_redis_config_options(options, default_connections),
250            }
251        }
252        RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
253            server,
254            options: Default::default(),
255        },
256    }
257}
258
259/// Builds a [`RedisConfigsRef`] given a [`RedisConfigs`].
260///
261/// The returned configs contain more options for setting up Redis.
262pub(super) fn build_redis_configs(
263    configs: &RedisConfigs,
264    cpu_concurrency: u32,
265) -> RedisConfigsRef<'_> {
266    // The default number of connections is twice the concurrency since we are using async Redis
267    // so a single thread might be doing more I/O concurrently.
268    let default_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
269
270    match configs {
271        RedisConfigs::Unified(cfg) => {
272            let config = build_redis_config(cfg, default_connections);
273            RedisConfigsRef::Unified(config)
274        }
275        RedisConfigs::Individual {
276            project_configs,
277            cardinality,
278            quotas,
279        } => {
280            let project_configs = build_redis_config(project_configs, default_connections);
281            let cardinality = build_redis_config(cardinality, default_connections);
282            let quotas = build_redis_config(quotas, default_connections);
283            RedisConfigsRef::Individual {
284                project_configs,
285                cardinality,
286                quotas,
287            }
288        }
289    }
290}
291
// Round-trip tests for the Redis configuration types: YAML input is deserialized and compared
// against the expected enum value, and enum values are serialized and compared against inline
// JSON snapshots.
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    // A mapping with `server` and options deserializes into the detailed single-instance form.
    // `connection_timeout` is not a field of `PartialRedisConfigOptions`; the expected value
    // below shows that it is ignored rather than rejected.
    #[test]
    fn test_redis_single_opts() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            })
        );
    }

    // Same input as above, but parsed as `RedisConfigs`: it must resolve to the unified variant.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            }))
        );
    }

    // One entry per pool, mixing single-instance and cluster configs.
    #[test]
    fn test_redis_individual() {
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
"#;

        let configs: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: single with options");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(17),
                    ..Default::default()
                },
            }),
        };

        assert_eq!(configs, expected);
    }

    // Serializing the detailed single-instance form flattens the options next to `server`;
    // `wait_timeout` is `None` by default and therefore absent from the output.
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    // The unified wrapper adds no extra nesting to the serialized output.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    // A mapping with only `server` yields the detailed form with all-default options.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }

    // To make sure that we have backwards compatibility and still support the redis configuration
    // when the single `redis://...` address is provided.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    // A mapping with `cluster_nodes` and options deserializes into the cluster variant.
    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 10
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(10),
                    ..Default::default()
                },
            }
        );
    }

    // A cluster mapping parsed as `RedisConfigs` resolves to the unified variant.
    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 20
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(20),
                    ..Default::default()
                },
            })
        );
    }

    // Serializing a cluster config flattens the options next to `cluster_nodes`.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    // The unified wrapper adds no extra nesting around a cluster config either.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    // Individual configs serialize as one nested object per pool; the `cardinality` entry has
    // no `max_connections` key because the option is `None` there.
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(84),
                    ..Default::default()
                },
            }),
        };

        assert_json_snapshot!(configs, @r#"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          },
          "cardinality": {
            "server": "redis://127.0.0.1:6379",
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          }
        }
        "#);
    }
}