1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Lower bound for the default connection count of the project configuration pool.
///
/// `build_redis_configs` computes that default as
/// `max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS)`.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
/// Redis pool options as they appear in the configuration file.
///
/// The container-level `#[serde(default)]` means every field missing from the input is
/// filled from this type's [`Default`] implementation, so all options may be omitted.
/// `build_redis_config_options` converts this type into `relay_redis::RedisConfigOptions`.
///
/// NOTE(review): the units of the timeout fields are not visible in this file
/// (presumably seconds) — confirm against `relay_redis::RedisConfigOptions`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
    /// Maximum number of connections.
    ///
    /// Defaults to `None`; `build_redis_config_options` then falls back to the default
    /// connection count supplied by its caller. Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<u32>,
    /// Idle timeout, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `60`.
    pub idle_timeout: u64,
    /// Optional create timeout, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `Some(3)`. Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub create_timeout: Option<u64>,
    /// Optional recycle timeout, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `Some(2)`. Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recycle_timeout: Option<u64>,
    /// Optional wait timeout, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `None`. Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wait_timeout: Option<u64>,
    /// Optional response timeout, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `Some(30)`. Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub response_timeout: Option<u64>,
    /// Recycle check frequency, passed through unchanged to `RedisConfigOptions`.
    ///
    /// Defaults to `100`.
    // NOTE(review): the exact meaning of this counter is defined by `relay_redis` and is
    // not visible here — confirm before documenting further.
    pub recycle_check_frequency: usize,
}
54
55impl Default for PartialRedisConfigOptions {
56 fn default() -> Self {
57 Self {
58 max_connections: None,
59 idle_timeout: 60,
60 create_timeout: Some(3),
61 recycle_timeout: Some(2),
62 wait_timeout: None,
63 response_timeout: Some(30),
64 recycle_check_frequency: 100,
65 }
66 }
67}
68
/// File-level representation of a Redis configuration.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in declaration order and
/// uses the first one that matches the input. It converts into [`RedisConfig`] through the
/// `From` implementation in this file.
// NOTE(review): nothing in the visible part of this file routes deserialization of
// `RedisConfig` through this type (e.g. via `#[serde(from = "...")]`) — confirm whether it
// is still used elsewhere.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
    /// A Redis cluster, identified by the `cluster_nodes` key.
    Cluster {
        /// List of cluster node addresses.
        cluster_nodes: Vec<String>,

        /// Pool options, read from the same map level as `cluster_nodes` (flattened).
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },

    /// A single Redis instance given as a bare string.
    Single(String),

    /// A single Redis instance given as a map with a `server` key plus options.
    SingleWithOpts {
        /// Address of the Redis server.
        server: String,

        /// Pool options, read from the same map level as `server` (flattened).
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
102
/// Configuration of one Redis client: either a cluster or a single instance.
///
/// The enum is `#[serde(untagged)]`: serde tries `Cluster` first (which needs a
/// `cluster_nodes` key) and falls back to `Single` otherwise.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfig {
    /// A Redis cluster.
    Cluster {
        /// List of cluster node addresses.
        cluster_nodes: Vec<String>,
        /// Pool options, (de)serialized on the same map level as `cluster_nodes`.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// A single Redis instance, see [`SingleRedisConfig`] for the accepted shapes.
    Single(SingleRedisConfig),
}
118
/// Configuration of a single Redis instance.
///
/// The enum is `#[serde(untagged)]` and accepts two shapes: a bare string (`Simple`) or a
/// map with a `server` key and optional pool options (`Detailed`). Both variants are hidden
/// from the generated documentation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum SingleRedisConfig {
    // Bare server address without any options.
    #[doc(hidden)]
    Simple(String),
    // Server address together with pool options.
    #[doc(hidden)]
    Detailed {
        // Address of the Redis server.
        #[doc(hidden)]
        server: String,
        // Pool options, (de)serialized on the same map level as `server`.
        #[doc(hidden)]
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
136
137impl RedisConfig {
138 pub fn single(server: String) -> Self {
140 RedisConfig::Single(SingleRedisConfig::Detailed {
141 server,
142 options: Default::default(),
143 })
144 }
145}
146
147impl From<RedisConfigFromFile> for RedisConfig {
148 fn from(value: RedisConfigFromFile) -> Self {
149 match value {
150 RedisConfigFromFile::Cluster {
151 cluster_nodes,
152 options,
153 } => Self::Cluster {
154 cluster_nodes,
155 options,
156 },
157 RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
158 server,
159 options: Default::default(),
160 }),
161 RedisConfigFromFile::SingleWithOpts { server, options } => {
162 Self::Single(SingleRedisConfig::Detailed { server, options })
163 }
164 }
165 }
166}
167
/// Redis configuration for all pools, either shared or per use case.
///
/// The enum is `#[serde(untagged)]`: serde first tries to read a single [`RedisConfig`]
/// (`Unified`) and otherwise expects a map with one entry per pool (`Individual`).
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
    /// One configuration shared by all pools.
    Unified(RedisConfig),
    /// A separate configuration for each pool.
    Individual {
        /// Configuration for the project configs pool.
        project_configs: Box<RedisConfig>,
        /// Configuration for the cardinality pool.
        cardinality: Box<RedisConfig>,
        /// Configuration for the quotas pool.
        quotas: Box<RedisConfig>,
    },
}
184
/// Resolved view of a [`RedisConfig`], as produced by `build_redis_config`.
///
/// Addresses are borrowed from the underlying configuration for `'a`; the options are an
/// owned `RedisConfigOptions`.
#[derive(Clone, Debug)]
pub enum RedisConfigRef<'a> {
    /// A Redis cluster.
    Cluster {
        /// Borrowed list of cluster node addresses.
        cluster_nodes: &'a Vec<String>,
        /// Resolved pool options.
        options: RedisConfigOptions,
    },
    /// A single Redis instance.
    Single {
        /// Borrowed address of the Redis server.
        server: &'a String,
        /// Resolved pool options.
        options: RedisConfigOptions,
    },
}
203
/// Resolved view of [`RedisConfigs`], as produced by `build_redis_configs`.
// NOTE(review): the lint is presumably silenced because `Individual` stores three
// `RedisConfigRef` values inline while `Unified` stores one — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum RedisConfigsRef<'a> {
    /// One resolved configuration shared by all pools.
    Unified(RedisConfigRef<'a>),
    /// A separate resolved configuration for each pool.
    Individual {
        /// Resolved configuration for the project configs pool.
        project_configs: RedisConfigRef<'a>,
        /// Resolved configuration for the cardinality pool.
        cardinality: RedisConfigRef<'a>,
        /// Resolved configuration for the quotas pool.
        quotas: RedisConfigRef<'a>,
    },
}
220
221fn build_redis_config_options(
222 options: &PartialRedisConfigOptions,
223 default_connections: u32,
224) -> RedisConfigOptions {
225 let max_connections = options.max_connections.unwrap_or(default_connections);
226
227 RedisConfigOptions {
228 max_connections,
229 idle_timeout: options.idle_timeout,
230 create_timeout: options.create_timeout,
231 recycle_timeout: options.recycle_timeout,
232 wait_timeout: options.wait_timeout,
233 response_timeout: options.response_timeout,
234 recycle_check_frequency: options.recycle_check_frequency,
235 }
236}
237
238pub(super) fn build_redis_config(
242 config: &RedisConfig,
243 default_connections: u32,
244) -> RedisConfigRef<'_> {
245 match config {
246 RedisConfig::Cluster {
247 cluster_nodes,
248 options,
249 } => RedisConfigRef::Cluster {
250 cluster_nodes,
251 options: build_redis_config_options(options, default_connections),
252 },
253 RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
254 RedisConfigRef::Single {
255 server,
256 options: build_redis_config_options(options, default_connections),
257 }
258 }
259 RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
260 server,
261 options: Default::default(),
262 },
263 }
264}
265
266pub(super) fn build_redis_configs(
270 configs: &RedisConfigs,
271 cpu_concurrency: u32,
272 pool_concurrency: u32,
273) -> RedisConfigsRef<'_> {
274 let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
279 let total_pool_concurrency = cpu_concurrency * pool_concurrency;
284
285 match configs {
286 RedisConfigs::Unified(cfg) => {
287 let default_connections = total_pool_concurrency + project_connections;
290 let config = build_redis_config(cfg, default_connections);
291 RedisConfigsRef::Unified(config)
292 }
293 RedisConfigs::Individual {
294 project_configs,
295 cardinality,
296 quotas,
297 } => {
298 let project_configs = build_redis_config(project_configs, project_connections);
299 let cardinality = build_redis_config(cardinality, total_pool_concurrency);
300 let quotas = build_redis_config(quotas, total_pool_concurrency);
301 RedisConfigsRef::Individual {
302 project_configs,
303 cardinality,
304 quotas,
305 }
306 }
307 }
308}
309
// Serialization and deserialization tests for the Redis configuration types.
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    /// A map with `server` and options deserializes into the `Detailed` shape.
    ///
    /// `connection_timeout` is not a field of `PartialRedisConfigOptions`; the expected
    /// value shows that it is ignored.
    #[test]
    fn test_redis_single_opts() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            })
        );
    }

    /// The same input parsed as `RedisConfigs` yields the `Unified` variant.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                }
            }))
        );
    }

    /// A map with one entry per pool deserializes into the `Individual` variant.
    #[test]
    fn test_redis_individual() {
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
"#;

        let configs: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: single with options");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(17),
                    ..Default::default()
                },
            }),
        };

        assert_eq!(configs, expected);
    }

    /// Serializing a detailed single config flattens the options next to `server` and
    /// omits options that are `None` (here: `wait_timeout`).
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r###"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30,
          "recycle_check_frequency": 100
        }
        "###);
    }

    /// The `Unified` wrapper does not add any nesting to the serialized output.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r###"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30,
          "recycle_check_frequency": 100
        }
        "###);
    }

    /// A map with only `server` gets all options from `Default`.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }

    /// A bare server string is still accepted and deserializes into the `Simple` shape.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    /// A map with `cluster_nodes` and options deserializes into the `Cluster` variant.
    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 10
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(10),
                    ..Default::default()
                },
            }
        );
    }

    /// A cluster map parsed as `RedisConfigs` yields the `Unified` variant.
    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 20
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(20),
                    ..Default::default()
                },
            })
        );
    }

    /// Serializing a cluster config flattens the options next to `cluster_nodes`.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r###"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30,
          "recycle_check_frequency": 100
        }
        "###);
    }

    /// The `Unified` wrapper does not add any nesting to a serialized cluster config.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r###"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30,
          "recycle_check_frequency": 100
        }
        "###);
    }

    /// Individual configs serialize as one map entry per pool; `max_connections` is
    /// omitted for the pool that leaves it unset (`cardinality`).
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(84),
                    ..Default::default()
                },
            }),
        };

        assert_json_snapshot!(configs, @r###"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30,
            "recycle_check_frequency": 100
          },
          "cardinality": {
            "server": "redis://127.0.0.1:6379",
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30,
            "recycle_check_frequency": 100
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30,
            "recycle_check_frequency": 100
          }
        }
        "###);
    }
}