1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Lower bound for the derived default of `max_connections`.
///
/// `build_redis_configs` uses the larger of twice the CPU concurrency and this value as the
/// connection limit for configs that do not set `max_connections` explicitly.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
8#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
10#[serde(default)]
11pub struct PartialRedisConfigOptions {
12 #[serde(skip_serializing_if = "Option::is_none")]
16 pub max_connections: Option<u32>,
17 pub idle_timeout: u64,
21 #[serde(skip_serializing_if = "Option::is_none")]
26 pub create_timeout: Option<u64>,
27 #[serde(skip_serializing_if = "Option::is_none")]
33 pub recycle_timeout: Option<u64>,
34 #[serde(skip_serializing_if = "Option::is_none")]
41 pub wait_timeout: Option<u64>,
42 pub recycle_check_frequency: usize,
48}
49
50impl Default for PartialRedisConfigOptions {
51 fn default() -> Self {
52 Self {
53 max_connections: None,
54 idle_timeout: 60,
55 create_timeout: Some(3),
56 recycle_timeout: Some(2),
57 wait_timeout: None,
58 recycle_check_frequency: 100,
59 }
60 }
61}
62
63#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
65#[serde(untagged)]
66enum RedisConfigFromFile {
67 Cluster {
69 cluster_nodes: Vec<String>,
73
74 #[serde(flatten)]
76 options: PartialRedisConfigOptions,
77 },
78
79 Single(String),
83
84 SingleWithOpts {
88 server: String,
90
91 #[serde(flatten)]
93 options: PartialRedisConfigOptions,
94 },
95}
96
97#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
99#[serde(untagged)]
100pub enum RedisConfig {
101 Cluster {
103 cluster_nodes: Vec<String>,
105 #[serde(flatten)]
107 options: PartialRedisConfigOptions,
108 },
109 Single(SingleRedisConfig),
111}
112
113#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
117#[serde(untagged)]
118pub enum SingleRedisConfig {
119 #[doc(hidden)]
120 Simple(String),
121 #[doc(hidden)]
122 Detailed {
123 #[doc(hidden)]
124 server: String,
125 #[doc(hidden)]
126 #[serde(flatten)]
127 options: PartialRedisConfigOptions,
128 },
129}
130
131impl RedisConfig {
132 pub fn single(server: String) -> Self {
134 RedisConfig::Single(SingleRedisConfig::Detailed {
135 server,
136 options: Default::default(),
137 })
138 }
139}
140
141impl From<RedisConfigFromFile> for RedisConfig {
142 fn from(value: RedisConfigFromFile) -> Self {
143 match value {
144 RedisConfigFromFile::Cluster {
145 cluster_nodes,
146 options,
147 } => Self::Cluster {
148 cluster_nodes,
149 options,
150 },
151 RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
152 server,
153 options: Default::default(),
154 }),
155 RedisConfigFromFile::SingleWithOpts { server, options } => {
156 Self::Single(SingleRedisConfig::Detailed { server, options })
157 }
158 }
159 }
160}
161
162#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
164#[serde(untagged)]
165pub enum RedisConfigs {
166 Unified(RedisConfig),
168 Individual {
170 project_configs: Box<RedisConfig>,
172 cardinality: Box<RedisConfig>,
174 quotas: Box<RedisConfig>,
176 },
177}
178
179#[derive(Clone, Debug)]
181pub enum RedisConfigRef<'a> {
182 Cluster {
184 cluster_nodes: &'a Vec<String>,
186 options: RedisConfigOptions,
188 },
189 Single {
191 server: &'a String,
193 options: RedisConfigOptions,
195 },
196}
197
198#[allow(clippy::large_enum_variant)]
200#[derive(Clone, Debug)]
201pub enum RedisConfigsRef<'a> {
202 Unified(RedisConfigRef<'a>),
204 Individual {
206 project_configs: RedisConfigRef<'a>,
208 cardinality: RedisConfigRef<'a>,
210 quotas: RedisConfigRef<'a>,
212 },
213}
214
215fn build_redis_config_options(
216 options: &PartialRedisConfigOptions,
217 default_connections: u32,
218) -> RedisConfigOptions {
219 let max_connections = options.max_connections.unwrap_or(default_connections);
220
221 RedisConfigOptions {
222 max_connections,
223 idle_timeout: options.idle_timeout,
224 create_timeout: options.create_timeout,
225 recycle_timeout: options.recycle_timeout,
226 wait_timeout: options.wait_timeout,
227 recycle_check_frequency: options.recycle_check_frequency,
228 }
229}
230
231pub(super) fn build_redis_config(
235 config: &RedisConfig,
236 default_connections: u32,
237) -> RedisConfigRef<'_> {
238 match config {
239 RedisConfig::Cluster {
240 cluster_nodes,
241 options,
242 } => RedisConfigRef::Cluster {
243 cluster_nodes,
244 options: build_redis_config_options(options, default_connections),
245 },
246 RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
247 RedisConfigRef::Single {
248 server,
249 options: build_redis_config_options(options, default_connections),
250 }
251 }
252 RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
253 server,
254 options: Default::default(),
255 },
256 }
257}
258
259pub(super) fn build_redis_configs(
263 configs: &RedisConfigs,
264 cpu_concurrency: u32,
265) -> RedisConfigsRef<'_> {
266 let default_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
269
270 match configs {
271 RedisConfigs::Unified(cfg) => {
272 let config = build_redis_config(cfg, default_connections);
273 RedisConfigsRef::Unified(config)
274 }
275 RedisConfigs::Individual {
276 project_configs,
277 cardinality,
278 quotas,
279 } => {
280 let project_configs = build_redis_config(project_configs, default_connections);
281 let cardinality = build_redis_config(cardinality, default_connections);
282 let quotas = build_redis_config(quotas, default_connections);
283 RedisConfigsRef::Individual {
284 project_configs,
285 cardinality,
286 quotas,
287 }
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    /// A mapping with `server` and inline options parses into the `Detailed` notation.
    #[test]
    fn test_redis_single_opts() {
        // `connection_timeout` is not a field of `PartialRedisConfigOptions`; the expected value
        // below shows that it is ignored rather than rejected.
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig =
            serde_yaml::from_str(yaml).expect("single redis config with options should parse");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            })
        );
    }

    /// The same single-server mapping parses as `RedisConfigs::Unified`.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("unified single redis config with options should parse");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            }))
        );
    }

    /// Per-use-case entries may mix single-server and cluster configurations.
    #[test]
    fn test_redis_individual() {
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
"#;

        let configs: RedisConfigs =
            serde_yaml::from_str(yaml).expect("individual redis configs should parse");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(17),
                    ..Default::default()
                },
            }),
        };

        assert_eq!(configs, expected);
    }

    /// Options are flattened next to `server`; unset optional fields are omitted.
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    /// Wrapping in `RedisConfigs::Unified` does not change the serialized shape.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    /// A mapping with only `server` receives default options.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig =
            serde_yaml::from_str(yaml).expect("single redis config without options should parse");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }

    // The bare-string notation must keep parsing so that existing configuration files stay
    // valid.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig =
            serde_yaml::from_str(yaml).expect("bare redis server string should parse");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    /// A mapping with `cluster_nodes` and inline options parses as a cluster.
    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 10
"#;

        let config: RedisConfig =
            serde_yaml::from_str(yaml).expect("cluster redis config with options should parse");

        assert_eq!(
            config,
            RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(10),
                    ..Default::default()
                },
            }
        );
    }

    /// The same cluster mapping parses as `RedisConfigs::Unified`.
    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 20
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("unified cluster redis config with options should parse");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(20),
                    ..Default::default()
                },
            })
        );
    }

    /// Cluster options are flattened next to `cluster_nodes`.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    /// Wrapping in `RedisConfigs::Unified` does not change the serialized shape.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "recycle_check_frequency": 100
        }
        "#);
    }

    /// Each use case serializes under its own key; an unset `max_connections` is omitted.
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(84),
                    ..Default::default()
                },
            }),
        };

        assert_json_snapshot!(configs, @r#"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          },
          "cardinality": {
            "server": "redis://127.0.0.1:6379",
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "recycle_check_frequency": 100
          }
        }
        "#);
    }

    /// Without an explicit `max_connections`, the limit is twice the CPU concurrency, but never
    /// below `DEFAULT_MIN_MAX_CONNECTIONS`.
    #[test]
    fn test_default_max_connections() {
        let configs =
            RedisConfigs::Unified(RedisConfig::single("redis://127.0.0.1:6379".to_owned()));

        // (cpu_concurrency, expected max_connections)
        for (cpu_concurrency, expected) in [(1, DEFAULT_MIN_MAX_CONNECTIONS), (32, 64)] {
            match build_redis_configs(&configs, cpu_concurrency) {
                RedisConfigsRef::Unified(RedisConfigRef::Single { options, .. }) => {
                    assert_eq!(options.max_connections, expected);
                }
                other => panic!("expected a unified single-server config, got {:?}", other),
            }
        }
    }
}