1use deadpool::Runtime;
2use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12#[derive(Debug, Error)]
17pub enum RedisError {
18 #[error("failed to configure redis")]
20 Configuration,
21
22 #[error("failed to communicate with redis: {0}")]
24 Redis(
25 #[source]
26 #[from]
27 redis::RedisError,
28 ),
29
30 #[error("failed to interact with the redis pool: {0}")]
32 Pool(#[source] PoolError<redis::RedisError>),
33
34 #[error("failed to create redis pool: {0}")]
36 CreatePool(#[from] BuildError),
37}
38
39#[derive(Debug, Clone)]
44pub struct RedisClients {
45 pub project_configs: AsyncRedisClient,
47 pub quotas: AsyncRedisClient,
49}
50
/// A snapshot of connection pool counters, as reported by [`AsyncRedisClient::stats`].
#[derive(Debug)]
pub struct RedisClientStats {
    /// Current size of the pool (taken from the pool status' `size`).
    pub connections: u32,
    /// Number of connections currently available in the pool (the status' `available`).
    pub idle_connections: u32,
    /// Maximum size of the pool (the status' `max_size`).
    pub max_connections: u32,
    /// Number of callers currently waiting for a connection (the status' `waiting`).
    pub waiting_for_connection: u32,
}
68
69#[derive(Clone)]
74pub enum AsyncRedisClient {
75 Cluster(pool::CustomClusterPool),
77 Single(pool::CustomSinglePool),
79}
80
81impl AsyncRedisClient {
82 pub fn cluster<'a>(
90 name: &'static str,
91 servers: impl IntoIterator<Item = &'a str>,
92 opts: &RedisConfigOptions,
93 ) -> Result<Self, RedisError> {
94 let servers = servers
95 .into_iter()
96 .map(|s| s.to_owned())
97 .collect::<Vec<_>>();
98
99 let manager = pool::CustomClusterManager::new(name, servers, false, opts.clone())
102 .map_err(RedisError::Redis)?;
103
104 let pool = Self::build_pool(manager, opts)?;
105
106 Ok(AsyncRedisClient::Cluster(pool))
107 }
108
109 pub fn single(
117 name: &'static str,
118 server: &str,
119 opts: &RedisConfigOptions,
120 ) -> Result<Self, RedisError> {
121 let manager = pool::CustomSingleManager::new(name, server, opts.clone())
124 .map_err(RedisError::Redis)?;
125
126 let pool = Self::build_pool(manager, opts)?;
127
128 Ok(AsyncRedisClient::Single(pool))
129 }
130
131 pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
136 match self {
137 Self::Cluster(pool) => pool.get().await.map(AsyncRedisConnection::Cluster),
138 Self::Single(pool) => pool.get().await.map(AsyncRedisConnection::Single),
139 }
140 .map_err(RedisError::Pool)
141 }
142
143 pub fn stats(&self) -> RedisClientStats {
148 let status = match self {
149 Self::Cluster(pool) => pool.status(),
150 Self::Single(pool) => pool.status(),
151 };
152
153 RedisClientStats {
154 idle_connections: status.available as u32,
155 connections: status.size as u32,
156 max_connections: status.max_size as u32,
157 waiting_for_connection: status.waiting as u32,
158 }
159 }
160
161 pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
165 match self {
166 Self::Cluster(pool) => {
167 pool.retain(|_, metrics| predicate(metrics));
168 }
169 Self::Single(pool) => {
170 pool.retain(|_, metrics| predicate(metrics));
171 }
172 }
173 }
174
175 fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
177 manager: M,
178 opts: &RedisConfigOptions,
179 ) -> Result<Pool<M, W>, BuildError> {
180 let result = Pool::builder(manager)
181 .max_size(opts.max_connections as usize)
182 .create_timeout(opts.create_timeout.map(Duration::from_secs))
183 .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
184 .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
185 .runtime(Runtime::Tokio1)
186 .build();
187
188 let idle_timeout = opts.idle_timeout;
189 let refresh_interval = opts.idle_timeout / 2;
190 if let Ok(pool) = result.clone() {
191 relay_system::spawn!(async move {
192 loop {
193 pool.retain(|_, metrics| {
194 metrics.last_used() < Duration::from_secs(idle_timeout)
195 });
196 tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
197 }
198 });
199 }
200
201 result
202 }
203}
204
205impl std::fmt::Debug for AsyncRedisClient {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 match self {
208 AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
209 AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
210 }
211 }
212}
213
214pub enum AsyncRedisConnection {
221 Cluster(pool::CustomClusterConnection),
223 Single(pool::CustomSingleConnection),
225}
226
227impl std::fmt::Debug for AsyncRedisConnection {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 let name = match self {
230 Self::Cluster(_) => "Cluster",
231 Self::Single(_) => "Single",
232 };
233 f.debug_tuple(name).finish()
234 }
235}
236
237impl redis::aio::ConnectionLike for AsyncRedisConnection {
238 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
239 match self {
240 Self::Cluster(conn) => conn.req_packed_command(cmd),
241 Self::Single(conn) => conn.req_packed_command(cmd),
242 }
243 }
244
245 fn req_packed_commands<'a>(
246 &'a mut self,
247 cmd: &'a Pipeline,
248 offset: usize,
249 count: usize,
250 ) -> RedisFuture<'a, Vec<Value>> {
251 match self {
252 Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
253 Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
254 }
255 }
256
257 fn get_db(&self) -> i64 {
258 match self {
259 Self::Cluster(conn) => conn.get_db(),
260 Self::Single(conn) => conn.get_db(),
261 }
262 }
263}