relay_redis/
real.rs

1use deadpool::Runtime;
2use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12/// An error type that represents various failure modes when interacting with Redis.
13///
14/// This enum provides a unified error type for Redis-related operations, handling both
15/// configuration issues and runtime errors that may occur during Redis interactions.
16#[derive(Debug, Error)]
17pub enum RedisError {
18    /// An error that occurs during Redis configuration.
19    #[error("failed to configure redis")]
20    Configuration,
21
22    /// An error that occurs during communication with Redis.
23    #[error("failed to communicate with redis: {0}")]
24    Redis(
25        #[source]
26        #[from]
27        redis::RedisError,
28    ),
29
30    /// An error that occurs when interacting with the Redis connection pool.
31    #[error("failed to interact with the redis pool: {0}")]
32    Pool(#[source] PoolError<redis::RedisError>),
33
34    /// An error that occurs when creating a Redis connection pool.
35    #[error("failed to create redis pool: {0}")]
36    CreatePool(#[from] BuildError),
37}
38
39/// A collection of Redis clients used by Relay for different purposes.
40///
41/// This struct manages separate Redis connection clients for different functionalities
42/// within the Relay system, such as project configurations, cardinality limits,
43/// and rate limiting.
44#[derive(Debug, Clone)]
45pub struct RedisClients {
46    /// The client used for project configurations
47    pub project_configs: AsyncRedisClient,
48    /// The client used for cardinality limits.
49    pub cardinality: AsyncRedisClient,
50    /// The client used for rate limiting/quotas.
51    pub quotas: AsyncRedisClient,
52}
53
54/// Statistics about the Redis client's connection client state.
55///
56/// Provides information about the current state of Redis connection clients,
57/// including the number of active and idle connections.
58#[derive(Debug)]
59pub struct RedisClientStats {
60    /// The number of connections currently being managed by the pool.
61    pub connections: u32,
62    /// The number of idle connections.
63    pub idle_connections: u32,
64    /// The maximum number of connections in the pool.
65    pub max_connections: u32,
66    /// The number of futures that are currently waiting to get a connection from the pool.
67    ///
68    /// This number increases when there are not enough connections in the pool.
69    pub waiting_for_connection: u32,
70}
71
72/// A connection client that can manage either a single Redis instance or a Redis cluster.
73///
74/// This enum provides a unified interface for Redis operations, supporting both
75/// single-instance and cluster configurations.
76#[derive(Clone)]
77pub enum AsyncRedisClient {
78    /// Contains a connection pool to a Redis cluster.
79    Cluster(pool::CustomClusterPool),
80    /// Contains a connection pool to a single Redis instance.
81    Single(pool::CustomSinglePool),
82}
83
84impl AsyncRedisClient {
85    /// Creates a new connection client for a Redis cluster.
86    ///
87    /// This method initializes a connection client that can communicate with multiple Redis nodes
88    /// in a cluster configuration. The client is configured with the specified servers and options.
89    ///
90    /// The client uses a custom cluster manager that implements a specific connection recycling
91    /// strategy, ensuring optimal performance and reliability in cluster environments.
92    pub fn cluster<'a>(
93        name: &'static str,
94        servers: impl IntoIterator<Item = &'a str>,
95        opts: &RedisConfigOptions,
96    ) -> Result<Self, RedisError> {
97        let servers = servers
98            .into_iter()
99            .map(|s| s.to_owned())
100            .collect::<Vec<_>>();
101
102        // We use our custom cluster manager which performs recycling in a different way from the
103        // default manager.
104        let manager = pool::CustomClusterManager::new(name, servers, false, opts.clone())
105            .map_err(RedisError::Redis)?;
106
107        let pool = Self::build_pool(manager, opts)?;
108
109        Ok(AsyncRedisClient::Cluster(pool))
110    }
111
112    /// Creates a new connection client for a single Redis instance.
113    ///
114    /// This method initializes a connection client that communicates with a single Redis server.
115    /// The client is configured with the specified server URL and options.
116    ///
117    /// The client uses a custom single manager that implements a specific connection recycling
118    /// strategy, ensuring optimal performance and reliability in single-instance environments.
119    pub fn single(
120        name: &'static str,
121        server: &str,
122        opts: &RedisConfigOptions,
123    ) -> Result<Self, RedisError> {
124        // We use our custom single manager which performs recycling in a different way from the
125        // default manager.
126        let manager = pool::CustomSingleManager::new(name, server, opts.clone())
127            .map_err(RedisError::Redis)?;
128
129        let pool = Self::build_pool(manager, opts)?;
130
131        Ok(AsyncRedisClient::Single(pool))
132    }
133
134    /// Acquires a connection from the pool.
135    ///
136    /// Returns a new [`AsyncRedisConnection`] that can be used to execute Redis commands.
137    /// The connection is automatically returned to the pool when dropped.
138    pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
139        let connection = match self {
140            Self::Cluster(pool) => {
141                AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
142            }
143            Self::Single(pool) => {
144                AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
145            }
146        };
147
148        Ok(connection)
149    }
150
151    /// Returns statistics about the current state of the connection pool.
152    ///
153    /// Provides information about the number of active and idle connections in the pool,
154    /// which can be useful for monitoring and debugging purposes.
155    pub fn stats(&self) -> RedisClientStats {
156        let status = match self {
157            Self::Cluster(pool) => pool.status(),
158            Self::Single(pool) => pool.status(),
159        };
160
161        RedisClientStats {
162            idle_connections: status.available as u32,
163            connections: status.size as u32,
164            max_connections: status.max_size as u32,
165            waiting_for_connection: status.waiting as u32,
166        }
167    }
168
169    /// Runs the `predicate` on the pool blocking it.
170    ///
171    /// If the `predicate` returns `false` the object will be removed from pool.
172    pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
173        match self {
174            Self::Cluster(pool) => {
175                pool.retain(|_, metrics| predicate(metrics));
176            }
177            Self::Single(pool) => {
178                pool.retain(|_, metrics| predicate(metrics));
179            }
180        }
181    }
182
183    /// Builds a [`Pool`] given a type implementing [`Manager`] and [`RedisConfigOptions`].
184    fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
185        manager: M,
186        opts: &RedisConfigOptions,
187    ) -> Result<Pool<M, W>, BuildError> {
188        let result = Pool::builder(manager)
189            .max_size(opts.max_connections as usize)
190            .create_timeout(opts.create_timeout.map(Duration::from_secs))
191            .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
192            .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
193            .runtime(Runtime::Tokio1)
194            .build();
195
196        let idle_timeout = opts.idle_timeout;
197        let refresh_interval = opts.idle_timeout / 2;
198        if let Ok(pool) = result.clone() {
199            relay_system::spawn!(async move {
200                loop {
201                    pool.retain(|_, metrics| {
202                        metrics.last_used() < Duration::from_secs(idle_timeout)
203                    });
204                    tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
205                }
206            });
207        }
208
209        result
210    }
211}
212
213impl std::fmt::Debug for AsyncRedisClient {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        match self {
216            AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
217            AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
218        }
219    }
220}
221
222/// A connection to either a single Redis instance or a Redis cluster.
223///
224/// This enum provides a unified interface for Redis operations, abstracting away the
225/// differences between single-instance and cluster connections. It implements the
226/// [`redis::aio::ConnectionLike`] trait, allowing it to be used with Redis commands
227/// regardless of the underlying connection type.
228pub enum AsyncRedisConnection {
229    /// A connection to a Redis cluster.
230    Cluster(pool::CustomClusterConnection),
231    /// A connection to a single Redis instance.
232    Single(pool::CustomSingleConnection),
233}
234
235impl std::fmt::Debug for AsyncRedisConnection {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        let name = match self {
238            Self::Cluster(_) => "Cluster",
239            Self::Single(_) => "Single",
240        };
241        f.debug_tuple(name).finish()
242    }
243}
244
245impl redis::aio::ConnectionLike for AsyncRedisConnection {
246    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
247        match self {
248            Self::Cluster(conn) => conn.req_packed_command(cmd),
249            Self::Single(conn) => conn.req_packed_command(cmd),
250        }
251    }
252
253    fn req_packed_commands<'a>(
254        &'a mut self,
255        cmd: &'a Pipeline,
256        offset: usize,
257        count: usize,
258    ) -> RedisFuture<'a, Vec<Value>> {
259        match self {
260            Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
261            Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
262        }
263    }
264
265    fn get_db(&self) -> i64 {
266        match self {
267            Self::Cluster(conn) => conn.get_db(),
268            Self::Single(conn) => conn.get_db(),
269        }
270    }
271}