use relay_redis::RedisConfigOptions;
use serde::{Deserialize, Serialize};
/// Lower bound for the default `max_connections` that `create_redis_pools`
/// computes for the project-configs pool (and for the only pool of a unified
/// configuration).
pub(crate) const DEFAULT_MIN_MAX_CONNECTIONS: u32 = 24;
/// Divisor used by `create_redis_pool` to derive a default `min_idle` from
/// `max_connections` (rounded up) when the configuration leaves `min_idle` unset.
pub(crate) const DEFAULT_MIN_IDLE_RATIO: u32 = 5;
/// Connection-pool options for a Redis client as they appear in configuration.
///
/// Because of `#[serde(default)]`, any field missing from the input is taken
/// from this struct's [`Default`] implementation. The declaration order of the
/// fields is also the order in which their keys are serialized.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct PartialRedisConfigOptions {
/// Configured `max_connections`, if any.
///
/// When `None`, `create_redis_pool` substitutes the default handed in by its
/// caller. The key is left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_connections: Option<u32>,
/// Configured `min_idle`, if any.
///
/// When `None`, `create_redis_pool` derives a value from `max_connections`.
/// The key is left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub min_idle: Option<u32>,
/// Forwarded unchanged to `RedisConfigOptions::connection_timeout`.
// NOTE(review): unit is not visible in this file (presumably seconds) — confirm against `relay_redis`.
pub connection_timeout: u64,
/// Forwarded unchanged to `RedisConfigOptions::max_lifetime`.
// NOTE(review): unit is not visible in this file (presumably seconds) — confirm against `relay_redis`.
pub max_lifetime: u64,
/// Forwarded unchanged to `RedisConfigOptions::idle_timeout`.
// NOTE(review): unit is not visible in this file (presumably seconds) — confirm against `relay_redis`.
pub idle_timeout: u64,
/// Forwarded unchanged to `RedisConfigOptions::read_timeout`.
// NOTE(review): unit is not visible in this file (presumably seconds) — confirm against `relay_redis`.
pub read_timeout: u64,
/// Forwarded unchanged to `RedisConfigOptions::write_timeout`.
// NOTE(review): unit is not visible in this file (presumably seconds) — confirm against `relay_redis`.
pub write_timeout: u64,
}
impl Default for PartialRedisConfigOptions {
fn default() -> Self {
Self {
max_connections: None,
min_idle: None,
connection_timeout: 5,
max_lifetime: 300,
idle_timeout: 60,
read_timeout: 3,
write_timeout: 3,
}
}
}
/// The shapes a Redis configuration may take in the configuration input.
///
/// The enum is `untagged`: no discriminator key is expected, and serde tries
/// the variants in declaration order and keeps the first one that matches.
/// `RedisConfig` deserializes through this type and converts via `From`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(untagged)]
enum RedisConfigFromFile {
/// A mapping with a `cluster_nodes` list; the remaining keys of the same
/// mapping are read as pool options.
Cluster {
/// Entries of the `cluster_nodes` list.
cluster_nodes: Vec<String>,
/// Pool options taken from the same mapping (flattened).
#[serde(flatten)]
options: PartialRedisConfigOptions,
},
/// A bare string naming one server; no options can be given in this form.
Single(String),
/// A mapping with a `server` string; the remaining keys of the same mapping
/// are read as pool options.
SingleWithOpts {
/// Value of the `server` key.
server: String,
/// Pool options taken from the same mapping (flattened).
#[serde(flatten)]
options: PartialRedisConfigOptions,
},
}
/// Where to connect: either a list of cluster nodes or one server.
///
/// The `rename` attributes fix the serialized variant names to `cluster_nodes`
/// and `server`, which are the keys that show up when this enum is flattened
/// into `RedisConfig`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum RedisConnection {
/// Several node addresses, serialized under the key `cluster_nodes`.
#[serde(rename = "cluster_nodes")]
Cluster(Vec<String>),
/// One server address, serialized under the key `server`.
#[serde(rename = "server")]
Single(String),
}
/// A Redis connection target together with its pool options.
///
/// Deserialization does not use the fields directly: `#[serde(from = ...)]`
/// routes it through `RedisConfigFromFile`, so the bare-string form and the
/// two mapping forms are all accepted. Serialization flattens both fields into
/// one mapping, connection key first and option keys after it.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(from = "RedisConfigFromFile")]
pub struct RedisConfig {
/// The server or cluster to connect to.
#[serde(flatten)]
pub connection: RedisConnection,
/// Pool options; keys absent from the input hold their defaults.
#[serde(flatten)]
pub options: PartialRedisConfigOptions,
}
impl RedisConfig {
pub fn single(server: String) -> Self {
Self {
connection: RedisConnection::Single(server),
options: Default::default(),
}
}
}
impl From<RedisConfigFromFile> for RedisConfig {
fn from(value: RedisConfigFromFile) -> Self {
match value {
RedisConfigFromFile::Cluster {
cluster_nodes,
options,
} => Self {
connection: RedisConnection::Cluster(cluster_nodes),
options,
},
RedisConfigFromFile::Single(server) => Self {
connection: RedisConnection::Single(server),
options: Default::default(),
},
RedisConfigFromFile::SingleWithOpts { server, options } => Self {
connection: RedisConnection::Single(server),
options,
},
}
}
}
/// Top-level Redis configuration: one shared config or one config per use case.
///
/// The enum is `untagged`, so the input is matched against `Unified` first and
/// against `Individual` only if that fails.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(untagged)]
pub enum RedisConfigs {
/// A single configuration that applies to every pool.
Unified(RedisConfig),
/// A separate configuration under each of the four keys below.
Individual {
/// Configuration found under the `project_configs` key.
project_configs: Box<RedisConfig>,
/// Configuration found under the `cardinality` key.
cardinality: Box<RedisConfig>,
/// Configuration found under the `quotas` key.
quotas: Box<RedisConfig>,
/// Configuration found under the `misc` key.
misc: Box<RedisConfig>,
},
}
/// Resolved pool settings produced by `create_redis_pools`.
///
/// Mirrors the shape of `RedisConfigs`. Every entry pairs a connection target
/// borrowed from the source configuration (lifetime `'a`) with fully resolved
/// `RedisConfigOptions`, as returned by `create_redis_pool`.
#[derive(Clone, Debug)]
pub enum RedisPoolConfigs<'a> {
/// Settings for the one pool of a unified configuration.
Unified((&'a RedisConnection, RedisConfigOptions)),
/// Settings for each pool of an individual configuration.
Individual {
/// Resolved from the `project_configs` entry.
project_configs: (&'a RedisConnection, RedisConfigOptions),
/// Resolved from the `cardinality` entry.
cardinality: (&'a RedisConnection, RedisConfigOptions),
/// Resolved from the `quotas` entry.
quotas: (&'a RedisConnection, RedisConfigOptions),
/// Resolved from the `misc` entry.
misc: (&'a RedisConnection, RedisConfigOptions),
},
}
pub(super) fn create_redis_pool(
config: &RedisConfig,
default_connections: u32,
) -> (&RedisConnection, RedisConfigOptions) {
let max_connections = config
.options
.max_connections
.unwrap_or(default_connections);
let min_idle = config
.options
.min_idle
.unwrap_or_else(|| max_connections.div_ceil(crate::redis::DEFAULT_MIN_IDLE_RATIO));
let options = RedisConfigOptions {
max_connections,
min_idle: Some(min_idle),
connection_timeout: config.options.connection_timeout,
max_lifetime: config.options.max_lifetime,
idle_timeout: config.options.idle_timeout,
read_timeout: config.options.read_timeout,
write_timeout: config.options.write_timeout,
};
(&config.connection, options)
}
/// Resolves a whole `RedisConfigs` value into per-pool settings.
///
/// Default `max_connections` (used only where the configuration does not set
/// one explicitly):
///
/// * unified pool and `project_configs` pool: twice `cpu_concurrency`, but
///   never less than `DEFAULT_MIN_MAX_CONNECTIONS`;
/// * `cardinality`, `quotas` and `misc` pools: `cpu_concurrency`.
///
/// The returned value borrows the connection targets from `configs`.
pub(super) fn create_redis_pools(
    configs: &RedisConfigs,
    cpu_concurrency: u32,
) -> RedisPoolConfigs<'_> {
    // Saturate instead of overflowing: a plain `* 2` panics in debug builds and
    // silently wraps to a tiny pool size in release builds for huge inputs.
    let project_configs_default_connections = cpu_concurrency
        .saturating_mul(2)
        .max(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS);

    match configs {
        RedisConfigs::Unified(cfg) => {
            RedisPoolConfigs::Unified(create_redis_pool(cfg, project_configs_default_connections))
        }
        RedisConfigs::Individual {
            project_configs,
            cardinality,
            quotas,
            misc,
        } => RedisPoolConfigs::Individual {
            project_configs: create_redis_pool(
                project_configs,
                project_configs_default_connections,
            ),
            cardinality: create_redis_pool(cardinality, cpu_concurrency),
            quotas: create_redis_pool(quotas, cpu_concurrency),
            misc: create_redis_pool(misc, cpu_concurrency),
        },
    }
}
#[cfg(test)]
mod tests {
use insta::assert_json_snapshot;
use super::*;
/// A `server` mapping with extra keys parses into a single-server config that
/// carries those options.
#[test]
fn test_redis_single_opts() {
    let expected = RedisConfig {
        connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
        options: PartialRedisConfigOptions {
            max_connections: Some(42),
            connection_timeout: 5,
            ..Default::default()
        },
    };

    let parsed: RedisConfig = serde_yaml::from_str(
        r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#,
    )
    .expect("Parsed processing redis config: single with options");

    assert_eq!(parsed, expected);
}
/// The same single-server mapping, parsed as `RedisConfigs`, yields the
/// `Unified` variant.
#[test]
fn test_redis_single_opts_unified() {
    let expected = RedisConfigs::Unified(RedisConfig {
        connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
        options: PartialRedisConfigOptions {
            max_connections: Some(42),
            connection_timeout: 5,
            ..Default::default()
        },
    });

    let parsed: RedisConfigs = serde_yaml::from_str(
        r#"
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
"#,
    )
    .expect("Parsed processing redis config: single with options");

    assert_eq!(parsed, expected);
}
/// A mapping with `project_configs`, `cardinality`, `quotas` and `misc` keys
/// parses into `RedisConfigs::Individual`, each entry with its own options.
#[test]
fn test_redis_individual() {
    // The nesting is significant: each pool's settings must be indented under
    // its key, otherwise they become (duplicate) top-level keys.
    let yaml = r#"
project_configs:
    server: "redis://127.0.0.1:6379"
    max_connections: 42
    connection_timeout: 5
cardinality:
    server: "redis://127.0.0.1:6379"
quotas:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
    max_connections: 17
    connection_timeout: 5
misc:
    cluster_nodes:
        - "redis://127.0.0.1:6379"
        - "redis://127.0.0.2:6379"
"#;

    let configs: RedisConfigs = serde_yaml::from_str(yaml)
        .expect("Parsed processing redis configs: individual pools");

    let expected = RedisConfigs::Individual {
        project_configs: Box::new(RedisConfig {
            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                connection_timeout: 5,
                ..Default::default()
            },
        }),
        cardinality: Box::new(RedisConfig {
            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
            options: Default::default(),
        }),
        quotas: Box::new(RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ]),
            options: PartialRedisConfigOptions {
                max_connections: Some(17),
                connection_timeout: 5,
                ..Default::default()
            },
        }),
        misc: Box::new(RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ]),
            options: Default::default(),
        }),
    };

    assert_eq!(configs, expected);
}
/// A single-server config serializes to one flat JSON object: the `server`
/// key followed by the option keys, with unset optional keys omitted.
#[test]
fn test_redis_single_serialize() {
    let config = RedisConfig {
        connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
        options: PartialRedisConfigOptions {
            connection_timeout: 5,
            ..Default::default()
        },
    };

    // The snapshot mirrors pretty-printed JSON, so members are indented by two
    // spaces relative to the enclosing braces.
    assert_json_snapshot!(config, @r###"
    {
      "server": "redis://127.0.0.1:6379",
      "connection_timeout": 5,
      "max_lifetime": 300,
      "idle_timeout": 60,
      "read_timeout": 3,
      "write_timeout": 3
    }
    "###);
}
/// Wrapping a single-server config in `RedisConfigs::Unified` does not change
/// the serialized JSON (the enum is untagged).
#[test]
fn test_redis_single_serialize_unified() {
    let configs = RedisConfigs::Unified(RedisConfig {
        connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
        options: PartialRedisConfigOptions {
            connection_timeout: 5,
            ..Default::default()
        },
    });

    // The snapshot mirrors pretty-printed JSON, so members are indented by two
    // spaces relative to the enclosing braces.
    assert_json_snapshot!(configs, @r###"
    {
      "server": "redis://127.0.0.1:6379",
      "connection_timeout": 5,
      "max_lifetime": 300,
      "idle_timeout": 60,
      "read_timeout": 3,
      "write_timeout": 3
    }
    "###);
}
/// A mapping that only names a `server` gets default options throughout.
#[test]
fn test_redis_single_opts_default() {
    let expected = RedisConfig {
        connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
        options: Default::default(),
    };

    let parsed: RedisConfig = serde_yaml::from_str(
        r#"
server: "redis://127.0.0.1:6379"
"#,
    )
    .expect("Parsed processing redis config: single with options");

    assert_eq!(parsed, expected);
}
/// A bare string is accepted as a single-server config with default options.
#[test]
fn test_redis_single() {
    let yaml = r#"
"redis://127.0.0.1:6379"
"#;

    // The failure message names the form under test (a bare server string).
    let config: RedisConfig =
        serde_yaml::from_str(yaml).expect("Parsed processing redis config: single");

    assert_eq!(
        config,
        RedisConfig {
            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
            options: Default::default()
        }
    );
}
/// A `cluster_nodes` mapping with extra keys parses into a cluster config that
/// carries those options.
#[test]
fn test_redis_cluster_nodes_opts() {
    let yaml = r#"
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
read_timeout: 10
"#;

    // The failure message names the form under test (cluster, not single).
    let config: RedisConfig = serde_yaml::from_str(yaml)
        .expect("Parsed processing redis config: cluster with options");

    assert_eq!(
        config,
        RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned()
            ]),
            options: PartialRedisConfigOptions {
                read_timeout: 10,
                ..Default::default()
            },
        }
    );
}
/// The same cluster mapping, parsed as `RedisConfigs`, yields the `Unified`
/// variant.
#[test]
fn test_redis_cluster_nodes_opts_unified() {
    let yaml = r#"
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
read_timeout: 10
"#;

    // The failure message names the form under test (cluster, not single).
    let config: RedisConfigs = serde_yaml::from_str(yaml)
        .expect("Parsed processing redis config: cluster with options");

    assert_eq!(
        config,
        RedisConfigs::Unified(RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned()
            ]),
            options: PartialRedisConfigOptions {
                read_timeout: 10,
                ..Default::default()
            },
        })
    );
}
/// A cluster config serializes to one JSON object: the `cluster_nodes` array
/// followed by the option keys, with unset optional keys omitted.
#[test]
fn test_redis_cluster_serialize() {
    let config = RedisConfig {
        connection: RedisConnection::Cluster(vec![
            "redis://127.0.0.1:6379".to_owned(),
            "redis://127.0.0.2:6379".to_owned(),
        ]),
        options: PartialRedisConfigOptions {
            read_timeout: 33,
            ..Default::default()
        },
    };

    // The snapshot mirrors pretty-printed JSON: two spaces of indentation per
    // nesting level.
    assert_json_snapshot!(config, @r###"
    {
      "cluster_nodes": [
        "redis://127.0.0.1:6379",
        "redis://127.0.0.2:6379"
      ],
      "connection_timeout": 5,
      "max_lifetime": 300,
      "idle_timeout": 60,
      "read_timeout": 33,
      "write_timeout": 3
    }
    "###);
}
/// Wrapping a cluster config in `RedisConfigs::Unified` does not change the
/// serialized JSON (the enum is untagged).
#[test]
fn test_redis_cluster_serialize_unified() {
    let configs = RedisConfigs::Unified(RedisConfig {
        connection: RedisConnection::Cluster(vec![
            "redis://127.0.0.1:6379".to_owned(),
            "redis://127.0.0.2:6379".to_owned(),
        ]),
        options: PartialRedisConfigOptions {
            read_timeout: 33,
            ..Default::default()
        },
    });

    // The snapshot mirrors pretty-printed JSON: two spaces of indentation per
    // nesting level.
    assert_json_snapshot!(configs, @r###"
    {
      "cluster_nodes": [
        "redis://127.0.0.1:6379",
        "redis://127.0.0.2:6379"
      ],
      "connection_timeout": 5,
      "max_lifetime": 300,
      "idle_timeout": 60,
      "read_timeout": 33,
      "write_timeout": 3
    }
    "###);
}
/// An `Individual` configuration serializes to an object with one nested
/// object per pool, each laid out like a standalone `RedisConfig`.
#[test]
fn test_redis_serialize_individual() {
    let configs = RedisConfigs::Individual {
        project_configs: Box::new(RedisConfig {
            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
            options: PartialRedisConfigOptions {
                max_connections: Some(42),
                connection_timeout: 5,
                ..Default::default()
            },
        }),
        cardinality: Box::new(RedisConfig {
            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
            options: Default::default(),
        }),
        quotas: Box::new(RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ]),
            options: PartialRedisConfigOptions {
                max_connections: Some(17),
                connection_timeout: 5,
                ..Default::default()
            },
        }),
        misc: Box::new(RedisConfig {
            connection: RedisConnection::Cluster(vec![
                "redis://127.0.0.1:6379".to_owned(),
                "redis://127.0.0.2:6379".to_owned(),
            ]),
            options: Default::default(),
        }),
    };

    // The snapshot mirrors pretty-printed JSON: two spaces of indentation per
    // nesting level.
    assert_json_snapshot!(configs, @r###"
    {
      "project_configs": {
        "server": "redis://127.0.0.1:6379",
        "max_connections": 42,
        "connection_timeout": 5,
        "max_lifetime": 300,
        "idle_timeout": 60,
        "read_timeout": 3,
        "write_timeout": 3
      },
      "cardinality": {
        "server": "redis://127.0.0.1:6379",
        "connection_timeout": 5,
        "max_lifetime": 300,
        "idle_timeout": 60,
        "read_timeout": 3,
        "write_timeout": 3
      },
      "quotas": {
        "cluster_nodes": [
          "redis://127.0.0.1:6379",
          "redis://127.0.0.2:6379"
        ],
        "max_connections": 17,
        "connection_timeout": 5,
        "max_lifetime": 300,
        "idle_timeout": 60,
        "read_timeout": 3,
        "write_timeout": 3
      },
      "misc": {
        "cluster_nodes": [
          "redis://127.0.0.1:6379",
          "redis://127.0.0.2:6379"
        ],
        "connection_timeout": 5,
        "max_lifetime": 300,
        "idle_timeout": 60,
        "read_timeout": 3,
        "write_timeout": 3
      }
    }
    "###);
}
}