Skip to main content

relay_config/
redis.rs

1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Lower bound applied to the connection estimate for the `project_configs` pool.
///
/// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
/// In this case, we fall back to the old default.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
/// Additional configuration options for a redis client.
///
/// Every field may be omitted in the configuration: because of `#[serde(default)]`, missing
/// fields take their value from the `Default` implementation of this struct.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
    /// Maximum number of connections managed by the pool.
    ///
    /// Defaults to 2x `limits.max_thread_count` or a minimum of 24.
    ///
    /// Left as `None` when not configured; the concrete number is only filled in when the
    /// final options are built. `None` is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<u32>,
    /// Sets the idle timeout used by the pool, in seconds.
    ///
    /// The idle timeout defines the maximum time a connection will be kept in the pool if unused.
    ///
    /// Defaults to `60`.
    pub idle_timeout: u64,
    /// Sets the maximum time in seconds to wait when establishing a new Redis connection.
    ///
    /// If a connection cannot be established within this duration, it is considered a failure.
    /// Applies when the pool needs to grow or create fresh connections.
    ///
    /// Defaults to `3`. `None` is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub create_timeout: Option<u64>,
    /// Sets the maximum time in seconds to validate an existing connection when it is recycled.
    ///
    /// Recycling involves checking whether an idle connection is still alive before reuse.
    /// If validation exceeds this timeout, the connection is discarded and a new fetch from the pool
    /// is attempted.
    ///
    /// Defaults to `2`. `None` is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recycle_timeout: Option<u64>,
    /// Sets the maximum time, in seconds, that a caller is allowed to wait
    /// when requesting a connection from the pool.
    ///
    /// If a connection does not become available within this period, the attempt
    /// will fail with a timeout error. This setting helps prevent indefinite
    /// blocking when the pool is exhausted.
    ///
    /// Unset (`None`) by default. `None` is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wait_timeout: Option<u64>,
    /// Sets the maximum time in seconds to wait for a result when sending a Redis command.
    ///
    /// If a command exceeds this timeout, the connection will be recycled.
    ///
    /// Defaults to `30`. `None` is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub response_timeout: Option<u64>,
}
48
49impl Default for PartialRedisConfigOptions {
50    fn default() -> Self {
51        Self {
52            max_connections: None,
53            idle_timeout: 60,
54            create_timeout: Some(3),
55            recycle_timeout: Some(2),
56            wait_timeout: None,
57            response_timeout: Some(30),
58        }
59    }
60}
61
/// Configuration for connecting a redis client.
///
/// Untagged: serde tries the variants in declaration order and picks the first one whose shape
/// matches the input, so the order of the variants below is significant.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
    /// Connect to a Redis cluster.
    Cluster {
        /// List of `redis://` urls to use in cluster mode.
        ///
        /// This can also be a single node which is configured in cluster mode.
        cluster_nodes: Vec<String>,

        /// Additional configuration options for the redis client and a connections pool.
        ///
        /// Flattened: the option keys live next to `cluster_nodes`, not in a nested map.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },

    /// Connect to a single Redis instance.
    ///
    /// Contains the `redis://` url to the node.
    Single(String),

    /// Connect to a single Redis instance.
    ///
    /// Allows to provide more configuration options, e.g. `max_connections`.
    SingleWithOpts {
        /// Contains the `redis://` url to the node.
        server: String,

        /// Additional configuration options for the redis client and a connections pool.
        ///
        /// Flattened: the option keys live next to `server`, not in a nested map.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
95
/// Redis configuration.
///
/// Untagged: the variant is chosen by the shape of the input. A map with `cluster_nodes`
/// selects [`RedisConfig::Cluster`]; everything else is tried as [`SingleRedisConfig`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfig {
    /// Connect to a Redis Cluster.
    Cluster {
        /// Redis nodes urls of the cluster.
        cluster_nodes: Vec<String>,
        /// Options of the Redis config.
        ///
        /// Flattened: the option keys live next to `cluster_nodes`, not in a nested map.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// Connect to a single Redis instance.
    Single(SingleRedisConfig),
}
111
/// Enum that can represent a single Redis connection given either as a plain string or as a map.
///
/// This type is needed for backward compatibility: a bare `redis://` url is accepted in
/// addition to the detailed form with a `server` key and optional pool options.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum SingleRedisConfig {
    // Bare url string without any options.
    #[doc(hidden)]
    Simple(String),
    // Map form: `server` plus the flattened pool options.
    #[doc(hidden)]
    Detailed {
        // Url of the Redis node.
        #[doc(hidden)]
        server: String,
        // Pool options; their keys live next to `server` because of `flatten`.
        #[doc(hidden)]
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
129
130impl RedisConfig {
131    /// Creates a new Redis config for a single Redis instance with default settings.
132    pub fn single(server: String) -> Self {
133        RedisConfig::Single(SingleRedisConfig::Detailed {
134            server,
135            options: Default::default(),
136        })
137    }
138}
139
140impl From<RedisConfigFromFile> for RedisConfig {
141    fn from(value: RedisConfigFromFile) -> Self {
142        match value {
143            RedisConfigFromFile::Cluster {
144                cluster_nodes,
145                options,
146            } => Self::Cluster {
147                cluster_nodes,
148                options,
149            },
150            RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
151                server,
152                options: Default::default(),
153            }),
154            RedisConfigFromFile::SingleWithOpts { server, options } => {
155                Self::Single(SingleRedisConfig::Detailed { server, options })
156            }
157        }
158    }
159}
160
/// Configurations for the various Redis pools used by Relay.
///
/// Untagged: an input that parses as a single [`RedisConfig`] becomes
/// [`RedisConfigs::Unified`]; a map with `project_configs` and `quotas` keys becomes
/// [`RedisConfigs::Individual`].
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
    /// All pools should be configured the same way.
    Unified(RedisConfig),
    /// Individual configurations for each pool.
    Individual {
        /// Configuration for the `project_configs` pool.
        project_configs: Box<RedisConfig>,
        /// Configuration for the `quotas` pool.
        quotas: Box<RedisConfig>,
    },
}
175
/// Reference to the [`RedisConfig`] with the final [`RedisConfigOptions`].
///
/// The connection targets are borrowed from the originating config (lifetime `'a`), while the
/// options are owned values.
#[derive(Clone, Debug)]
pub enum RedisConfigRef<'a> {
    /// Connect to a Redis Cluster.
    Cluster {
        /// Reference to the Redis nodes urls of the cluster.
        cluster_nodes: &'a Vec<String>,
        /// Options of the Redis config.
        options: RedisConfigOptions,
    },
    /// Connect to a single Redis instance.
    Single {
        /// Reference to the Redis node url.
        server: &'a String,
        /// Options of the Redis config.
        options: RedisConfigOptions,
    },
}
194
/// Helper struct bundling connections and options for the various Redis pools.
///
/// Mirrors the shape of [`RedisConfigs`], but holds [`RedisConfigRef`] values.
// NOTE(review): `Individual` holds two `RedisConfigRef`s while `Unified` holds one, which is
// presumably what triggers the lint silenced here — confirm before removing the `allow`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum RedisConfigsRef<'a> {
    /// Use one pool for everything.
    Unified(RedisConfigRef<'a>),
    /// Use an individual pool for each use case.
    Individual {
        /// Configuration for the `project_configs` pool.
        project_configs: RedisConfigRef<'a>,
        /// Configuration for the `quotas` pool.
        quotas: RedisConfigRef<'a>,
    },
}
209
210fn build_redis_config_options(
211    options: &PartialRedisConfigOptions,
212    default_connections: u32,
213) -> RedisConfigOptions {
214    let max_connections = options.max_connections.unwrap_or(default_connections);
215
216    RedisConfigOptions {
217        max_connections,
218        idle_timeout: options.idle_timeout,
219        create_timeout: options.create_timeout,
220        recycle_timeout: options.recycle_timeout,
221        wait_timeout: options.wait_timeout,
222        response_timeout: options.response_timeout,
223    }
224}
225
226/// Builds a [`RedisConfigsRef`] given a [`RedisConfig`].
227///
228/// The returned config contains more options for setting up Redis.
229pub(super) fn build_redis_config(
230    config: &RedisConfig,
231    default_connections: u32,
232) -> RedisConfigRef<'_> {
233    match config {
234        RedisConfig::Cluster {
235            cluster_nodes,
236            options,
237        } => RedisConfigRef::Cluster {
238            cluster_nodes,
239            options: build_redis_config_options(options, default_connections),
240        },
241        RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
242            RedisConfigRef::Single {
243                server,
244                options: build_redis_config_options(options, default_connections),
245            }
246        }
247        RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
248            server,
249            options: Default::default(),
250        },
251    }
252}
253
254/// Builds a [`RedisConfigsRef`] given a [`RedisConfigs`].
255///
256/// The returned configs contain more options for setting up Redis.
257pub(super) fn build_redis_configs(
258    configs: &RedisConfigs,
259    cpu_concurrency: u32,
260    pool_concurrency: u32,
261) -> RedisConfigsRef<'_> {
262    // Project configurations are estimated to need around or less of `cpu_concurrency`
263    // connections, double this value for some extra headroom.
264    //
265    // For smaller setups give some extra headroom through `DEFAULT_MIN_MAX_CONNECTIONS`.
266    let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
267    // The total concurrency for rate limiting/processing is the total pool concurrency
268    // calculated by `cpu_concurrency * pool_concurrency`.
269    //
270    // No need to consider `DEFAULT_MIN_MAX_CONNECTIONS`, as these numbers are accurate.
271    let total_pool_concurrency = cpu_concurrency * pool_concurrency;
272
273    match configs {
274        RedisConfigs::Unified(cfg) => {
275            // A unified pool needs enough connections for project configs and enough to satisfy
276            // the processing pool.
277            let default_connections = total_pool_concurrency + project_connections;
278            let config = build_redis_config(cfg, default_connections);
279            RedisConfigsRef::Unified(config)
280        }
281        RedisConfigs::Individual {
282            project_configs,
283            quotas,
284        } => {
285            let project_configs = build_redis_config(project_configs, project_connections);
286            let quotas = build_redis_config(quotas, total_pool_concurrency);
287            RedisConfigsRef::Individual {
288                project_configs,
289                quotas,
290            }
291        }
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use insta::assert_json_snapshot;
298
299    use super::*;
300
    /// Parses the detailed single-instance form into a `RedisConfig`.
    ///
    /// The input also carries a `connection_timeout` key, which is not a field of
    /// `PartialRedisConfigOptions`; the assertion shows that parsing still succeeds and that
    /// the key has no effect on the resulting options.
    #[test]
    fn test_redis_single_opts() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            })
        );
    }
323
    /// Parses the detailed single-instance form through the top-level `RedisConfigs` type.
    ///
    /// The same input as in the plain `RedisConfig` test must end up in the `Unified` variant;
    /// the extra `connection_timeout` key again has no effect on the parsed options.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            }))
        );
    }
346
347    #[test]
348    fn test_redis_individual() {
349        let yaml = r#"
350project_configs:
351    server: "redis://127.0.0.1:6379"
352    max_connections: 42
353quotas:
354    cluster_nodes:
355        - "redis://127.0.0.1:6379"
356        - "redis://127.0.0.2:6379"
357    max_connections: 17
358"#;
359
360        let configs: RedisConfigs = serde_yaml::from_str(yaml)
361            .expect("Parsed processing redis configs: single with options");
362
363        let expected = RedisConfigs::Individual {
364            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
365                server: "redis://127.0.0.1:6379".to_owned(),
366                options: PartialRedisConfigOptions {
367                    max_connections: Some(42),
368                    ..Default::default()
369                },
370            })),
371            quotas: Box::new(RedisConfig::Cluster {
372                cluster_nodes: vec![
373                    "redis://127.0.0.1:6379".to_owned(),
374                    "redis://127.0.0.2:6379".to_owned(),
375                ],
376                options: PartialRedisConfigOptions {
377                    max_connections: Some(17),
378                    ..Default::default()
379                },
380            }),
381        };
382
383        assert_eq!(configs, expected);
384    }
385
    /// Serializes a detailed single-instance config and pins the JSON output.
    ///
    /// The snapshot shows the options flattened next to `server`, and that the unset
    /// `wait_timeout` is left out of the output.
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }
407
    /// Serializes a unified single-instance config and pins the JSON output.
    ///
    /// The snapshot is the same as for the bare `RedisConfig`: the `Unified` wrapper adds no
    /// extra nesting.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }
429
    /// Parses the detailed single-instance form when only `server` is given.
    ///
    /// All options must then equal the defaults of `PartialRedisConfigOptions`.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }
447
448    // To make sure that we have backwards compatibility and still support the redis configuration
449    // when the single `redis://...` address is provided.
450    #[test]
451    fn test_redis_single() {
452        let yaml = r#"
453"redis://127.0.0.1:6379"
454"#;
455
456        let config: RedisConfig = serde_yaml::from_str(yaml)
457            .expect("Parsed processing redis config: single with options");
458
459        assert_eq!(
460            config,
461            RedisConfig::Single(SingleRedisConfig::Simple(
462                "redis://127.0.0.1:6379".to_owned()
463            ))
464        );
465    }
466
467    #[test]
468    fn test_redis_cluster_nodes_opts() {
469        let yaml = r#"
470cluster_nodes:
471    - "redis://127.0.0.1:6379"
472    - "redis://127.0.0.2:6379"
473max_connections: 10
474"#;
475
476        let config: RedisConfig = serde_yaml::from_str(yaml)
477            .expect("Parsed processing redis config: single with options");
478
479        assert_eq!(
480            config,
481            RedisConfig::Cluster {
482                cluster_nodes: vec![
483                    "redis://127.0.0.1:6379".to_owned(),
484                    "redis://127.0.0.2:6379".to_owned()
485                ],
486                options: PartialRedisConfigOptions {
487                    max_connections: Some(10),
488                    ..Default::default()
489                },
490            }
491        );
492    }
493
494    #[test]
495    fn test_redis_cluster_nodes_opts_unified() {
496        let yaml = r#"
497cluster_nodes:
498    - "redis://127.0.0.1:6379"
499    - "redis://127.0.0.2:6379"
500max_connections: 20
501"#;
502
503        let config: RedisConfigs = serde_yaml::from_str(yaml)
504            .expect("Parsed processing redis config: single with options");
505
506        assert_eq!(
507            config,
508            RedisConfigs::Unified(RedisConfig::Cluster {
509                cluster_nodes: vec![
510                    "redis://127.0.0.1:6379".to_owned(),
511                    "redis://127.0.0.2:6379".to_owned()
512                ],
513                options: PartialRedisConfigOptions {
514                    max_connections: Some(20),
515                    ..Default::default()
516                },
517            })
518        );
519    }
520
    /// Serializes a cluster config and pins the JSON output.
    ///
    /// The snapshot shows the options flattened next to `cluster_nodes`, and that the unset
    /// `wait_timeout` is left out of the output.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }
548
    /// Serializes a unified cluster config and pins the JSON output.
    ///
    /// The snapshot is the same as for the bare `RedisConfig`: the `Unified` wrapper adds no
    /// extra nesting.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }
576
    /// Serializes a config with individual pools and pins the JSON output.
    ///
    /// Each pool is written under its own key (`project_configs`, `quotas`) with its options
    /// flattened into that object.
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(84),
                    ..Default::default()
                },
            }),
        };

        assert_json_snapshot!(configs, @r#"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          }
        }
        "#);
    }
623}