1use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
2use deadpool_redis::{ConfigError, Runtime};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12#[derive(Debug, Error)]
17pub enum RedisError {
18 #[error("failed to configure redis")]
20 Configuration,
21
22 #[error("failed to communicate with redis: {0}")]
24 Redis(
25 #[source]
26 #[from]
27 redis::RedisError,
28 ),
29
30 #[error("failed to interact with the redis pool: {0}")]
32 Pool(#[source] PoolError<redis::RedisError>),
33
34 #[error("failed to create redis pool: {0}")]
36 CreatePool(#[from] BuildError),
37
38 #[error("failed to configure redis: {0}")]
40 ConfigError(#[from] ConfigError),
41}
42
43#[derive(Debug, Clone)]
49pub struct RedisClients {
50 pub project_configs: AsyncRedisClient,
52 pub cardinality: AsyncRedisClient,
54 pub quotas: AsyncRedisClient,
56}
57
/// Point-in-time counters describing a Redis connection pool.
///
/// Values are filled in by `AsyncRedisClient::stats` from the status reported by the pool.
#[derive(Debug)]
pub struct RedisClientStats {
    /// Current size of the pool (the pool status' `size`).
    pub connections: u32,

    /// Number of connections currently available in the pool (the pool status' `available`).
    pub idle_connections: u32,

    /// Upper bound on the pool size (the pool status' `max_size`).
    pub max_connections: u32,

    /// Number of callers currently waiting for a connection (the pool status' `waiting`).
    pub waiting_for_connection: u32,
}
75
76#[derive(Clone)]
81pub enum AsyncRedisClient {
82 Cluster(pool::CustomClusterPool),
84 Single(pool::CustomSinglePool),
86}
87
88impl AsyncRedisClient {
89 pub fn cluster<'a>(
97 servers: impl IntoIterator<Item = &'a str>,
98 opts: &RedisConfigOptions,
99 ) -> Result<Self, RedisError> {
100 let servers = servers
101 .into_iter()
102 .map(|s| s.to_owned())
103 .collect::<Vec<_>>();
104
105 let manager = pool::CustomClusterManager::new(servers, false, opts.recycle_check_frequency)
108 .map_err(RedisError::Redis)?;
109
110 let pool = Self::build_pool(manager, opts)?;
111
112 Ok(AsyncRedisClient::Cluster(pool))
113 }
114
115 pub fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
123 let manager = pool::CustomSingleManager::new(server, opts.recycle_check_frequency)
126 .map_err(RedisError::Redis)?;
127
128 let pool = Self::build_pool(manager, opts)?;
129
130 Ok(AsyncRedisClient::Single(pool))
131 }
132
133 pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
138 let connection = match self {
139 Self::Cluster(pool) => {
140 AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
141 }
142 Self::Single(pool) => {
143 AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
144 }
145 };
146
147 Ok(connection)
148 }
149
150 pub fn stats(&self) -> RedisClientStats {
155 let status = match self {
156 Self::Cluster(pool) => pool.status(),
157 Self::Single(pool) => pool.status(),
158 };
159
160 RedisClientStats {
161 idle_connections: status.available as u32,
162 connections: status.size as u32,
163 max_connections: status.max_size as u32,
164 waiting_for_connection: status.waiting as u32,
165 }
166 }
167
168 pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
172 match self {
173 Self::Cluster(pool) => {
174 pool.retain(|_, metrics| predicate(metrics));
175 }
176 Self::Single(pool) => {
177 pool.retain(|_, metrics| predicate(metrics));
178 }
179 }
180 }
181
182 fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
184 manager: M,
185 opts: &RedisConfigOptions,
186 ) -> Result<Pool<M, W>, BuildError> {
187 let result = Pool::builder(manager)
188 .max_size(opts.max_connections as usize)
189 .create_timeout(opts.create_timeout.map(Duration::from_secs))
190 .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
191 .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
192 .runtime(Runtime::Tokio1)
193 .build();
194
195 let idle_timeout = opts.idle_timeout;
196 let refresh_interval = opts.idle_timeout / 2;
197 if let Ok(pool) = result.clone() {
198 relay_system::spawn!(async move {
199 loop {
200 pool.retain(|_, metrics| {
201 metrics.last_used() < Duration::from_secs(idle_timeout)
202 });
203 tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
204 }
205 });
206 }
207
208 result
209 }
210}
211
212impl std::fmt::Debug for AsyncRedisClient {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 match self {
215 AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
216 AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
217 }
218 }
219}
220
221pub enum AsyncRedisConnection {
228 Cluster(pool::CustomClusterConnection),
230 Single(pool::CustomSingleConnection),
232}
233
234impl std::fmt::Debug for AsyncRedisConnection {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 let name = match self {
237 Self::Cluster(_) => "Cluster",
238 Self::Single(_) => "Single",
239 };
240 f.debug_tuple(name).finish()
241 }
242}
243
244impl redis::aio::ConnectionLike for AsyncRedisConnection {
245 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
246 match self {
247 Self::Cluster(conn) => conn.req_packed_command(cmd),
248 Self::Single(conn) => conn.req_packed_command(cmd),
249 }
250 }
251
252 fn req_packed_commands<'a>(
253 &'a mut self,
254 cmd: &'a Pipeline,
255 offset: usize,
256 count: usize,
257 ) -> RedisFuture<'a, Vec<Value>> {
258 match self {
259 Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
260 Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
261 }
262 }
263
264 fn get_db(&self) -> i64 {
265 match self {
266 Self::Cluster(conn) => conn.get_db(),
267 Self::Single(conn) => conn.get_db(),
268 }
269 }
270}