relay_config/
redis.rs

1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
4/// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
5/// In this case, we fall back to the old default.
6pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
8/// Additional configuration options for a redis client.
9#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
10#[serde(default)]
11pub struct PartialRedisConfigOptions {
12    /// Maximum number of connections managed by the pool.
13    ///
14    /// Defaults to 2x `limits.max_thread_count` or a minimum of 24.
15    #[serde(skip_serializing_if = "Option::is_none")]
16    pub max_connections: Option<u32>,
17    /// Sets the idle timeout used by the pool, in seconds.
18    ///
19    /// The idle timeout defines the maximum time a connection will be kept in the pool if unused.
20    pub idle_timeout: u64,
21    /// Sets the maximum time in seconds to wait when establishing a new Redis connection.
22    ///
23    /// If a connection cannot be established within this duration, it is considered a failure.
24    /// Applies when the pool needs to grow or create fresh connections.
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub create_timeout: Option<u64>,
27    /// Sets the maximum time in seconds to validate an existing connection when it is recycled.
28    ///
29    /// Recycling involves checking whether an idle connection is still alive before reuse.
30    /// If validation exceeds this timeout, the connection is discarded and a new fetch from the pool
31    /// is attempted.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub recycle_timeout: Option<u64>,
34    /// Sets the maximum time, in seconds, that a caller is allowed to wait
35    /// when requesting a connection from the pool.
36    ///
37    /// If a connection does not become available within this period, the attempt
38    /// will fail with a timeout error. This setting helps prevent indefinite
39    /// blocking when the pool is exhausted.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub wait_timeout: Option<u64>,
42    /// Sets the maximum time in seconds to wait for a result when sending a Redis command.
43    ///
44    /// If a command exceeds this timeout, the connection will be recycled.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub response_timeout: Option<u64>,
47    /// Sets the number of times after which the connection will check whether it is active when
48    /// being recycled.
49    ///
50    /// A frequency of 1, means that the connection will check whether it is active every time it
51    /// is recycled.
52    pub recycle_check_frequency: usize,
53}
54
55impl Default for PartialRedisConfigOptions {
56    fn default() -> Self {
57        Self {
58            max_connections: None,
59            idle_timeout: 60,
60            create_timeout: Some(3),
61            recycle_timeout: Some(2),
62            wait_timeout: None,
63            response_timeout: Some(30),
64            recycle_check_frequency: 100,
65        }
66    }
67}
68
69/// Configuration for connecting a redis client.
70#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
71#[serde(untagged)]
72enum RedisConfigFromFile {
73    /// Connect to a Redis cluster.
74    Cluster {
75        /// List of `redis://` urls to use in cluster mode.
76        ///
77        /// This can also be a single node which is configured in cluster mode.
78        cluster_nodes: Vec<String>,
79
80        /// Additional configuration options for the redis client and a connections pool.
81        #[serde(flatten)]
82        options: PartialRedisConfigOptions,
83    },
84
85    /// Connect to a single Redis instance.
86    ///
87    /// Contains the `redis://` url to the node.
88    Single(String),
89
90    /// Connect to a single Redis instance.
91    ///
92    /// Allows to provide more configuration options, e.g. `max_connections`.
93    SingleWithOpts {
94        /// Contains the `redis://` url to the node.
95        server: String,
96
97        /// Additional configuration options for the redis client and a connections pool.
98        #[serde(flatten)]
99        options: PartialRedisConfigOptions,
100    },
101}
102
103/// Redis configuration.
104#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
105#[serde(untagged)]
106pub enum RedisConfig {
107    /// Connect to a Redis Cluster.
108    Cluster {
109        /// Redis nodes urls of the cluster.
110        cluster_nodes: Vec<String>,
111        /// Options of the Redis config.
112        #[serde(flatten)]
113        options: PartialRedisConfigOptions,
114    },
115    /// Connect to a single Redis instance.
116    Single(SingleRedisConfig),
117}
118
119/// Struct that can serialize a string to a single Redis connection.
120///
121/// This struct is needed for backward compatibility.
122#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
123#[serde(untagged)]
124pub enum SingleRedisConfig {
125    #[doc(hidden)]
126    Simple(String),
127    #[doc(hidden)]
128    Detailed {
129        #[doc(hidden)]
130        server: String,
131        #[doc(hidden)]
132        #[serde(flatten)]
133        options: PartialRedisConfigOptions,
134    },
135}
136
137impl RedisConfig {
138    /// Creates a new Redis config for a single Redis instance with default settings.
139    pub fn single(server: String) -> Self {
140        RedisConfig::Single(SingleRedisConfig::Detailed {
141            server,
142            options: Default::default(),
143        })
144    }
145}
146
147impl From<RedisConfigFromFile> for RedisConfig {
148    fn from(value: RedisConfigFromFile) -> Self {
149        match value {
150            RedisConfigFromFile::Cluster {
151                cluster_nodes,
152                options,
153            } => Self::Cluster {
154                cluster_nodes,
155                options,
156            },
157            RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
158                server,
159                options: Default::default(),
160            }),
161            RedisConfigFromFile::SingleWithOpts { server, options } => {
162                Self::Single(SingleRedisConfig::Detailed { server, options })
163            }
164        }
165    }
166}
167
168/// Configurations for the various Redis pools used by Relay.
169#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
170#[serde(untagged)]
171pub enum RedisConfigs {
172    /// All pools should be configured the same way.
173    Unified(RedisConfig),
174    /// Individual configurations for each pool.
175    Individual {
176        /// Configuration for the `project_configs` pool.
177        project_configs: Box<RedisConfig>,
178        /// Configuration for the `cardinality` pool.
179        cardinality: Box<RedisConfig>,
180        /// Configuration for the `quotas` pool.
181        quotas: Box<RedisConfig>,
182    },
183}
184
185/// Reference to the [`RedisConfig`] with the final [`RedisConfigOptions`].
186#[derive(Clone, Debug)]
187pub enum RedisConfigRef<'a> {
188    /// Connect to a Redis Cluster.
189    Cluster {
190        /// Reference to the Redis nodes urls of the cluster.
191        cluster_nodes: &'a Vec<String>,
192        /// Options of the Redis config.
193        options: RedisConfigOptions,
194    },
195    /// Connect to a single Redis instance.
196    Single {
197        /// Reference to the Redis node url.
198        server: &'a String,
199        /// Options of the Redis config.
200        options: RedisConfigOptions,
201    },
202}
203
204/// Helper struct bundling connections and options for the various Redis pools.
205#[allow(clippy::large_enum_variant)]
206#[derive(Clone, Debug)]
207pub enum RedisConfigsRef<'a> {
208    /// Use one pool for everything.
209    Unified(RedisConfigRef<'a>),
210    /// Use an individual pool for each use case.
211    Individual {
212        /// Configuration for the `project_configs` pool.
213        project_configs: RedisConfigRef<'a>,
214        /// Configuration for the `cardinality` pool.
215        cardinality: RedisConfigRef<'a>,
216        /// Configuration for the `quotas` pool.
217        quotas: RedisConfigRef<'a>,
218    },
219}
220
221fn build_redis_config_options(
222    options: &PartialRedisConfigOptions,
223    default_connections: u32,
224) -> RedisConfigOptions {
225    let max_connections = options.max_connections.unwrap_or(default_connections);
226
227    RedisConfigOptions {
228        max_connections,
229        idle_timeout: options.idle_timeout,
230        create_timeout: options.create_timeout,
231        recycle_timeout: options.recycle_timeout,
232        wait_timeout: options.wait_timeout,
233        response_timeout: options.response_timeout,
234        recycle_check_frequency: options.recycle_check_frequency,
235    }
236}
237
238/// Builds a [`RedisConfigsRef`] given a [`RedisConfig`].
239///
240/// The returned config contains more options for setting up Redis.
241pub(super) fn build_redis_config(
242    config: &RedisConfig,
243    default_connections: u32,
244) -> RedisConfigRef<'_> {
245    match config {
246        RedisConfig::Cluster {
247            cluster_nodes,
248            options,
249        } => RedisConfigRef::Cluster {
250            cluster_nodes,
251            options: build_redis_config_options(options, default_connections),
252        },
253        RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
254            RedisConfigRef::Single {
255                server,
256                options: build_redis_config_options(options, default_connections),
257            }
258        }
259        RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
260            server,
261            options: Default::default(),
262        },
263    }
264}
265
266/// Builds a [`RedisConfigsRef`] given a [`RedisConfigs`].
267///
268/// The returned configs contain more options for setting up Redis.
269pub(super) fn build_redis_configs(
270    configs: &RedisConfigs,
271    cpu_concurrency: u32,
272    pool_concurrency: u32,
273) -> RedisConfigsRef<'_> {
274    // Project configurations are estimated to need around or less of `cpu_concurrency`
275    // connections, double this value for some extra headroom.
276    //
277    // For smaller setups give some extra headroom through `DEFAULT_MIN_MAX_CONNECTIONS`.
278    let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
279    // The total concurrency for rate limiting/processing is the total pool concurrency
280    // calculated by `cpu_concurrency * pool_concurrency`.
281    //
282    // No need to consider `DEFAULT_MIN_MAX_CONNECTIONS`, as these numbers are accurate.
283    let total_pool_concurrency = cpu_concurrency * pool_concurrency;
284
285    match configs {
286        RedisConfigs::Unified(cfg) => {
287            // A unified pool needs enough connections for project configs and enough to satisfy
288            // the processing pool.
289            let default_connections = total_pool_concurrency + project_connections;
290            let config = build_redis_config(cfg, default_connections);
291            RedisConfigsRef::Unified(config)
292        }
293        RedisConfigs::Individual {
294            project_configs,
295            cardinality,
296            quotas,
297        } => {
298            let project_configs = build_redis_config(project_configs, project_connections);
299            let cardinality = build_redis_config(cardinality, total_pool_concurrency);
300            let quotas = build_redis_config(quotas, total_pool_concurrency);
301            RedisConfigsRef::Individual {
302                project_configs,
303                cardinality,
304                quotas,
305            }
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use insta::assert_json_snapshot;
313
314    use super::*;
315
316    #[test]
317    fn test_redis_single_opts() {
318        let yaml = r#"
319server: "redis://127.0.0.1:6379"
320max_connections: 42
321connection_timeout: 5
322"#;
323
324        let config: RedisConfig = serde_yaml::from_str(yaml)
325            .expect("Parsed processing redis config: single with options");
326
327        assert_eq!(
328            config,
329            RedisConfig::Single(SingleRedisConfig::Detailed {
330                server: "redis://127.0.0.1:6379".to_owned(),
331                options: PartialRedisConfigOptions {
332                    max_connections: Some(42),
333                    ..Default::default()
334                }
335            })
336        );
337    }
338
339    #[test]
340    fn test_redis_single_opts_unified() {
341        let yaml = r#"
342server: "redis://127.0.0.1:6379"
343max_connections: 42
344connection_timeout: 5
345"#;
346
347        let config: RedisConfigs = serde_yaml::from_str(yaml)
348            .expect("Parsed processing redis config: single with options");
349
350        assert_eq!(
351            config,
352            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
353                server: "redis://127.0.0.1:6379".to_owned(),
354                options: PartialRedisConfigOptions {
355                    max_connections: Some(42),
356                    ..Default::default()
357                }
358            }))
359        );
360    }
361
362    #[test]
363    fn test_redis_individual() {
364        let yaml = r#"
365project_configs:
366    server: "redis://127.0.0.1:6379"
367    max_connections: 42
368cardinality:
369    server: "redis://127.0.0.1:6379"
370quotas:
371    cluster_nodes:
372        - "redis://127.0.0.1:6379"
373        - "redis://127.0.0.2:6379"
374    max_connections: 17
375"#;
376
377        let configs: RedisConfigs = serde_yaml::from_str(yaml)
378            .expect("Parsed processing redis configs: single with options");
379
380        let expected = RedisConfigs::Individual {
381            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
382                server: "redis://127.0.0.1:6379".to_owned(),
383                options: PartialRedisConfigOptions {
384                    max_connections: Some(42),
385                    ..Default::default()
386                },
387            })),
388            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
389                server: "redis://127.0.0.1:6379".to_owned(),
390                options: Default::default(),
391            })),
392            quotas: Box::new(RedisConfig::Cluster {
393                cluster_nodes: vec![
394                    "redis://127.0.0.1:6379".to_owned(),
395                    "redis://127.0.0.2:6379".to_owned(),
396                ],
397                options: PartialRedisConfigOptions {
398                    max_connections: Some(17),
399                    ..Default::default()
400                },
401            }),
402        };
403
404        assert_eq!(configs, expected);
405    }
406
407    #[test]
408    fn test_redis_single_serialize() {
409        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
410            server: "redis://127.0.0.1:6379".to_owned(),
411            options: PartialRedisConfigOptions {
412                max_connections: Some(42),
413                ..Default::default()
414            },
415        });
416
417        assert_json_snapshot!(config, @r###"
418        {
419          "server": "redis://127.0.0.1:6379",
420          "max_connections": 42,
421          "idle_timeout": 60,
422          "create_timeout": 3,
423          "recycle_timeout": 2,
424          "response_timeout": 30,
425          "recycle_check_frequency": 100
426        }
427        "###);
428    }
429
430    #[test]
431    fn test_redis_single_serialize_unified() {
432        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
433            server: "redis://127.0.0.1:6379".to_owned(),
434            options: PartialRedisConfigOptions {
435                max_connections: Some(42),
436                ..Default::default()
437            },
438        }));
439
440        assert_json_snapshot!(configs, @r###"
441        {
442          "server": "redis://127.0.0.1:6379",
443          "max_connections": 42,
444          "idle_timeout": 60,
445          "create_timeout": 3,
446          "recycle_timeout": 2,
447          "response_timeout": 30,
448          "recycle_check_frequency": 100
449        }
450        "###);
451    }
452
453    #[test]
454    fn test_redis_single_opts_default() {
455        let yaml = r#"
456server: "redis://127.0.0.1:6379"
457"#;
458
459        let config: RedisConfig = serde_yaml::from_str(yaml)
460            .expect("Parsed processing redis config: single with options");
461
462        assert_eq!(
463            config,
464            RedisConfig::Single(SingleRedisConfig::Detailed {
465                server: "redis://127.0.0.1:6379".to_owned(),
466                options: Default::default()
467            })
468        );
469    }
470
471    // To make sure that we have backwards compatibility and still support the redis configuration
472    // when the single `redis://...` address is provided.
473    #[test]
474    fn test_redis_single() {
475        let yaml = r#"
476"redis://127.0.0.1:6379"
477"#;
478
479        let config: RedisConfig = serde_yaml::from_str(yaml)
480            .expect("Parsed processing redis config: single with options");
481
482        assert_eq!(
483            config,
484            RedisConfig::Single(SingleRedisConfig::Simple(
485                "redis://127.0.0.1:6379".to_owned()
486            ))
487        );
488    }
489
490    #[test]
491    fn test_redis_cluster_nodes_opts() {
492        let yaml = r#"
493cluster_nodes:
494    - "redis://127.0.0.1:6379"
495    - "redis://127.0.0.2:6379"
496max_connections: 10
497"#;
498
499        let config: RedisConfig = serde_yaml::from_str(yaml)
500            .expect("Parsed processing redis config: single with options");
501
502        assert_eq!(
503            config,
504            RedisConfig::Cluster {
505                cluster_nodes: vec![
506                    "redis://127.0.0.1:6379".to_owned(),
507                    "redis://127.0.0.2:6379".to_owned()
508                ],
509                options: PartialRedisConfigOptions {
510                    max_connections: Some(10),
511                    ..Default::default()
512                },
513            }
514        );
515    }
516
517    #[test]
518    fn test_redis_cluster_nodes_opts_unified() {
519        let yaml = r#"
520cluster_nodes:
521    - "redis://127.0.0.1:6379"
522    - "redis://127.0.0.2:6379"
523max_connections: 20
524"#;
525
526        let config: RedisConfigs = serde_yaml::from_str(yaml)
527            .expect("Parsed processing redis config: single with options");
528
529        assert_eq!(
530            config,
531            RedisConfigs::Unified(RedisConfig::Cluster {
532                cluster_nodes: vec![
533                    "redis://127.0.0.1:6379".to_owned(),
534                    "redis://127.0.0.2:6379".to_owned()
535                ],
536                options: PartialRedisConfigOptions {
537                    max_connections: Some(20),
538                    ..Default::default()
539                },
540            })
541        );
542    }
543
544    #[test]
545    fn test_redis_cluster_serialize() {
546        let config = RedisConfig::Cluster {
547            cluster_nodes: vec![
548                "redis://127.0.0.1:6379".to_owned(),
549                "redis://127.0.0.2:6379".to_owned(),
550            ],
551            options: PartialRedisConfigOptions {
552                max_connections: Some(42),
553                ..Default::default()
554            },
555        };
556
557        assert_json_snapshot!(config, @r###"
558        {
559          "cluster_nodes": [
560            "redis://127.0.0.1:6379",
561            "redis://127.0.0.2:6379"
562          ],
563          "max_connections": 42,
564          "idle_timeout": 60,
565          "create_timeout": 3,
566          "recycle_timeout": 2,
567          "response_timeout": 30,
568          "recycle_check_frequency": 100
569        }
570        "###);
571    }
572
573    #[test]
574    fn test_redis_cluster_serialize_unified() {
575        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
576            cluster_nodes: vec![
577                "redis://127.0.0.1:6379".to_owned(),
578                "redis://127.0.0.2:6379".to_owned(),
579            ],
580            options: PartialRedisConfigOptions {
581                max_connections: Some(42),
582                ..Default::default()
583            },
584        });
585
586        assert_json_snapshot!(configs, @r###"
587        {
588          "cluster_nodes": [
589            "redis://127.0.0.1:6379",
590            "redis://127.0.0.2:6379"
591          ],
592          "max_connections": 42,
593          "idle_timeout": 60,
594          "create_timeout": 3,
595          "recycle_timeout": 2,
596          "response_timeout": 30,
597          "recycle_check_frequency": 100
598        }
599        "###);
600    }
601
602    #[test]
603    fn test_redis_serialize_individual() {
604        let configs = RedisConfigs::Individual {
605            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
606                server: "redis://127.0.0.1:6379".to_owned(),
607                options: PartialRedisConfigOptions {
608                    max_connections: Some(42),
609                    ..Default::default()
610                },
611            })),
612            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
613                server: "redis://127.0.0.1:6379".to_owned(),
614                options: Default::default(),
615            })),
616            quotas: Box::new(RedisConfig::Cluster {
617                cluster_nodes: vec![
618                    "redis://127.0.0.1:6379".to_owned(),
619                    "redis://127.0.0.2:6379".to_owned(),
620                ],
621                options: PartialRedisConfigOptions {
622                    max_connections: Some(84),
623                    ..Default::default()
624                },
625            }),
626        };
627
628        assert_json_snapshot!(configs, @r###"
629        {
630          "project_configs": {
631            "server": "redis://127.0.0.1:6379",
632            "max_connections": 42,
633            "idle_timeout": 60,
634            "create_timeout": 3,
635            "recycle_timeout": 2,
636            "response_timeout": 30,
637            "recycle_check_frequency": 100
638          },
639          "cardinality": {
640            "server": "redis://127.0.0.1:6379",
641            "idle_timeout": 60,
642            "create_timeout": 3,
643            "recycle_timeout": 2,
644            "response_timeout": 30,
645            "recycle_check_frequency": 100
646          },
647          "quotas": {
648            "cluster_nodes": [
649              "redis://127.0.0.1:6379",
650              "redis://127.0.0.2:6379"
651            ],
652            "max_connections": 84,
653            "idle_timeout": 60,
654            "create_timeout": 3,
655            "recycle_timeout": 2,
656            "response_timeout": 30,
657            "recycle_check_frequency": 100
658          }
659        }
660        "###);
661    }
662}