1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
4pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
8#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
10#[serde(default)]
11pub struct PartialRedisConfigOptions {
12 #[serde(skip_serializing_if = "Option::is_none")]
16 pub max_connections: Option<u32>,
17 pub idle_timeout: u64,
21 #[serde(skip_serializing_if = "Option::is_none")]
26 pub create_timeout: Option<u64>,
27 #[serde(skip_serializing_if = "Option::is_none")]
33 pub recycle_timeout: Option<u64>,
34 #[serde(skip_serializing_if = "Option::is_none")]
41 pub wait_timeout: Option<u64>,
42 #[serde(skip_serializing_if = "Option::is_none")]
46 pub response_timeout: Option<u64>,
47}
48
49impl Default for PartialRedisConfigOptions {
50 fn default() -> Self {
51 Self {
52 max_connections: None,
53 idle_timeout: 60,
54 create_timeout: Some(3),
55 recycle_timeout: Some(2),
56 wait_timeout: None,
57 response_timeout: Some(30),
58 }
59 }
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
64#[serde(untagged)]
65enum RedisConfigFromFile {
66 Cluster {
68 cluster_nodes: Vec<String>,
72
73 #[serde(flatten)]
75 options: PartialRedisConfigOptions,
76 },
77
78 Single(String),
82
83 SingleWithOpts {
87 server: String,
89
90 #[serde(flatten)]
92 options: PartialRedisConfigOptions,
93 },
94}
95
96#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
98#[serde(untagged)]
99pub enum RedisConfig {
100 Cluster {
102 cluster_nodes: Vec<String>,
104 #[serde(flatten)]
106 options: PartialRedisConfigOptions,
107 },
108 Single(SingleRedisConfig),
110}
111
112#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
116#[serde(untagged)]
117pub enum SingleRedisConfig {
118 #[doc(hidden)]
119 Simple(String),
120 #[doc(hidden)]
121 Detailed {
122 #[doc(hidden)]
123 server: String,
124 #[doc(hidden)]
125 #[serde(flatten)]
126 options: PartialRedisConfigOptions,
127 },
128}
129
130impl RedisConfig {
131 pub fn single(server: String) -> Self {
133 RedisConfig::Single(SingleRedisConfig::Detailed {
134 server,
135 options: Default::default(),
136 })
137 }
138}
139
140impl From<RedisConfigFromFile> for RedisConfig {
141 fn from(value: RedisConfigFromFile) -> Self {
142 match value {
143 RedisConfigFromFile::Cluster {
144 cluster_nodes,
145 options,
146 } => Self::Cluster {
147 cluster_nodes,
148 options,
149 },
150 RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
151 server,
152 options: Default::default(),
153 }),
154 RedisConfigFromFile::SingleWithOpts { server, options } => {
155 Self::Single(SingleRedisConfig::Detailed { server, options })
156 }
157 }
158 }
159}
160
161#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
163#[serde(untagged)]
164pub enum RedisConfigs {
165 Unified(RedisConfig),
167 Individual {
169 project_configs: Box<RedisConfig>,
171 cardinality: Box<RedisConfig>,
173 quotas: Box<RedisConfig>,
175 },
176}
177
178#[derive(Clone, Debug)]
180pub enum RedisConfigRef<'a> {
181 Cluster {
183 cluster_nodes: &'a Vec<String>,
185 options: RedisConfigOptions,
187 },
188 Single {
190 server: &'a String,
192 options: RedisConfigOptions,
194 },
195}
196
197#[allow(clippy::large_enum_variant)]
199#[derive(Clone, Debug)]
200pub enum RedisConfigsRef<'a> {
201 Unified(RedisConfigRef<'a>),
203 Individual {
205 project_configs: RedisConfigRef<'a>,
207 cardinality: RedisConfigRef<'a>,
209 quotas: RedisConfigRef<'a>,
211 },
212}
213
214fn build_redis_config_options(
215 options: &PartialRedisConfigOptions,
216 default_connections: u32,
217) -> RedisConfigOptions {
218 let max_connections = options.max_connections.unwrap_or(default_connections);
219
220 RedisConfigOptions {
221 max_connections,
222 idle_timeout: options.idle_timeout,
223 create_timeout: options.create_timeout,
224 recycle_timeout: options.recycle_timeout,
225 wait_timeout: options.wait_timeout,
226 response_timeout: options.response_timeout,
227 }
228}
229
230pub(super) fn build_redis_config(
234 config: &RedisConfig,
235 default_connections: u32,
236) -> RedisConfigRef<'_> {
237 match config {
238 RedisConfig::Cluster {
239 cluster_nodes,
240 options,
241 } => RedisConfigRef::Cluster {
242 cluster_nodes,
243 options: build_redis_config_options(options, default_connections),
244 },
245 RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
246 RedisConfigRef::Single {
247 server,
248 options: build_redis_config_options(options, default_connections),
249 }
250 }
251 RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
252 server,
253 options: Default::default(),
254 },
255 }
256}
257
258pub(super) fn build_redis_configs(
262 configs: &RedisConfigs,
263 cpu_concurrency: u32,
264 pool_concurrency: u32,
265) -> RedisConfigsRef<'_> {
266 let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
271 let total_pool_concurrency = cpu_concurrency * pool_concurrency;
276
277 match configs {
278 RedisConfigs::Unified(cfg) => {
279 let default_connections = total_pool_concurrency + project_connections;
282 let config = build_redis_config(cfg, default_connections);
283 RedisConfigsRef::Unified(config)
284 }
285 RedisConfigs::Individual {
286 project_configs,
287 cardinality,
288 quotas,
289 } => {
290 let project_configs = build_redis_config(project_configs, project_connections);
291 let cardinality = build_redis_config(cardinality, total_pool_concurrency);
292 let quotas = build_redis_config(quotas, total_pool_concurrency);
293 RedisConfigsRef::Individual {
294 project_configs,
295 cardinality,
296 quotas,
297 }
298 }
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use insta::assert_json_snapshot;
305
306 use super::*;
307
308 #[test]
309 fn test_redis_single_opts() {
310 let yaml = r#"
311server: "redis://127.0.0.1:6379"
312max_connections: 42
313connection_timeout: 5
314"#;
315
316 let config: RedisConfig = serde_yaml::from_str(yaml)
317 .expect("Parsed processing redis config: single with options");
318
319 assert_eq!(
320 config,
321 RedisConfig::Single(SingleRedisConfig::Detailed {
322 server: "redis://127.0.0.1:6379".to_owned(),
323 options: PartialRedisConfigOptions {
324 max_connections: Some(42),
325 ..Default::default()
326 }
327 })
328 );
329 }
330
331 #[test]
332 fn test_redis_single_opts_unified() {
333 let yaml = r#"
334server: "redis://127.0.0.1:6379"
335max_connections: 42
336connection_timeout: 5
337"#;
338
339 let config: RedisConfigs = serde_yaml::from_str(yaml)
340 .expect("Parsed processing redis config: single with options");
341
342 assert_eq!(
343 config,
344 RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
345 server: "redis://127.0.0.1:6379".to_owned(),
346 options: PartialRedisConfigOptions {
347 max_connections: Some(42),
348 ..Default::default()
349 }
350 }))
351 );
352 }
353
354 #[test]
355 fn test_redis_individual() {
356 let yaml = r#"
357project_configs:
358 server: "redis://127.0.0.1:6379"
359 max_connections: 42
360cardinality:
361 server: "redis://127.0.0.1:6379"
362quotas:
363 cluster_nodes:
364 - "redis://127.0.0.1:6379"
365 - "redis://127.0.0.2:6379"
366 max_connections: 17
367"#;
368
369 let configs: RedisConfigs = serde_yaml::from_str(yaml)
370 .expect("Parsed processing redis configs: single with options");
371
372 let expected = RedisConfigs::Individual {
373 project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
374 server: "redis://127.0.0.1:6379".to_owned(),
375 options: PartialRedisConfigOptions {
376 max_connections: Some(42),
377 ..Default::default()
378 },
379 })),
380 cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
381 server: "redis://127.0.0.1:6379".to_owned(),
382 options: Default::default(),
383 })),
384 quotas: Box::new(RedisConfig::Cluster {
385 cluster_nodes: vec![
386 "redis://127.0.0.1:6379".to_owned(),
387 "redis://127.0.0.2:6379".to_owned(),
388 ],
389 options: PartialRedisConfigOptions {
390 max_connections: Some(17),
391 ..Default::default()
392 },
393 }),
394 };
395
396 assert_eq!(configs, expected);
397 }
398
399 #[test]
400 fn test_redis_single_serialize() {
401 let config = RedisConfig::Single(SingleRedisConfig::Detailed {
402 server: "redis://127.0.0.1:6379".to_owned(),
403 options: PartialRedisConfigOptions {
404 max_connections: Some(42),
405 ..Default::default()
406 },
407 });
408
409 assert_json_snapshot!(config, @r#"
410 {
411 "server": "redis://127.0.0.1:6379",
412 "max_connections": 42,
413 "idle_timeout": 60,
414 "create_timeout": 3,
415 "recycle_timeout": 2,
416 "response_timeout": 30
417 }
418 "#);
419 }
420
421 #[test]
422 fn test_redis_single_serialize_unified() {
423 let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
424 server: "redis://127.0.0.1:6379".to_owned(),
425 options: PartialRedisConfigOptions {
426 max_connections: Some(42),
427 ..Default::default()
428 },
429 }));
430
431 assert_json_snapshot!(configs, @r#"
432 {
433 "server": "redis://127.0.0.1:6379",
434 "max_connections": 42,
435 "idle_timeout": 60,
436 "create_timeout": 3,
437 "recycle_timeout": 2,
438 "response_timeout": 30
439 }
440 "#);
441 }
442
443 #[test]
444 fn test_redis_single_opts_default() {
445 let yaml = r#"
446server: "redis://127.0.0.1:6379"
447"#;
448
449 let config: RedisConfig = serde_yaml::from_str(yaml)
450 .expect("Parsed processing redis config: single with options");
451
452 assert_eq!(
453 config,
454 RedisConfig::Single(SingleRedisConfig::Detailed {
455 server: "redis://127.0.0.1:6379".to_owned(),
456 options: Default::default()
457 })
458 );
459 }
460
461 #[test]
464 fn test_redis_single() {
465 let yaml = r#"
466"redis://127.0.0.1:6379"
467"#;
468
469 let config: RedisConfig = serde_yaml::from_str(yaml)
470 .expect("Parsed processing redis config: single with options");
471
472 assert_eq!(
473 config,
474 RedisConfig::Single(SingleRedisConfig::Simple(
475 "redis://127.0.0.1:6379".to_owned()
476 ))
477 );
478 }
479
480 #[test]
481 fn test_redis_cluster_nodes_opts() {
482 let yaml = r#"
483cluster_nodes:
484 - "redis://127.0.0.1:6379"
485 - "redis://127.0.0.2:6379"
486max_connections: 10
487"#;
488
489 let config: RedisConfig = serde_yaml::from_str(yaml)
490 .expect("Parsed processing redis config: single with options");
491
492 assert_eq!(
493 config,
494 RedisConfig::Cluster {
495 cluster_nodes: vec![
496 "redis://127.0.0.1:6379".to_owned(),
497 "redis://127.0.0.2:6379".to_owned()
498 ],
499 options: PartialRedisConfigOptions {
500 max_connections: Some(10),
501 ..Default::default()
502 },
503 }
504 );
505 }
506
507 #[test]
508 fn test_redis_cluster_nodes_opts_unified() {
509 let yaml = r#"
510cluster_nodes:
511 - "redis://127.0.0.1:6379"
512 - "redis://127.0.0.2:6379"
513max_connections: 20
514"#;
515
516 let config: RedisConfigs = serde_yaml::from_str(yaml)
517 .expect("Parsed processing redis config: single with options");
518
519 assert_eq!(
520 config,
521 RedisConfigs::Unified(RedisConfig::Cluster {
522 cluster_nodes: vec![
523 "redis://127.0.0.1:6379".to_owned(),
524 "redis://127.0.0.2:6379".to_owned()
525 ],
526 options: PartialRedisConfigOptions {
527 max_connections: Some(20),
528 ..Default::default()
529 },
530 })
531 );
532 }
533
534 #[test]
535 fn test_redis_cluster_serialize() {
536 let config = RedisConfig::Cluster {
537 cluster_nodes: vec![
538 "redis://127.0.0.1:6379".to_owned(),
539 "redis://127.0.0.2:6379".to_owned(),
540 ],
541 options: PartialRedisConfigOptions {
542 max_connections: Some(42),
543 ..Default::default()
544 },
545 };
546
547 assert_json_snapshot!(config, @r#"
548 {
549 "cluster_nodes": [
550 "redis://127.0.0.1:6379",
551 "redis://127.0.0.2:6379"
552 ],
553 "max_connections": 42,
554 "idle_timeout": 60,
555 "create_timeout": 3,
556 "recycle_timeout": 2,
557 "response_timeout": 30
558 }
559 "#);
560 }
561
562 #[test]
563 fn test_redis_cluster_serialize_unified() {
564 let configs = RedisConfigs::Unified(RedisConfig::Cluster {
565 cluster_nodes: vec![
566 "redis://127.0.0.1:6379".to_owned(),
567 "redis://127.0.0.2:6379".to_owned(),
568 ],
569 options: PartialRedisConfigOptions {
570 max_connections: Some(42),
571 ..Default::default()
572 },
573 });
574
575 assert_json_snapshot!(configs, @r#"
576 {
577 "cluster_nodes": [
578 "redis://127.0.0.1:6379",
579 "redis://127.0.0.2:6379"
580 ],
581 "max_connections": 42,
582 "idle_timeout": 60,
583 "create_timeout": 3,
584 "recycle_timeout": 2,
585 "response_timeout": 30
586 }
587 "#);
588 }
589
590 #[test]
591 fn test_redis_serialize_individual() {
592 let configs = RedisConfigs::Individual {
593 project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
594 server: "redis://127.0.0.1:6379".to_owned(),
595 options: PartialRedisConfigOptions {
596 max_connections: Some(42),
597 ..Default::default()
598 },
599 })),
600 cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
601 server: "redis://127.0.0.1:6379".to_owned(),
602 options: Default::default(),
603 })),
604 quotas: Box::new(RedisConfig::Cluster {
605 cluster_nodes: vec![
606 "redis://127.0.0.1:6379".to_owned(),
607 "redis://127.0.0.2:6379".to_owned(),
608 ],
609 options: PartialRedisConfigOptions {
610 max_connections: Some(84),
611 ..Default::default()
612 },
613 }),
614 };
615
616 assert_json_snapshot!(configs, @r#"
617 {
618 "project_configs": {
619 "server": "redis://127.0.0.1:6379",
620 "max_connections": 42,
621 "idle_timeout": 60,
622 "create_timeout": 3,
623 "recycle_timeout": 2,
624 "response_timeout": 30
625 },
626 "cardinality": {
627 "server": "redis://127.0.0.1:6379",
628 "idle_timeout": 60,
629 "create_timeout": 3,
630 "recycle_timeout": 2,
631 "response_timeout": 30
632 },
633 "quotas": {
634 "cluster_nodes": [
635 "redis://127.0.0.1:6379",
636 "redis://127.0.0.2:6379"
637 ],
638 "max_connections": 84,
639 "idle_timeout": 60,
640 "create_timeout": 3,
641 "recycle_timeout": 2,
642 "response_timeout": 30
643 }
644 }
645 "#);
646 }
647}