relay_config/
redis.rs

1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Floor for the default number of pool connections.
///
/// On small setups, `2 x limits.max_thread_count` does not leave enough headroom, so the
/// previous fixed default is used as a lower bound instead.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
8/// Additional configuration options for a redis client.
9#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
10#[serde(default)]
11pub struct PartialRedisConfigOptions {
12    /// Maximum number of connections managed by the pool.
13    ///
14    /// Defaults to 2x `limits.max_thread_count` or a minimum of 24.
15    #[serde(skip_serializing_if = "Option::is_none")]
16    pub max_connections: Option<u32>,
17    /// Sets the idle timeout used by the pool, in seconds.
18    ///
19    /// The idle timeout defines the maximum time a connection will be kept in the pool if unused.
20    pub idle_timeout: u64,
21    /// Sets the maximum time in seconds to wait when establishing a new Redis connection.
22    ///
23    /// If a connection cannot be established within this duration, it is considered a failure.
24    /// Applies when the pool needs to grow or create fresh connections.
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub create_timeout: Option<u64>,
27    /// Sets the maximum time in seconds to validate an existing connection when it is recycled.
28    ///
29    /// Recycling involves checking whether an idle connection is still alive before reuse.
30    /// If validation exceeds this timeout, the connection is discarded and a new fetch from the pool
31    /// is attempted.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub recycle_timeout: Option<u64>,
34    /// Sets the maximum time, in seconds, that a caller is allowed to wait
35    /// when requesting a connection from the pool.
36    ///
37    /// If a connection does not become available within this period, the attempt
38    /// will fail with a timeout error. This setting helps prevent indefinite
39    /// blocking when the pool is exhausted.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub wait_timeout: Option<u64>,
42    /// Sets the maximum time in seconds to wait for a result when sending a Redis command.
43    ///
44    /// If a command exceeds this timeout, the connection will be recycled.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub response_timeout: Option<u64>,
47}
48
49impl Default for PartialRedisConfigOptions {
50    fn default() -> Self {
51        Self {
52            max_connections: None,
53            idle_timeout: 60,
54            create_timeout: Some(3),
55            recycle_timeout: Some(2),
56            wait_timeout: None,
57            response_timeout: Some(30),
58        }
59    }
60}
61
62/// Configuration for connecting a redis client.
63#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
64#[serde(untagged)]
65enum RedisConfigFromFile {
66    /// Connect to a Redis cluster.
67    Cluster {
68        /// List of `redis://` urls to use in cluster mode.
69        ///
70        /// This can also be a single node which is configured in cluster mode.
71        cluster_nodes: Vec<String>,
72
73        /// Additional configuration options for the redis client and a connections pool.
74        #[serde(flatten)]
75        options: PartialRedisConfigOptions,
76    },
77
78    /// Connect to a single Redis instance.
79    ///
80    /// Contains the `redis://` url to the node.
81    Single(String),
82
83    /// Connect to a single Redis instance.
84    ///
85    /// Allows to provide more configuration options, e.g. `max_connections`.
86    SingleWithOpts {
87        /// Contains the `redis://` url to the node.
88        server: String,
89
90        /// Additional configuration options for the redis client and a connections pool.
91        #[serde(flatten)]
92        options: PartialRedisConfigOptions,
93    },
94}
95
96/// Redis configuration.
97#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
98#[serde(untagged)]
99pub enum RedisConfig {
100    /// Connect to a Redis Cluster.
101    Cluster {
102        /// Redis nodes urls of the cluster.
103        cluster_nodes: Vec<String>,
104        /// Options of the Redis config.
105        #[serde(flatten)]
106        options: PartialRedisConfigOptions,
107    },
108    /// Connect to a single Redis instance.
109    Single(SingleRedisConfig),
110}
111
112/// Struct that can serialize a string to a single Redis connection.
113///
114/// This struct is needed for backward compatibility.
115#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
116#[serde(untagged)]
117pub enum SingleRedisConfig {
118    #[doc(hidden)]
119    Simple(String),
120    #[doc(hidden)]
121    Detailed {
122        #[doc(hidden)]
123        server: String,
124        #[doc(hidden)]
125        #[serde(flatten)]
126        options: PartialRedisConfigOptions,
127    },
128}
129
130impl RedisConfig {
131    /// Creates a new Redis config for a single Redis instance with default settings.
132    pub fn single(server: String) -> Self {
133        RedisConfig::Single(SingleRedisConfig::Detailed {
134            server,
135            options: Default::default(),
136        })
137    }
138}
139
140impl From<RedisConfigFromFile> for RedisConfig {
141    fn from(value: RedisConfigFromFile) -> Self {
142        match value {
143            RedisConfigFromFile::Cluster {
144                cluster_nodes,
145                options,
146            } => Self::Cluster {
147                cluster_nodes,
148                options,
149            },
150            RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
151                server,
152                options: Default::default(),
153            }),
154            RedisConfigFromFile::SingleWithOpts { server, options } => {
155                Self::Single(SingleRedisConfig::Detailed { server, options })
156            }
157        }
158    }
159}
160
161/// Configurations for the various Redis pools used by Relay.
162#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
163#[serde(untagged)]
164pub enum RedisConfigs {
165    /// All pools should be configured the same way.
166    Unified(RedisConfig),
167    /// Individual configurations for each pool.
168    Individual {
169        /// Configuration for the `project_configs` pool.
170        project_configs: Box<RedisConfig>,
171        /// Configuration for the `cardinality` pool.
172        cardinality: Box<RedisConfig>,
173        /// Configuration for the `quotas` pool.
174        quotas: Box<RedisConfig>,
175    },
176}
177
178/// Reference to the [`RedisConfig`] with the final [`RedisConfigOptions`].
179#[derive(Clone, Debug)]
180pub enum RedisConfigRef<'a> {
181    /// Connect to a Redis Cluster.
182    Cluster {
183        /// Reference to the Redis nodes urls of the cluster.
184        cluster_nodes: &'a Vec<String>,
185        /// Options of the Redis config.
186        options: RedisConfigOptions,
187    },
188    /// Connect to a single Redis instance.
189    Single {
190        /// Reference to the Redis node url.
191        server: &'a String,
192        /// Options of the Redis config.
193        options: RedisConfigOptions,
194    },
195}
196
197/// Helper struct bundling connections and options for the various Redis pools.
198#[allow(clippy::large_enum_variant)]
199#[derive(Clone, Debug)]
200pub enum RedisConfigsRef<'a> {
201    /// Use one pool for everything.
202    Unified(RedisConfigRef<'a>),
203    /// Use an individual pool for each use case.
204    Individual {
205        /// Configuration for the `project_configs` pool.
206        project_configs: RedisConfigRef<'a>,
207        /// Configuration for the `cardinality` pool.
208        cardinality: RedisConfigRef<'a>,
209        /// Configuration for the `quotas` pool.
210        quotas: RedisConfigRef<'a>,
211    },
212}
213
214fn build_redis_config_options(
215    options: &PartialRedisConfigOptions,
216    default_connections: u32,
217) -> RedisConfigOptions {
218    let max_connections = options.max_connections.unwrap_or(default_connections);
219
220    RedisConfigOptions {
221        max_connections,
222        idle_timeout: options.idle_timeout,
223        create_timeout: options.create_timeout,
224        recycle_timeout: options.recycle_timeout,
225        wait_timeout: options.wait_timeout,
226        response_timeout: options.response_timeout,
227    }
228}
229
230/// Builds a [`RedisConfigsRef`] given a [`RedisConfig`].
231///
232/// The returned config contains more options for setting up Redis.
233pub(super) fn build_redis_config(
234    config: &RedisConfig,
235    default_connections: u32,
236) -> RedisConfigRef<'_> {
237    match config {
238        RedisConfig::Cluster {
239            cluster_nodes,
240            options,
241        } => RedisConfigRef::Cluster {
242            cluster_nodes,
243            options: build_redis_config_options(options, default_connections),
244        },
245        RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
246            RedisConfigRef::Single {
247                server,
248                options: build_redis_config_options(options, default_connections),
249            }
250        }
251        RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
252            server,
253            options: Default::default(),
254        },
255    }
256}
257
258/// Builds a [`RedisConfigsRef`] given a [`RedisConfigs`].
259///
260/// The returned configs contain more options for setting up Redis.
261pub(super) fn build_redis_configs(
262    configs: &RedisConfigs,
263    cpu_concurrency: u32,
264    pool_concurrency: u32,
265) -> RedisConfigsRef<'_> {
266    // Project configurations are estimated to need around or less of `cpu_concurrency`
267    // connections, double this value for some extra headroom.
268    //
269    // For smaller setups give some extra headroom through `DEFAULT_MIN_MAX_CONNECTIONS`.
270    let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
271    // The total concurrency for rate limiting/processing is the total pool concurrency
272    // calculated by `cpu_concurrency * pool_concurrency`.
273    //
274    // No need to consider `DEFAULT_MIN_MAX_CONNECTIONS`, as these numbers are accurate.
275    let total_pool_concurrency = cpu_concurrency * pool_concurrency;
276
277    match configs {
278        RedisConfigs::Unified(cfg) => {
279            // A unified pool needs enough connections for project configs and enough to satisfy
280            // the processing pool.
281            let default_connections = total_pool_concurrency + project_connections;
282            let config = build_redis_config(cfg, default_connections);
283            RedisConfigsRef::Unified(config)
284        }
285        RedisConfigs::Individual {
286            project_configs,
287            cardinality,
288            quotas,
289        } => {
290            let project_configs = build_redis_config(project_configs, project_connections);
291            let cardinality = build_redis_config(cardinality, total_pool_concurrency);
292            let quotas = build_redis_config(quotas, total_pool_concurrency);
293            RedisConfigsRef::Individual {
294                project_configs,
295                cardinality,
296                quotas,
297            }
298        }
299    }
300}
301
/// Round-trip tests for the Redis configuration types: YAML parsing into the config enums and
/// JSON serialization checked against inline snapshots.
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    /// A map with `server` and options parses into the detailed single-instance form.
    #[test]
    fn test_redis_single_opts() {
        // `connection_timeout` is not a field of `PartialRedisConfigOptions`; the expected value
        // below shows that it is ignored.
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            })
        );
    }

    /// The same single-instance map parses as the unified variant of `RedisConfigs`.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: unified single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            }))
        );
    }

    /// Per-pool configuration parses into `RedisConfigs::Individual`, mixing single and cluster.
    #[test]
    fn test_redis_individual() {
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
"#;

        let configs: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: individual pools");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(17),
                    ..Default::default()
                },
            }),
        };

        assert_eq!(configs, expected);
    }

    /// A detailed single-instance config serializes to one flat map; unset options are omitted.
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    /// Wrapping the config in `RedisConfigs::Unified` does not change the serialized form.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    /// A map that contains only `server` receives the default options.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with default options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }

    // To make sure that we have backwards compatibility and still support the redis configuration
    // when the single `redis://...` address is provided.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single address string");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    /// A map with `cluster_nodes` parses into the cluster variant.
    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 10
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: cluster with options");

        assert_eq!(
            config,
            RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(10),
                    ..Default::default()
                },
            }
        );
    }

    /// The same cluster map parses as the unified variant of `RedisConfigs`.
    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 20
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: unified cluster with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(20),
                    ..Default::default()
                },
            })
        );
    }

    /// A cluster config serializes to one flat map that contains the node list and the options.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    /// Wrapping the cluster config in `RedisConfigs::Unified` does not change the serialized form.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    /// Individual pools serialize to a map with one entry per pool.
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(84),
                    ..Default::default()
                },
            }),
        };

        assert_json_snapshot!(configs, @r#"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          },
          "cardinality": {
            "server": "redis://127.0.0.1:6379",
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          }
        }
        "#);
    }
}