1use relay_redis::RedisConfigOptions;
2use serde::{Deserialize, Serialize};
3
/// Floor for the default number of connections reserved for project configs.
///
/// `build_redis_configs` takes the larger of `2 * cpu_concurrency` and this value, so setups
/// with a low CPU concurrency still get at least this many connections by default.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
7
/// Redis client options as they appear in the configuration, before defaults that depend on
/// the runtime (such as the connection count) have been resolved.
///
/// Because of the container-level `#[serde(default)]`, any key missing from the input is taken
/// from the [`Default`] implementation of this struct. Optional values that are `None` are
/// left out when serializing.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
    /// Explicit `max_connections` override.
    ///
    /// When `None`, `build_redis_config_options` substitutes the default connection count
    /// supplied by its caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<u32>,
    /// Idle timeout, forwarded unchanged to `RedisConfigOptions::idle_timeout`.
    ///
    /// Defaults to `60` and is always serialized.
    // NOTE(review): unit is presumably seconds — confirm against `relay_redis`.
    pub idle_timeout: u64,
    /// Create timeout, forwarded unchanged to `RedisConfigOptions::create_timeout`.
    ///
    /// Defaults to `Some(3)`.
    // NOTE(review): unit is presumably seconds — confirm against `relay_redis`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub create_timeout: Option<u64>,
    /// Recycle timeout, forwarded unchanged to `RedisConfigOptions::recycle_timeout`.
    ///
    /// Defaults to `Some(2)`.
    // NOTE(review): unit is presumably seconds — confirm against `relay_redis`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recycle_timeout: Option<u64>,
    /// Wait timeout, forwarded unchanged to `RedisConfigOptions::wait_timeout`.
    ///
    /// Defaults to `None`.
    // NOTE(review): unit is presumably seconds — confirm against `relay_redis`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wait_timeout: Option<u64>,
    /// Response timeout, forwarded unchanged to `RedisConfigOptions::response_timeout`.
    ///
    /// Defaults to `Some(30)`.
    // NOTE(review): unit is presumably seconds — confirm against `relay_redis`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub response_timeout: Option<u64>,
}
48
49impl Default for PartialRedisConfigOptions {
50 fn default() -> Self {
51 Self {
52 max_connections: None,
53 idle_timeout: 60,
54 create_timeout: Some(3),
55 recycle_timeout: Some(2),
56 wait_timeout: None,
57 response_timeout: Some(30),
58 }
59 }
60}
61
/// File-level shape of a Redis configuration, convertible into [`RedisConfig`] via `From`.
///
/// The enum is untagged, so serde tries the variants in declaration order and picks the first
/// one that matches the input.
// NOTE(review): nothing visible in this file constructs this type or routes deserialization
// through it (`RedisConfig` derives `Deserialize` itself) — confirm whether it is still needed.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
    /// A map with a `cluster_nodes` list and optional inline options.
    Cluster {
        /// Addresses of the cluster nodes.
        cluster_nodes: Vec<String>,

        /// Options given next to `cluster_nodes` (flattened into the same map).
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },

    /// A bare string holding the server address.
    Single(String),

    /// A map with a `server` address and optional inline options.
    SingleWithOpts {
        /// Address of the server.
        server: String,

        /// Options given next to `server` (flattened into the same map).
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
95
/// Configuration of one Redis target: either a cluster or a single server.
///
/// The enum is untagged: a map containing `cluster_nodes` is read as [`RedisConfig::Cluster`],
/// anything else is tried as a [`SingleRedisConfig`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfig {
    /// A Redis cluster.
    Cluster {
        /// Addresses of the cluster nodes.
        cluster_nodes: Vec<String>,
        /// Options given next to `cluster_nodes` (flattened into the same map).
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// A single Redis server, see [`SingleRedisConfig`] for the accepted shapes.
    Single(SingleRedisConfig),
}
111
/// The two accepted shapes of a single-server configuration.
///
/// The enum is untagged: a bare string becomes `Simple`, a map with a `server` key becomes
/// `Detailed`. The variants and their fields are hidden from the generated documentation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum SingleRedisConfig {
    // A bare server address without any options.
    #[doc(hidden)]
    Simple(String),
    // A server address together with options.
    #[doc(hidden)]
    Detailed {
        // Address of the server.
        #[doc(hidden)]
        server: String,
        // Options given next to `server` (flattened into the same map).
        #[doc(hidden)]
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
129
130impl RedisConfig {
131 pub fn single(server: String) -> Self {
133 RedisConfig::Single(SingleRedisConfig::Detailed {
134 server,
135 options: Default::default(),
136 })
137 }
138}
139
140impl From<RedisConfigFromFile> for RedisConfig {
141 fn from(value: RedisConfigFromFile) -> Self {
142 match value {
143 RedisConfigFromFile::Cluster {
144 cluster_nodes,
145 options,
146 } => Self::Cluster {
147 cluster_nodes,
148 options,
149 },
150 RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
151 server,
152 options: Default::default(),
153 }),
154 RedisConfigFromFile::SingleWithOpts { server, options } => {
155 Self::Single(SingleRedisConfig::Detailed { server, options })
156 }
157 }
158 }
159}
160
/// Top-level Redis configuration: one shared target or one target per use case.
///
/// The enum is untagged: input that parses as a [`RedisConfig`] becomes `Unified`; a map with
/// `project_configs` and `quotas` keys becomes `Individual`.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
    /// A single configuration shared by all use cases.
    Unified(RedisConfig),
    /// Separate configurations per use case.
    Individual {
        /// Configuration used for project configs.
        project_configs: Box<RedisConfig>,
        /// Configuration used for quotas.
        quotas: Box<RedisConfig>,
    },
}
175
/// Resolved view of a [`RedisConfig`], as produced by `build_redis_config`.
///
/// Addresses are borrowed from the originating configuration, while the options are owned
/// [`RedisConfigOptions`] values.
#[derive(Clone, Debug)]
pub enum RedisConfigRef<'a> {
    /// A Redis cluster.
    Cluster {
        /// Borrowed addresses of the cluster nodes.
        cluster_nodes: &'a Vec<String>,
        /// Options for this target.
        options: RedisConfigOptions,
    },
    /// A single Redis server.
    Single {
        /// Borrowed address of the server.
        server: &'a String,
        /// Options for this target.
        options: RedisConfigOptions,
    },
}
194
/// Resolved view of [`RedisConfigs`], as produced by `build_redis_configs`.
// The variants embed `RedisConfigRef` values directly instead of boxing them, hence the lint
// exemption. NOTE(review): the rationale for not boxing is not visible here — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum RedisConfigsRef<'a> {
    /// A single target shared by all use cases.
    Unified(RedisConfigRef<'a>),
    /// Separate targets per use case.
    Individual {
        /// Target used for project configs.
        project_configs: RedisConfigRef<'a>,
        /// Target used for quotas.
        quotas: RedisConfigRef<'a>,
    },
}
209
210fn build_redis_config_options(
211 options: &PartialRedisConfigOptions,
212 default_connections: u32,
213) -> RedisConfigOptions {
214 let max_connections = options.max_connections.unwrap_or(default_connections);
215
216 RedisConfigOptions {
217 max_connections,
218 idle_timeout: options.idle_timeout,
219 create_timeout: options.create_timeout,
220 recycle_timeout: options.recycle_timeout,
221 wait_timeout: options.wait_timeout,
222 response_timeout: options.response_timeout,
223 }
224}
225
226pub(super) fn build_redis_config(
230 config: &RedisConfig,
231 default_connections: u32,
232) -> RedisConfigRef<'_> {
233 match config {
234 RedisConfig::Cluster {
235 cluster_nodes,
236 options,
237 } => RedisConfigRef::Cluster {
238 cluster_nodes,
239 options: build_redis_config_options(options, default_connections),
240 },
241 RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
242 RedisConfigRef::Single {
243 server,
244 options: build_redis_config_options(options, default_connections),
245 }
246 }
247 RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
248 server,
249 options: Default::default(),
250 },
251 }
252}
253
254pub(super) fn build_redis_configs(
258 configs: &RedisConfigs,
259 cpu_concurrency: u32,
260 pool_concurrency: u32,
261) -> RedisConfigsRef<'_> {
262 let project_connections = std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
267 let total_pool_concurrency = cpu_concurrency * pool_concurrency;
272
273 match configs {
274 RedisConfigs::Unified(cfg) => {
275 let default_connections = total_pool_concurrency + project_connections;
278 let config = build_redis_config(cfg, default_connections);
279 RedisConfigsRef::Unified(config)
280 }
281 RedisConfigs::Individual {
282 project_configs,
283 quotas,
284 } => {
285 let project_configs = build_redis_config(project_configs, project_connections);
286 let quotas = build_redis_config(quotas, total_pool_concurrency);
287 RedisConfigsRef::Individual {
288 project_configs,
289 quotas,
290 }
291 }
292 }
293}
294
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    /// Default options with only `max_connections` overridden.
    fn options_with_max(max_connections: u32) -> PartialRedisConfigOptions {
        PartialRedisConfigOptions {
            max_connections: Some(max_connections),
            ..Default::default()
        }
    }

    /// Detailed single-server configuration pointing at the address used throughout the tests.
    fn single_config(options: PartialRedisConfigOptions) -> RedisConfig {
        RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options,
        })
    }

    /// Two-node cluster configuration using the addresses used throughout the tests.
    fn cluster_config(options: PartialRedisConfigOptions) -> RedisConfig {
        RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options,
        }
    }

    #[test]
    fn test_redis_single_opts() {
        // `connection_timeout` is not a field of `PartialRedisConfigOptions`; the expected
        // value below shows that such a key is ignored rather than rejected.
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(config, single_config(options_with_max(42)));
    }

    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: unified single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(single_config(options_with_max(42)))
        );
    }

    #[test]
    fn test_redis_individual() {
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
"#;

        let configs: RedisConfigs =
            serde_yaml::from_str(yaml).expect("Parsed processing redis configs: individual");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(single_config(options_with_max(42))),
            quotas: Box::new(cluster_config(options_with_max(17))),
        };

        assert_eq!(configs, expected);
    }

    #[test]
    fn test_redis_single_serialize() {
        let config = single_config(options_with_max(42));

        assert_json_snapshot!(config, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(single_config(options_with_max(42)));

        assert_json_snapshot!(configs, @r#"
        {
          "server": "redis://127.0.0.1:6379",
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with default options");

        assert_eq!(config, single_config(Default::default()));
    }

    // A bare address string must keep deserializing, and it maps to the `Simple` shape.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single without options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 10
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: cluster with options");

        assert_eq!(config, cluster_config(options_with_max(10)));
    }

    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
    - "redis://127.0.0.1:6379"
    - "redis://127.0.0.2:6379"
max_connections: 20
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: unified cluster with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(cluster_config(options_with_max(20)))
        );
    }

    #[test]
    fn test_redis_cluster_serialize() {
        let config = cluster_config(options_with_max(42));

        assert_json_snapshot!(config, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(cluster_config(options_with_max(42)));

        assert_json_snapshot!(configs, @r#"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "max_connections": 42,
          "idle_timeout": 60,
          "create_timeout": 3,
          "recycle_timeout": 2,
          "response_timeout": 30
        }
        "#);
    }

    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(single_config(options_with_max(42))),
            quotas: Box::new(cluster_config(options_with_max(84))),
        };

        assert_json_snapshot!(configs, @r#"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          },
          "quotas": {
            "cluster_nodes": [
              "redis://127.0.0.1:6379",
              "redis://127.0.0.2:6379"
            ],
            "max_connections": 84,
            "idle_timeout": 60,
            "create_timeout": 3,
            "recycle_timeout": 2,
            "response_timeout": 30
          }
        }
        "#);
    }
}