relay_redis/
real.rs

1use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
2use deadpool_redis::{ConfigError, Runtime};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
/// An error type that represents various failure modes when interacting with Redis.
///
/// This enum provides a unified error type for Redis-related operations, handling both
/// configuration issues and runtime errors that may occur during Redis interactions.
///
/// The [`CreatePool`](Self::CreatePool) and [`ConfigError`](Self::ConfigError) variants are
/// marked `#[from]`, so their inner errors convert automatically with `?`. The
/// [`Redis`](Self::Redis) and [`Pool`](Self::Pool) variants only carry a `#[source]` and have
/// to be constructed explicitly, for example through `map_err`.
#[derive(Debug, Error)]
pub enum RedisError {
    /// An error that occurs during Redis configuration.
    ///
    /// Unlike [`ConfigError`](Self::ConfigError), this variant carries no underlying cause.
    #[error("failed to configure redis")]
    Configuration,

    /// An error that occurs during communication with Redis.
    ///
    /// Wraps the error reported by the `redis` crate.
    #[error("failed to communicate with redis: {0}")]
    Redis(#[source] redis::RedisError),

    /// An error that occurs when interacting with the Redis connection pool.
    ///
    /// Wraps the pool error, which is itself parameterized over the `redis` crate's error.
    #[error("failed to interact with the redis pool: {0}")]
    Pool(#[source] PoolError<redis::RedisError>),

    /// An error that occurs when creating a Redis connection pool.
    #[error("failed to create redis pool: {0}")]
    CreatePool(#[from] BuildError),

    /// An error that occurs when configuring Redis.
    ///
    /// Wraps the configuration error type of `deadpool_redis`.
    #[error("failed to configure redis: {0}")]
    ConfigError(#[from] ConfigError),
}
38
/// A collection of Redis clients used by Relay for different purposes.
///
/// This struct manages separate Redis connection clients for different functionalities
/// within the Relay system, such as project configurations, cardinality limits,
/// and rate limiting.
///
/// Each field is an independent [`AsyncRedisClient`], so every purpose can be backed by its
/// own single-instance or cluster pool.
#[derive(Debug, Clone)]
pub struct RedisClients {
    /// The client used for project configurations.
    pub project_configs: AsyncRedisClient,
    /// The client used for cardinality limits.
    pub cardinality: AsyncRedisClient,
    /// The client used for rate limiting/quotas.
    pub quotas: AsyncRedisClient,
}
53
/// Statistics about the state of a Redis client's connection pool.
///
/// Provides information about the current state of the pool behind an
/// [`AsyncRedisClient`], including the number of active and idle connections.
/// Values of this type are produced by [`AsyncRedisClient::stats`].
#[derive(Debug)]
pub struct RedisClientStats {
    /// The number of connections currently being managed by the pool.
    ///
    /// Taken from the pool status' `size`.
    pub connections: u32,
    /// The number of idle connections.
    ///
    /// Taken from the pool status' `available`.
    pub idle_connections: u32,
    /// The maximum number of connections in the pool.
    ///
    /// Taken from the pool status' `max_size`.
    pub max_connections: u32,
    /// The number of futures that are currently waiting to get a connection from the pool.
    ///
    /// This number increases when there are not enough connections in the pool.
    pub waiting_for_connection: u32,
}
71
/// A connection client that can manage either a single Redis instance or a Redis cluster.
///
/// This enum provides a unified interface for Redis operations, supporting both
/// single-instance and cluster configurations.
///
/// Each variant wraps a connection pool; cloning the client clones the wrapped pool value.
/// Use [`AsyncRedisClient::cluster`] or [`AsyncRedisClient::single`] to construct one.
#[derive(Clone)]
pub enum AsyncRedisClient {
    /// Contains a connection pool to a Redis cluster.
    Cluster(pool::CustomClusterPool),
    /// Contains a connection pool to a single Redis instance.
    Single(pool::CustomSinglePool),
}
83
84impl AsyncRedisClient {
85    /// Creates a new connection client for a Redis cluster.
86    ///
87    /// This method initializes a connection client that can communicate with multiple Redis nodes
88    /// in a cluster configuration. The client is configured with the specified servers and options.
89    ///
90    /// The client uses a custom cluster manager that implements a specific connection recycling
91    /// strategy, ensuring optimal performance and reliability in cluster environments.
92    pub fn cluster<'a>(
93        servers: impl IntoIterator<Item = &'a str>,
94        opts: &RedisConfigOptions,
95    ) -> Result<Self, RedisError> {
96        let servers = servers
97            .into_iter()
98            .map(|s| s.to_owned())
99            .collect::<Vec<_>>();
100
101        // We use our custom cluster manager which performs recycling in a different way from the
102        // default manager.
103        let manager = pool::CustomClusterManager::new(servers, false, opts.recycle_check_frequency)
104            .map_err(RedisError::Redis)?;
105
106        let pool = Self::build_pool(manager, opts)?;
107
108        Ok(AsyncRedisClient::Cluster(pool))
109    }
110
111    /// Creates a new connection client for a single Redis instance.
112    ///
113    /// This method initializes a connection client that communicates with a single Redis server.
114    /// The client is configured with the specified server URL and options.
115    ///
116    /// The client uses a custom single manager that implements a specific connection recycling
117    /// strategy, ensuring optimal performance and reliability in single-instance environments.
118    pub fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
119        // We use our custom single manager which performs recycling in a different way from the
120        // default manager.
121        let manager = pool::CustomSingleManager::new(server, opts.recycle_check_frequency)
122            .map_err(RedisError::Redis)?;
123
124        let pool = Self::build_pool(manager, opts)?;
125
126        Ok(AsyncRedisClient::Single(pool))
127    }
128
129    /// Acquires a connection from the pool.
130    ///
131    /// Returns a new [`AsyncRedisConnection`] that can be used to execute Redis commands.
132    /// The connection is automatically returned to the pool when dropped.
133    pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
134        let connection = match self {
135            Self::Cluster(pool) => {
136                AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
137            }
138            Self::Single(pool) => {
139                AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
140            }
141        };
142
143        Ok(connection)
144    }
145
146    /// Returns statistics about the current state of the connection pool.
147    ///
148    /// Provides information about the number of active and idle connections in the pool,
149    /// which can be useful for monitoring and debugging purposes.
150    pub fn stats(&self) -> RedisClientStats {
151        let status = match self {
152            Self::Cluster(pool) => pool.status(),
153            Self::Single(pool) => pool.status(),
154        };
155
156        RedisClientStats {
157            idle_connections: status.available as u32,
158            connections: status.size as u32,
159            max_connections: status.max_size as u32,
160            waiting_for_connection: status.waiting as u32,
161        }
162    }
163
164    /// Runs the `predicate` on the pool blocking it.
165    ///
166    /// If the `predicate` returns `false` the object will be removed from pool.
167    pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
168        match self {
169            Self::Cluster(pool) => {
170                pool.retain(|_, metrics| predicate(metrics));
171            }
172            Self::Single(pool) => {
173                pool.retain(|_, metrics| predicate(metrics));
174            }
175        }
176    }
177
178    /// Builds a [`Pool`] given a type implementing [`Manager`] and [`RedisConfigOptions`].
179    fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
180        manager: M,
181        opts: &RedisConfigOptions,
182    ) -> Result<Pool<M, W>, BuildError> {
183        let result = Pool::builder(manager)
184            .max_size(opts.max_connections as usize)
185            .create_timeout(opts.create_timeout.map(Duration::from_secs))
186            .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
187            .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
188            .runtime(Runtime::Tokio1)
189            .build();
190
191        let idle_timeout = opts.idle_timeout;
192        let refresh_interval = opts.idle_timeout / 2;
193        if let Ok(pool) = result.clone() {
194            relay_system::spawn!(async move {
195                loop {
196                    pool.retain(|_, metrics| {
197                        metrics.last_used() < Duration::from_secs(idle_timeout)
198                    });
199                    tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
200                }
201            });
202        }
203
204        result
205    }
206}
207
208impl std::fmt::Debug for AsyncRedisClient {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        match self {
211            AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
212            AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
213        }
214    }
215}
216
/// A connection to either a single Redis instance or a Redis cluster.
///
/// This enum provides a unified interface for Redis operations, abstracting away the
/// differences between single-instance and cluster connections. It implements the
/// [`redis::aio::ConnectionLike`] trait, allowing it to be used with Redis commands
/// regardless of the underlying connection type.
///
/// Values are obtained through [`AsyncRedisClient::get_connection`].
pub enum AsyncRedisConnection {
    /// A connection to a Redis cluster.
    Cluster(pool::CustomClusterConnection),
    /// A connection to a single Redis instance.
    Single(pool::CustomSingleConnection),
}
229
230impl std::fmt::Debug for AsyncRedisConnection {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        let name = match self {
233            Self::Cluster(_) => "Cluster",
234            Self::Single(_) => "Single",
235        };
236        f.debug_tuple(name).finish()
237    }
238}
239
240impl redis::aio::ConnectionLike for AsyncRedisConnection {
241    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
242        match self {
243            Self::Cluster(conn) => conn.req_packed_command(cmd),
244            Self::Single(conn) => conn.req_packed_command(cmd),
245        }
246    }
247
248    fn req_packed_commands<'a>(
249        &'a mut self,
250        cmd: &'a Pipeline,
251        offset: usize,
252        count: usize,
253    ) -> RedisFuture<'a, Vec<Value>> {
254        match self {
255            Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
256            Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
257        }
258    }
259
260    fn get_db(&self) -> i64 {
261        match self {
262            Self::Cluster(conn) => conn.get_db(),
263            Self::Single(conn) => conn.get_db(),
264        }
265    }
266}