use relay_redis::RedisConfigOptions;
use serde::{Deserialize, Serialize};
/// Floor for the default `max_connections` computed by `create_redis_pools`.
///
/// The computed default is twice the CPU concurrency, but never lower than this value.
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
/// Divisor used to derive a default `min_idle` from `max_connections`.
///
/// When `min_idle` is not configured, `build_redis_config_options` sets it to the
/// effective `max_connections` divided by this value, rounded up.
pub(crate) const DEFAULT_MIN_IDLE_RATIO: u32 = 5;
/// Optional pool and timeout settings that accompany a Redis configuration.
///
/// The struct is marked `#[serde(default)]`, so any field missing from the input
/// takes its value from the `Default` implementation of this type.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
    /// Value forwarded to `RedisConfigOptions::max_connections`.
    ///
    /// When unset, `build_redis_config_options` substitutes the default passed in by
    /// its caller. An unset value is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<u32>,
    /// Value forwarded to `RedisConfigOptions::min_idle`.
    ///
    /// When unset, `build_redis_config_options` derives it from the effective
    /// `max_connections` divided by `DEFAULT_MIN_IDLE_RATIO`, rounded up. An unset
    /// value is omitted from serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min_idle: Option<u32>,
    /// Connection timeout, forwarded unchanged to `RedisConfigOptions`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `relay_redis`.
    pub connection_timeout: u64,
    /// Maximum connection lifetime, forwarded unchanged to `RedisConfigOptions`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `relay_redis`.
    pub max_lifetime: u64,
    /// Idle timeout, forwarded unchanged to `RedisConfigOptions`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `relay_redis`.
    pub idle_timeout: u64,
    /// Read timeout, forwarded unchanged to `RedisConfigOptions`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `relay_redis`.
    pub read_timeout: u64,
    /// Write timeout, forwarded unchanged to `RedisConfigOptions`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `relay_redis`.
    pub write_timeout: u64,
}
impl Default for PartialRedisConfigOptions {
fn default() -> Self {
Self {
max_connections: None,
min_idle: None,
connection_timeout: 5,
max_lifetime: 300,
idle_timeout: 60,
read_timeout: 3,
write_timeout: 3,
}
}
}
/// Alternative input shape for a Redis configuration that converts into `RedisConfig`.
///
/// The enum is `untagged`: serde tries the variants in declaration order and keeps the
/// first one that deserializes, so the order of the variants is significant.
///
/// NOTE(review): in the code visible here this type is only consumed by the
/// `From<RedisConfigFromFile>` impl for `RedisConfig` — confirm whether anything
/// still deserializes into it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
    /// A map with a `cluster_nodes` list plus optional, flattened pool options.
    Cluster {
        /// One string per cluster node.
        cluster_nodes: Vec<String>,
        /// Pool options read from the same map level as `cluster_nodes`.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// A plain sequence of nested configurations.
    MultiWrite(Vec<RedisConfigFromFile>),
    /// A bare string naming a single server.
    Single(String),
    /// A map with a `server` string plus optional, flattened pool options.
    SingleWithOpts {
        /// The server string.
        server: String,
        /// Pool options read from the same map level as `server`.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
/// Configuration of one Redis target: a cluster, a group of nested targets, or a
/// single server.
///
/// The enum is `untagged`: serde tries the variants in declaration order and keeps the
/// first one that deserializes, so the order of the variants is significant.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfig {
    /// A map with a `cluster_nodes` list plus optional, flattened pool options.
    Cluster {
        /// One string per cluster node (the tests in this file use `redis://host:port` URLs).
        cluster_nodes: Vec<String>,
        /// Pool options read from the same map level as `cluster_nodes`.
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
    /// A map with a `configs` list of nested configurations.
    ///
    /// NOTE(review): by its name, writes presumably go to every nested target —
    /// confirm against the consumer of `RedisConfigRef::MultiWrite`.
    MultiWrite {
        /// The nested configurations; entries may themselves be `MultiWrite`.
        configs: Vec<RedisConfig>,
    },
    /// A single server, given either as a bare string or as a map with options.
    Single(SingleRedisConfig),
}
/// The two accepted spellings of a single-server configuration.
///
/// The enum is `untagged`: a bare string deserializes into `Simple`, a map containing
/// `server` deserializes into `Detailed`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum SingleRedisConfig {
    // Bare server string without any options.
    #[doc(hidden)]
    Simple(String),
    // Server string together with pool options taken from the same map level.
    #[doc(hidden)]
    Detailed {
        // The server string.
        #[doc(hidden)]
        server: String,
        // Flattened options; missing fields fall back to the option defaults.
        #[doc(hidden)]
        #[serde(flatten)]
        options: PartialRedisConfigOptions,
    },
}
impl RedisConfig {
pub fn single(server: String) -> Self {
RedisConfig::Single(SingleRedisConfig::Detailed {
server,
options: Default::default(),
})
}
}
impl From<RedisConfigFromFile> for RedisConfig {
fn from(value: RedisConfigFromFile) -> Self {
match value {
RedisConfigFromFile::Cluster {
cluster_nodes,
options,
} => Self::Cluster {
cluster_nodes,
options,
},
RedisConfigFromFile::MultiWrite(configs) => Self::MultiWrite {
configs: configs.into_iter().map(|c| c.into()).collect(),
},
RedisConfigFromFile::Single(server) => Self::Single(SingleRedisConfig::Detailed {
server,
options: Default::default(),
}),
RedisConfigFromFile::SingleWithOpts { server, options } => {
Self::Single(SingleRedisConfig::Detailed { server, options })
}
}
}
}
/// Top-level Redis configuration: either one configuration shared by everything, or
/// one configuration per use case.
///
/// The enum is `untagged`: serde first tries to read the input as a single
/// `RedisConfig` and only then as the map of individual configurations.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
    /// One configuration used for all pools.
    Unified(RedisConfig),
    /// A separate configuration for each use case.
    Individual {
        /// Configuration read from the `project_configs` key.
        project_configs: Box<RedisConfig>,
        /// Configuration read from the `cardinality` key.
        cardinality: Box<RedisConfig>,
        /// Configuration read from the `quotas` key.
        quotas: Box<RedisConfig>,
    },
}
/// Borrowed view of a `RedisConfig` with its pool options fully resolved.
///
/// Values of this type are produced by `create_redis_pool`; server and node strings
/// are borrowed from the originating configuration for the lifetime `'a`.
#[derive(Clone, Debug)]
pub enum RedisConfigRef<'a> {
    /// A Redis cluster.
    Cluster {
        /// The node strings of the originating configuration.
        cluster_nodes: &'a Vec<String>,
        /// Resolved options for this target.
        options: RedisConfigOptions,
    },
    /// A group of nested targets.
    MultiWrite {
        /// The resolved nested configurations, in configuration order.
        configs: Vec<RedisConfigRef<'a>>,
    },
    /// A single server.
    Single {
        /// The server string of the originating configuration.
        server: &'a String,
        /// Resolved options for this target.
        options: RedisConfigOptions,
    },
}
/// Resolved pool configurations, mirroring the shape of `RedisConfigs`.
///
/// Values of this type are produced by `create_redis_pools`.
// NOTE(review): `Individual` stores three `RedisConfigRef`s inline while `Unified`
// stores one; the variants are presumably left unboxed on purpose — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum RedisPoolConfigs<'a> {
    /// One resolved configuration used for all pools.
    Unified(RedisConfigRef<'a>),
    /// One resolved configuration per use case.
    Individual {
        /// Resolved configuration for `project_configs`.
        project_configs: RedisConfigRef<'a>,
        /// Resolved configuration for `cardinality`.
        cardinality: RedisConfigRef<'a>,
        /// Resolved configuration for `quotas`.
        quotas: RedisConfigRef<'a>,
    },
}
fn build_redis_config_options(
options: &PartialRedisConfigOptions,
default_connections: u32,
) -> RedisConfigOptions {
let max_connections = options.max_connections.unwrap_or(default_connections);
let min_idle = options
.min_idle
.unwrap_or_else(|| max_connections.div_ceil(DEFAULT_MIN_IDLE_RATIO));
RedisConfigOptions {
max_connections,
min_idle: Some(min_idle),
connection_timeout: options.connection_timeout,
max_lifetime: options.max_lifetime,
idle_timeout: options.idle_timeout,
read_timeout: options.read_timeout,
write_timeout: options.write_timeout,
}
}
pub(super) fn create_redis_pool(
config: &RedisConfig,
default_connections: u32,
) -> RedisConfigRef<'_> {
match config {
RedisConfig::Cluster {
cluster_nodes,
options,
} => RedisConfigRef::Cluster {
cluster_nodes,
options: build_redis_config_options(options, default_connections),
},
RedisConfig::MultiWrite { configs } => RedisConfigRef::MultiWrite {
configs: configs
.iter()
.map(|c| create_redis_pool(c, default_connections))
.collect(),
},
RedisConfig::Single(SingleRedisConfig::Detailed { server, options }) => {
RedisConfigRef::Single {
server,
options: build_redis_config_options(options, default_connections),
}
}
RedisConfig::Single(SingleRedisConfig::Simple(server)) => RedisConfigRef::Single {
server,
options: Default::default(),
},
}
}
/// Resolves the top-level Redis configuration into per-pool configurations.
///
/// The default `max_connections` for the `project_configs` pool — and for the single
/// pool of a unified configuration — is twice `cpu_concurrency`, but at least
/// `DEFAULT_MIN_MAX_CONNECTIONS`. The `cardinality` and `quotas` pools default to
/// `cpu_concurrency`. Explicitly configured values always take precedence.
pub(super) fn create_redis_pools(
    configs: &RedisConfigs,
    cpu_concurrency: u32,
) -> RedisPoolConfigs<'_> {
    // Saturate instead of overflowing for absurdly large concurrency values.
    let project_configs_default_connections = cpu_concurrency
        .saturating_mul(2)
        .max(DEFAULT_MIN_MAX_CONNECTIONS);

    match configs {
        RedisConfigs::Unified(cfg) => {
            let pool = create_redis_pool(cfg, project_configs_default_connections);
            RedisPoolConfigs::Unified(pool)
        }
        RedisConfigs::Individual {
            project_configs,
            cardinality,
            quotas,
        } => {
            let project_configs =
                create_redis_pool(project_configs, project_configs_default_connections);
            let cardinality = create_redis_pool(cardinality, cpu_concurrency);
            let quotas = create_redis_pool(quotas, cpu_concurrency);

            RedisPoolConfigs::Individual {
                project_configs,
                cardinality,
                quotas,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;

    use super::*;

    /// A map with `server` and options parses into the detailed single-server form.
    #[test]
    fn test_redis_single_opts() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    connection_timeout: 5,
                    ..Default::default()
                }
            })
        );
    }

    /// The same input parsed as `RedisConfigs` yields the unified variant.
    #[test]
    fn test_redis_single_opts_unified() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    connection_timeout: 5,
                    ..Default::default()
                }
            }))
        );
    }

    /// Separate `project_configs`, `cardinality` and `quotas` sections parse into the
    /// individual variant.
    #[test]
    fn test_redis_individual() {
        // The nested keys must be indented below their section, otherwise they would
        // be read as top-level keys.
        let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
    connection_timeout: 5
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
    connection_timeout: 5
"#;

        let configs: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis configs: individual pools");

        let expected = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    connection_timeout: 5,
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned(),
                ],
                options: PartialRedisConfigOptions {
                    max_connections: Some(17),
                    connection_timeout: 5,
                    ..Default::default()
                },
            }),
        };

        assert_eq!(configs, expected);
    }

    /// A detailed single-server config serializes to a flat map; unset pool sizes are
    /// omitted.
    #[test]
    fn test_redis_single_serialize() {
        let config = RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                connection_timeout: 5,
                ..Default::default()
            },
        });

        assert_json_snapshot!(config, @r###"
        {
          "server": "redis://127.0.0.1:6379",
          "connection_timeout": 5,
          "max_lifetime": 300,
          "idle_timeout": 60,
          "read_timeout": 3,
          "write_timeout": 3
        }
        "###);
    }

    /// The unified wrapper does not add any nesting to the serialized output.
    #[test]
    fn test_redis_single_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Single(SingleRedisConfig::Detailed {
            server: "redis://127.0.0.1:6379".to_owned(),
            options: PartialRedisConfigOptions {
                connection_timeout: 5,
                ..Default::default()
            },
        }));

        assert_json_snapshot!(configs, @r###"
        {
          "server": "redis://127.0.0.1:6379",
          "connection_timeout": 5,
          "max_lifetime": 300,
          "idle_timeout": 60,
          "read_timeout": 3,
          "write_timeout": 3
        }
        "###);
    }

    /// A map containing only `server` parses into the detailed form with default options.
    #[test]
    fn test_redis_single_opts_default() {
        let yaml = r#"
server: "redis://127.0.0.1:6379"
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: single with default options");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default()
            })
        );
    }

    /// A bare string parses into the simple single-server form.
    #[test]
    fn test_redis_single() {
        let yaml = r#"
"redis://127.0.0.1:6379"
"#;

        let config: RedisConfig =
            serde_yaml::from_str(yaml).expect("Parsed processing redis config: single");

        assert_eq!(
            config,
            RedisConfig::Single(SingleRedisConfig::Simple(
                "redis://127.0.0.1:6379".to_owned()
            ))
        );
    }

    /// A map with `cluster_nodes` and options parses into the cluster variant.
    #[test]
    fn test_redis_cluster_nodes_opts() {
        let yaml = r#"
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
read_timeout: 10
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: cluster with options");

        assert_eq!(
            config,
            RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    read_timeout: 10,
                    ..Default::default()
                },
            }
        );
    }

    /// A `configs` list may mix cluster, single and nested multi-write entries.
    #[test]
    fn test_redis_multi_write_opts() {
        // Options belong to the list entry they are indented under; the last entry
        // is itself a nested `configs` list.
        let yaml = r#"
configs:
    - cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
      max_connections: 42
      connection_timeout: 5
    - server: "redis://127.0.0.1:6379"
      max_connections: 84
      connection_timeout: 10
    - configs:
        - server: "redis://127.0.0.1:6379"
          max_connections: 42
          connection_timeout: 5
"#;

        let config: RedisConfig = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: multi write with options");

        assert_eq!(
            config,
            RedisConfig::MultiWrite {
                configs: vec![
                    RedisConfig::Cluster {
                        cluster_nodes: vec![
                            "redis://127.0.0.1:6379".to_owned(),
                            "redis://127.0.0.2:6379".to_owned(),
                        ],
                        options: PartialRedisConfigOptions {
                            max_connections: Some(42),
                            connection_timeout: 5,
                            ..Default::default()
                        },
                    },
                    RedisConfig::Single(SingleRedisConfig::Detailed {
                        server: "redis://127.0.0.1:6379".to_owned(),
                        options: PartialRedisConfigOptions {
                            max_connections: Some(84),
                            connection_timeout: 10,
                            ..Default::default()
                        },
                    }),
                    RedisConfig::MultiWrite {
                        configs: vec![RedisConfig::Single(SingleRedisConfig::Detailed {
                            server: "redis://127.0.0.1:6379".to_owned(),
                            options: PartialRedisConfigOptions {
                                max_connections: Some(42),
                                connection_timeout: 5,
                                ..Default::default()
                            },
                        })]
                    }
                ],
            }
        );
    }

    /// A cluster config parsed as `RedisConfigs` yields the unified variant.
    #[test]
    fn test_redis_cluster_nodes_opts_unified() {
        let yaml = r#"
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
read_timeout: 10
"#;

        let config: RedisConfigs = serde_yaml::from_str(yaml)
            .expect("Parsed processing redis config: cluster with options");

        assert_eq!(
            config,
            RedisConfigs::Unified(RedisConfig::Cluster {
                cluster_nodes: vec![
                    "redis://127.0.0.1:6379".to_owned(),
                    "redis://127.0.0.2:6379".to_owned()
                ],
                options: PartialRedisConfigOptions {
                    read_timeout: 10,
                    ..Default::default()
                },
            })
        );
    }

    /// A cluster config serializes to a flat map with the node list first.
    #[test]
    fn test_redis_cluster_serialize() {
        let config = RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                read_timeout: 33,
                ..Default::default()
            },
        };

        assert_json_snapshot!(config, @r###"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "connection_timeout": 5,
          "max_lifetime": 300,
          "idle_timeout": 60,
          "read_timeout": 33,
          "write_timeout": 3
        }
        "###);
    }

    /// The unified wrapper does not add any nesting around a cluster config.
    #[test]
    fn test_redis_cluster_serialize_unified() {
        let configs = RedisConfigs::Unified(RedisConfig::Cluster {
            cluster_nodes: vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ],
            options: PartialRedisConfigOptions {
                read_timeout: 33,
                ..Default::default()
            },
        });

        assert_json_snapshot!(configs, @r###"
        {
          "cluster_nodes": [
            "redis://127.0.0.1:6379",
            "redis://127.0.0.2:6379"
          ],
          "connection_timeout": 5,
          "max_lifetime": 300,
          "idle_timeout": 60,
          "read_timeout": 33,
          "write_timeout": 3
        }
        "###);
    }

    /// Individual configs serialize to one section per use case; a multi-write config
    /// serializes to a `configs` list.
    #[test]
    fn test_redis_serialize_individual() {
        let configs = RedisConfigs::Individual {
            project_configs: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: PartialRedisConfigOptions {
                    max_connections: Some(42),
                    connection_timeout: 5,
                    ..Default::default()
                },
            })),
            cardinality: Box::new(RedisConfig::Single(SingleRedisConfig::Detailed {
                server: "redis://127.0.0.1:6379".to_owned(),
                options: Default::default(),
            })),
            quotas: Box::new(RedisConfig::MultiWrite {
                configs: vec![
                    RedisConfig::Cluster {
                        cluster_nodes: vec![
                            "redis://127.0.0.1:6379".to_owned(),
                            "redis://127.0.0.2:6379".to_owned(),
                        ],
                        options: PartialRedisConfigOptions {
                            max_connections: Some(84),
                            connection_timeout: 10,
                            ..Default::default()
                        },
                    },
                    RedisConfig::Single(SingleRedisConfig::Detailed {
                        server: "redis://127.0.0.1:6379".to_owned(),
                        options: PartialRedisConfigOptions {
                            max_connections: Some(42),
                            connection_timeout: 5,
                            ..Default::default()
                        },
                    }),
                ],
            }),
        };

        assert_json_snapshot!(configs, @r###"
        {
          "project_configs": {
            "server": "redis://127.0.0.1:6379",
            "max_connections": 42,
            "connection_timeout": 5,
            "max_lifetime": 300,
            "idle_timeout": 60,
            "read_timeout": 3,
            "write_timeout": 3
          },
          "cardinality": {
            "server": "redis://127.0.0.1:6379",
            "connection_timeout": 5,
            "max_lifetime": 300,
            "idle_timeout": 60,
            "read_timeout": 3,
            "write_timeout": 3
          },
          "quotas": {
            "configs": [
              {
                "cluster_nodes": [
                  "redis://127.0.0.1:6379",
                  "redis://127.0.0.2:6379"
                ],
                "max_connections": 84,
                "connection_timeout": 10,
                "max_lifetime": 300,
                "idle_timeout": 60,
                "read_timeout": 3,
                "write_timeout": 3
              },
              {
                "server": "redis://127.0.0.1:6379",
                "max_connections": 42,
                "connection_timeout": 5,
                "max_lifetime": 300,
                "idle_timeout": 60,
                "read_timeout": 3,
                "write_timeout": 3
              }
            ]
          }
        }
        "###);
    }
}