1use deadpool::Runtime;
2use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12#[derive(Debug, Error)]
17pub enum RedisError {
18 #[error("failed to configure redis")]
20 Configuration,
21
22 #[error("failed to communicate with redis: {0}")]
24 Redis(
25 #[source]
26 #[from]
27 redis::RedisError,
28 ),
29
30 #[error("failed to interact with the redis pool: {0}")]
32 Pool(#[source] PoolError<redis::RedisError>),
33
34 #[error("failed to create redis pool: {0}")]
36 CreatePool(#[from] BuildError),
37}
38
39#[derive(Debug, Clone)]
45pub struct RedisClients {
46 pub project_configs: AsyncRedisClient,
48 pub cardinality: AsyncRedisClient,
50 pub quotas: AsyncRedisClient,
52}
53
/// A snapshot of pool counters, as produced by `AsyncRedisClient::stats`.
///
/// Each field is copied from the pool status and cast to `u32`.
#[derive(Debug)]
pub struct RedisClientStats {
    /// Taken from the pool status field `size`.
    pub connections: u32,
    /// Taken from the pool status field `available`.
    pub idle_connections: u32,
    /// Taken from the pool status field `max_size`.
    pub max_connections: u32,
    /// Taken from the pool status field `waiting`.
    pub waiting_for_connection: u32,
}
71
72#[derive(Clone)]
77pub enum AsyncRedisClient {
78 Cluster(pool::CustomClusterPool),
80 Single(pool::CustomSinglePool),
82}
83
84impl AsyncRedisClient {
85 pub fn cluster<'a>(
93 name: &'static str,
94 servers: impl IntoIterator<Item = &'a str>,
95 opts: &RedisConfigOptions,
96 ) -> Result<Self, RedisError> {
97 let servers = servers
98 .into_iter()
99 .map(|s| s.to_owned())
100 .collect::<Vec<_>>();
101
102 let manager = pool::CustomClusterManager::new(name, servers, false, opts.clone())
105 .map_err(RedisError::Redis)?;
106
107 let pool = Self::build_pool(manager, opts)?;
108
109 Ok(AsyncRedisClient::Cluster(pool))
110 }
111
112 pub fn single(
120 name: &'static str,
121 server: &str,
122 opts: &RedisConfigOptions,
123 ) -> Result<Self, RedisError> {
124 let manager = pool::CustomSingleManager::new(name, server, opts.clone())
127 .map_err(RedisError::Redis)?;
128
129 let pool = Self::build_pool(manager, opts)?;
130
131 Ok(AsyncRedisClient::Single(pool))
132 }
133
134 pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
139 let connection = match self {
140 Self::Cluster(pool) => {
141 AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
142 }
143 Self::Single(pool) => {
144 AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
145 }
146 };
147
148 Ok(connection)
149 }
150
151 pub fn stats(&self) -> RedisClientStats {
156 let status = match self {
157 Self::Cluster(pool) => pool.status(),
158 Self::Single(pool) => pool.status(),
159 };
160
161 RedisClientStats {
162 idle_connections: status.available as u32,
163 connections: status.size as u32,
164 max_connections: status.max_size as u32,
165 waiting_for_connection: status.waiting as u32,
166 }
167 }
168
169 pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
173 match self {
174 Self::Cluster(pool) => {
175 pool.retain(|_, metrics| predicate(metrics));
176 }
177 Self::Single(pool) => {
178 pool.retain(|_, metrics| predicate(metrics));
179 }
180 }
181 }
182
183 fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
185 manager: M,
186 opts: &RedisConfigOptions,
187 ) -> Result<Pool<M, W>, BuildError> {
188 let result = Pool::builder(manager)
189 .max_size(opts.max_connections as usize)
190 .create_timeout(opts.create_timeout.map(Duration::from_secs))
191 .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
192 .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
193 .runtime(Runtime::Tokio1)
194 .build();
195
196 let idle_timeout = opts.idle_timeout;
197 let refresh_interval = opts.idle_timeout / 2;
198 if let Ok(pool) = result.clone() {
199 relay_system::spawn!(async move {
200 loop {
201 pool.retain(|_, metrics| {
202 metrics.last_used() < Duration::from_secs(idle_timeout)
203 });
204 tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
205 }
206 });
207 }
208
209 result
210 }
211}
212
213impl std::fmt::Debug for AsyncRedisClient {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 match self {
216 AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
217 AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
218 }
219 }
220}
221
222pub enum AsyncRedisConnection {
229 Cluster(pool::CustomClusterConnection),
231 Single(pool::CustomSingleConnection),
233}
234
235impl std::fmt::Debug for AsyncRedisConnection {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 let name = match self {
238 Self::Cluster(_) => "Cluster",
239 Self::Single(_) => "Single",
240 };
241 f.debug_tuple(name).finish()
242 }
243}
244
245impl redis::aio::ConnectionLike for AsyncRedisConnection {
246 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
247 match self {
248 Self::Cluster(conn) => conn.req_packed_command(cmd),
249 Self::Single(conn) => conn.req_packed_command(cmd),
250 }
251 }
252
253 fn req_packed_commands<'a>(
254 &'a mut self,
255 cmd: &'a Pipeline,
256 offset: usize,
257 count: usize,
258 ) -> RedisFuture<'a, Vec<Value>> {
259 match self {
260 Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
261 Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
262 }
263 }
264
265 fn get_db(&self) -> i64 {
266 match self {
267 Self::Cluster(conn) => conn.get_db(),
268 Self::Single(conn) => conn.get_db(),
269 }
270 }
271}