relay_redis/
real.rs

1use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
2use deadpool_redis::{ConfigError, Runtime};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12/// An error type that represents various failure modes when interacting with Redis.
13///
14/// This enum provides a unified error type for Redis-related operations, handling both
15/// configuration issues and runtime errors that may occur during Redis interactions.
16#[derive(Debug, Error)]
17pub enum RedisError {
18    /// An error that occurs during Redis configuration.
19    #[error("failed to configure redis")]
20    Configuration,
21
22    /// An error that occurs during communication with Redis.
23    #[error("failed to communicate with redis: {0}")]
24    Redis(
25        #[source]
26        #[from]
27        redis::RedisError,
28    ),
29
30    /// An error that occurs when interacting with the Redis connection pool.
31    #[error("failed to interact with the redis pool: {0}")]
32    Pool(#[source] PoolError<redis::RedisError>),
33
34    /// An error that occurs when creating a Redis connection pool.
35    #[error("failed to create redis pool: {0}")]
36    CreatePool(#[from] BuildError),
37
38    /// An error that occurs when configuring Redis.
39    #[error("failed to configure redis: {0}")]
40    ConfigError(#[from] ConfigError),
41}
42
43/// A collection of Redis clients used by Relay for different purposes.
44///
45/// This struct manages separate Redis connection clients for different functionalities
46/// within the Relay system, such as project configurations, cardinality limits,
47/// and rate limiting.
48#[derive(Debug, Clone)]
49pub struct RedisClients {
50    /// The client used for project configurations
51    pub project_configs: AsyncRedisClient,
52    /// The client used for cardinality limits.
53    pub cardinality: AsyncRedisClient,
54    /// The client used for rate limiting/quotas.
55    pub quotas: AsyncRedisClient,
56}
57
58/// Statistics about the Redis client's connection client state.
59///
60/// Provides information about the current state of Redis connection clients,
61/// including the number of active and idle connections.
62#[derive(Debug)]
63pub struct RedisClientStats {
64    /// The number of connections currently being managed by the pool.
65    pub connections: u32,
66    /// The number of idle connections.
67    pub idle_connections: u32,
68    /// The maximum number of connections in the pool.
69    pub max_connections: u32,
70    /// The number of futures that are currently waiting to get a connection from the pool.
71    ///
72    /// This number increases when there are not enough connections in the pool.
73    pub waiting_for_connection: u32,
74}
75
76/// A connection client that can manage either a single Redis instance or a Redis cluster.
77///
78/// This enum provides a unified interface for Redis operations, supporting both
79/// single-instance and cluster configurations.
80#[derive(Clone)]
81pub enum AsyncRedisClient {
82    /// Contains a connection pool to a Redis cluster.
83    Cluster(pool::CustomClusterPool),
84    /// Contains a connection pool to a single Redis instance.
85    Single(pool::CustomSinglePool),
86}
87
88impl AsyncRedisClient {
89    /// Creates a new connection client for a Redis cluster.
90    ///
91    /// This method initializes a connection client that can communicate with multiple Redis nodes
92    /// in a cluster configuration. The client is configured with the specified servers and options.
93    ///
94    /// The client uses a custom cluster manager that implements a specific connection recycling
95    /// strategy, ensuring optimal performance and reliability in cluster environments.
96    pub fn cluster<'a>(
97        servers: impl IntoIterator<Item = &'a str>,
98        opts: &RedisConfigOptions,
99    ) -> Result<Self, RedisError> {
100        let servers = servers
101            .into_iter()
102            .map(|s| s.to_owned())
103            .collect::<Vec<_>>();
104
105        // We use our custom cluster manager which performs recycling in a different way from the
106        // default manager.
107        let manager = pool::CustomClusterManager::new(servers, false, opts.recycle_check_frequency)
108            .map_err(RedisError::Redis)?;
109
110        let pool = Self::build_pool(manager, opts)?;
111
112        Ok(AsyncRedisClient::Cluster(pool))
113    }
114
115    /// Creates a new connection client for a single Redis instance.
116    ///
117    /// This method initializes a connection client that communicates with a single Redis server.
118    /// The client is configured with the specified server URL and options.
119    ///
120    /// The client uses a custom single manager that implements a specific connection recycling
121    /// strategy, ensuring optimal performance and reliability in single-instance environments.
122    pub fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
123        // We use our custom single manager which performs recycling in a different way from the
124        // default manager.
125        let manager = pool::CustomSingleManager::new(server, opts.recycle_check_frequency)
126            .map_err(RedisError::Redis)?;
127
128        let pool = Self::build_pool(manager, opts)?;
129
130        Ok(AsyncRedisClient::Single(pool))
131    }
132
133    /// Acquires a connection from the pool.
134    ///
135    /// Returns a new [`AsyncRedisConnection`] that can be used to execute Redis commands.
136    /// The connection is automatically returned to the pool when dropped.
137    pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
138        let connection = match self {
139            Self::Cluster(pool) => {
140                AsyncRedisConnection::Cluster(pool.get().await.map_err(RedisError::Pool)?)
141            }
142            Self::Single(pool) => {
143                AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
144            }
145        };
146
147        Ok(connection)
148    }
149
150    /// Returns statistics about the current state of the connection pool.
151    ///
152    /// Provides information about the number of active and idle connections in the pool,
153    /// which can be useful for monitoring and debugging purposes.
154    pub fn stats(&self) -> RedisClientStats {
155        let status = match self {
156            Self::Cluster(pool) => pool.status(),
157            Self::Single(pool) => pool.status(),
158        };
159
160        RedisClientStats {
161            idle_connections: status.available as u32,
162            connections: status.size as u32,
163            max_connections: status.max_size as u32,
164            waiting_for_connection: status.waiting as u32,
165        }
166    }
167
168    /// Runs the `predicate` on the pool blocking it.
169    ///
170    /// If the `predicate` returns `false` the object will be removed from pool.
171    pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
172        match self {
173            Self::Cluster(pool) => {
174                pool.retain(|_, metrics| predicate(metrics));
175            }
176            Self::Single(pool) => {
177                pool.retain(|_, metrics| predicate(metrics));
178            }
179        }
180    }
181
182    /// Builds a [`Pool`] given a type implementing [`Manager`] and [`RedisConfigOptions`].
183    fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
184        manager: M,
185        opts: &RedisConfigOptions,
186    ) -> Result<Pool<M, W>, BuildError> {
187        let result = Pool::builder(manager)
188            .max_size(opts.max_connections as usize)
189            .create_timeout(opts.create_timeout.map(Duration::from_secs))
190            .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
191            .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
192            .runtime(Runtime::Tokio1)
193            .build();
194
195        let idle_timeout = opts.idle_timeout;
196        let refresh_interval = opts.idle_timeout / 2;
197        if let Ok(pool) = result.clone() {
198            relay_system::spawn!(async move {
199                loop {
200                    pool.retain(|_, metrics| {
201                        metrics.last_used() < Duration::from_secs(idle_timeout)
202                    });
203                    tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
204                }
205            });
206        }
207
208        result
209    }
210}
211
212impl std::fmt::Debug for AsyncRedisClient {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        match self {
215            AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
216            AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
217        }
218    }
219}
220
221/// A connection to either a single Redis instance or a Redis cluster.
222///
223/// This enum provides a unified interface for Redis operations, abstracting away the
224/// differences between single-instance and cluster connections. It implements the
225/// [`redis::aio::ConnectionLike`] trait, allowing it to be used with Redis commands
226/// regardless of the underlying connection type.
227pub enum AsyncRedisConnection {
228    /// A connection to a Redis cluster.
229    Cluster(pool::CustomClusterConnection),
230    /// A connection to a single Redis instance.
231    Single(pool::CustomSingleConnection),
232}
233
234impl std::fmt::Debug for AsyncRedisConnection {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        let name = match self {
237            Self::Cluster(_) => "Cluster",
238            Self::Single(_) => "Single",
239        };
240        f.debug_tuple(name).finish()
241    }
242}
243
244impl redis::aio::ConnectionLike for AsyncRedisConnection {
245    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
246        match self {
247            Self::Cluster(conn) => conn.req_packed_command(cmd),
248            Self::Single(conn) => conn.req_packed_command(cmd),
249        }
250    }
251
252    fn req_packed_commands<'a>(
253        &'a mut self,
254        cmd: &'a Pipeline,
255        offset: usize,
256        count: usize,
257    ) -> RedisFuture<'a, Vec<Value>> {
258        match self {
259            Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
260            Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
261        }
262    }
263
264    fn get_db(&self) -> i64 {
265        match self {
266            Self::Cluster(conn) => conn.get_db(),
267            Self::Single(conn) => conn.get_db(),
268        }
269    }
270}