1use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
2use deadpool_redis::{ConfigError, Runtime};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12#[derive(Debug, Error)]
17pub enum RedisError {
18 #[error("failed to configure redis")]
20 Configuration,
21
22 #[error("failed to communicate with redis: {0}")]
24 Redis(#[source] redis::RedisError),
25
26 #[error("failed to interact with the redis pool: {0}")]
28 Pool(#[source] PoolError<redis::RedisError>),
29
30 #[error("failed to create redis pool: {0}")]
32 CreatePool(#[from] BuildError),
33
34 #[error("failed to configure redis: {0}")]
36 ConfigError(#[from] ConfigError),
37}
38
39#[derive(Debug, Clone)]
45pub struct RedisClients {
46 pub project_configs: AsyncRedisClient,
48 pub cardinality: AsyncRedisClient,
50 pub quotas: AsyncRedisClient,
52}
53
/// Snapshot of connection-pool counters, as returned by `AsyncRedisClient::stats`.
#[derive(Debug)]
pub struct RedisClientStats {
    /// Current size of the pool (the pool status `size`).
    pub connections: u32,
    /// Connections currently available in the pool (the pool status `available`).
    pub idle_connections: u32,
    /// Upper bound on the pool size (the pool status `max_size`).
    pub max_connections: u32,
    /// Number of waiters for a connection (the pool status `waiting`).
    pub waiting_for_connection: u32,
}
71
72#[derive(Clone)]
77pub enum AsyncRedisClient {
78 Cluster(pool::CustomClusterPool),
80 Single(pool::CustomSinglePool),
82}
83
84impl AsyncRedisClient {
85 pub fn cluster<'a>(
93 servers: impl IntoIterator<Item = &'a str>,
94 opts: &RedisConfigOptions,
95 ) -> Result<Self, RedisError> {
96 let servers = servers
97 .into_iter()
98 .map(|s| s.to_owned())
99 .collect::<Vec<_>>();
100
101 let manager = pool::CustomClusterManager::new(servers, false, opts.recycle_check_frequency)
104 .map_err(RedisError::Redis)?;
105
106 let pool = Self::build_pool(manager, opts)?;
107
108 Ok(AsyncRedisClient::Cluster(pool))
109 }
110
111 pub fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
119 let manager = pool::CustomSingleManager::new(server, opts.recycle_check_frequency)
122 .map_err(RedisError::Redis)?;
123
124 let pool = Self::build_pool(manager, opts)?;
125
126 Ok(AsyncRedisClient::Single(pool))
127 }
128
129 pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
134 let connection = match self {
135 Self::Cluster(pool) => {
136 AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
137 }
138 Self::Single(pool) => {
139 AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
140 }
141 };
142
143 Ok(connection)
144 }
145
146 pub fn stats(&self) -> RedisClientStats {
151 let status = match self {
152 Self::Cluster(pool) => pool.status(),
153 Self::Single(pool) => pool.status(),
154 };
155
156 RedisClientStats {
157 idle_connections: status.available as u32,
158 connections: status.size as u32,
159 max_connections: status.max_size as u32,
160 waiting_for_connection: status.waiting as u32,
161 }
162 }
163
164 pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
168 match self {
169 Self::Cluster(pool) => {
170 pool.retain(|_, metrics| predicate(metrics));
171 }
172 Self::Single(pool) => {
173 pool.retain(|_, metrics| predicate(metrics));
174 }
175 }
176 }
177
178 fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
180 manager: M,
181 opts: &RedisConfigOptions,
182 ) -> Result<Pool<M, W>, BuildError> {
183 let result = Pool::builder(manager)
184 .max_size(opts.max_connections as usize)
185 .create_timeout(opts.create_timeout.map(Duration::from_secs))
186 .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
187 .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
188 .runtime(Runtime::Tokio1)
189 .build();
190
191 let idle_timeout = opts.idle_timeout;
192 let refresh_interval = opts.idle_timeout / 2;
193 if let Ok(pool) = result.clone() {
194 relay_system::spawn!(async move {
195 loop {
196 pool.retain(|_, metrics| {
197 metrics.last_used() < Duration::from_secs(idle_timeout)
198 });
199 tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
200 }
201 });
202 }
203
204 result
205 }
206}
207
208impl std::fmt::Debug for AsyncRedisClient {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 match self {
211 AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
212 AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
213 }
214 }
215}
216
217pub enum AsyncRedisConnection {
224 Cluster(pool::CustomClusterConnection),
226 Single(pool::CustomSingleConnection),
228}
229
230impl std::fmt::Debug for AsyncRedisConnection {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 let name = match self {
233 Self::Cluster(_) => "Cluster",
234 Self::Single(_) => "Single",
235 };
236 f.debug_tuple(name).finish()
237 }
238}
239
240impl redis::aio::ConnectionLike for AsyncRedisConnection {
241 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
242 match self {
243 Self::Cluster(conn) => conn.req_packed_command(cmd),
244 Self::Single(conn) => conn.req_packed_command(cmd),
245 }
246 }
247
248 fn req_packed_commands<'a>(
249 &'a mut self,
250 cmd: &'a Pipeline,
251 offset: usize,
252 count: usize,
253 ) -> RedisFuture<'a, Vec<Value>> {
254 match self {
255 Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
256 Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
257 }
258 }
259
260 fn get_db(&self) -> i64 {
261 match self {
262 Self::Cluster(conn) => conn.get_db(),
263 Self::Single(conn) => conn.get_db(),
264 }
265 }
266}