Skip to main content

relay_redis/
real.rs

1use deadpool::Runtime;
2use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
3use redis::{Cmd, Pipeline, RedisFuture, Value};
4use std::time::Duration;
5use thiserror::Error;
6
7use crate::config::RedisConfigOptions;
8use crate::pool;
9
10pub use redis;
11
12/// An error type that represents various failure modes when interacting with Redis.
13///
14/// This enum provides a unified error type for Redis-related operations, handling both
15/// configuration issues and runtime errors that may occur during Redis interactions.
16#[derive(Debug, Error)]
17pub enum RedisError {
18    /// An error that occurs during Redis configuration.
19    #[error("failed to configure redis")]
20    Configuration,
21
22    /// An error that occurs during communication with Redis.
23    #[error("failed to communicate with redis: {0}")]
24    Redis(
25        #[source]
26        #[from]
27        redis::RedisError,
28    ),
29
30    /// An error that occurs when interacting with the Redis connection pool.
31    #[error("failed to interact with the redis pool: {0}")]
32    Pool(#[source] PoolError<redis::RedisError>),
33
34    /// An error that occurs when creating a Redis connection pool.
35    #[error("failed to create redis pool: {0}")]
36    CreatePool(#[from] BuildError),
37}
38
39/// A collection of Redis clients used by Relay for different purposes.
40///
41/// This type manages separate Redis connection clients for different functionalities
42/// within the Relay system, such as project configurations and rate limiting.
43#[derive(Debug, Clone)]
44pub struct RedisClients {
45    /// The client used for project configurations
46    pub project_configs: AsyncRedisClient,
47    /// The client used for rate limiting/quotas.
48    pub quotas: AsyncRedisClient,
49}
50
51/// Statistics about the Redis client's connection client state.
52///
53/// Provides information about the current state of Redis connection clients,
54/// including the number of active and idle connections.
55#[derive(Debug)]
56pub struct RedisClientStats {
57    /// The number of connections currently being managed by the pool.
58    pub connections: u32,
59    /// The number of idle connections.
60    pub idle_connections: u32,
61    /// The maximum number of connections in the pool.
62    pub max_connections: u32,
63    /// The number of futures that are currently waiting to get a connection from the pool.
64    ///
65    /// This number increases when there are not enough connections in the pool.
66    pub waiting_for_connection: u32,
67}
68
69/// A connection client that can manage either a single Redis instance or a Redis cluster.
70///
71/// This enum provides a unified interface for Redis operations, supporting both
72/// single-instance and cluster configurations.
73#[derive(Clone)]
74pub enum AsyncRedisClient {
75    /// Contains a connection pool to a Redis cluster.
76    Cluster(pool::CustomClusterPool),
77    /// Contains a connection pool to a single Redis instance.
78    Single(pool::CustomSinglePool),
79}
80
81impl AsyncRedisClient {
82    /// Creates a new connection client for a Redis cluster.
83    ///
84    /// This method initializes a connection client that can communicate with multiple Redis nodes
85    /// in a cluster configuration. The client is configured with the specified servers and options.
86    ///
87    /// The client uses a custom cluster manager that implements a specific connection recycling
88    /// strategy, ensuring optimal performance and reliability in cluster environments.
89    pub fn cluster<'a>(
90        name: &'static str,
91        servers: impl IntoIterator<Item = &'a str>,
92        opts: &RedisConfigOptions,
93    ) -> Result<Self, RedisError> {
94        let servers = servers
95            .into_iter()
96            .map(|s| s.to_owned())
97            .collect::<Vec<_>>();
98
99        // We use our custom cluster manager which performs recycling in a different way from the
100        // default manager.
101        let manager = pool::CustomClusterManager::new(name, servers, false, opts.clone())
102            .map_err(RedisError::Redis)?;
103
104        let pool = Self::build_pool(manager, opts)?;
105
106        Ok(AsyncRedisClient::Cluster(pool))
107    }
108
109    /// Creates a new connection client for a single Redis instance.
110    ///
111    /// This method initializes a connection client that communicates with a single Redis server.
112    /// The client is configured with the specified server URL and options.
113    ///
114    /// The client uses a custom single manager that implements a specific connection recycling
115    /// strategy, ensuring optimal performance and reliability in single-instance environments.
116    pub fn single(
117        name: &'static str,
118        server: &str,
119        opts: &RedisConfigOptions,
120    ) -> Result<Self, RedisError> {
121        // We use our custom single manager which performs recycling in a different way from the
122        // default manager.
123        let manager = pool::CustomSingleManager::new(name, server, opts.clone())
124            .map_err(RedisError::Redis)?;
125
126        let pool = Self::build_pool(manager, opts)?;
127
128        Ok(AsyncRedisClient::Single(pool))
129    }
130
131    /// Acquires a connection from the pool.
132    ///
133    /// Returns a new [`AsyncRedisConnection`] that can be used to execute Redis commands.
134    /// The connection is automatically returned to the pool when dropped.
135    pub async fn get_connection(&self) -> Result<AsyncRedisConnection, RedisError> {
136        match self {
137            Self::Cluster(pool) => pool.get().await.map(AsyncRedisConnection::Cluster),
138            Self::Single(pool) => pool.get().await.map(AsyncRedisConnection::Single),
139        }
140        .map_err(RedisError::Pool)
141    }
142
143    /// Returns statistics about the current state of the connection pool.
144    ///
145    /// Provides information about the number of active and idle connections in the pool,
146    /// which can be useful for monitoring and debugging purposes.
147    pub fn stats(&self) -> RedisClientStats {
148        let status = match self {
149            Self::Cluster(pool) => pool.status(),
150            Self::Single(pool) => pool.status(),
151        };
152
153        RedisClientStats {
154            idle_connections: status.available as u32,
155            connections: status.size as u32,
156            max_connections: status.max_size as u32,
157            waiting_for_connection: status.waiting as u32,
158        }
159    }
160
161    /// Runs the `predicate` on the pool blocking it.
162    ///
163    /// If the `predicate` returns `false` the object will be removed from pool.
164    pub fn retain(&self, mut predicate: impl FnMut(Metrics) -> bool) {
165        match self {
166            Self::Cluster(pool) => {
167                pool.retain(|_, metrics| predicate(metrics));
168            }
169            Self::Single(pool) => {
170                pool.retain(|_, metrics| predicate(metrics));
171            }
172        }
173    }
174
175    /// Builds a [`Pool`] given a type implementing [`Manager`] and [`RedisConfigOptions`].
176    fn build_pool<M: Manager + 'static, W: From<Object<M>> + 'static>(
177        manager: M,
178        opts: &RedisConfigOptions,
179    ) -> Result<Pool<M, W>, BuildError> {
180        let result = Pool::builder(manager)
181            .max_size(opts.max_connections as usize)
182            .create_timeout(opts.create_timeout.map(Duration::from_secs))
183            .recycle_timeout(opts.recycle_timeout.map(Duration::from_secs))
184            .wait_timeout(opts.wait_timeout.map(Duration::from_secs))
185            .runtime(Runtime::Tokio1)
186            .build();
187
188        let idle_timeout = opts.idle_timeout;
189        let refresh_interval = opts.idle_timeout / 2;
190        if let Ok(pool) = result.clone() {
191            relay_system::spawn!(async move {
192                loop {
193                    pool.retain(|_, metrics| {
194                        metrics.last_used() < Duration::from_secs(idle_timeout)
195                    });
196                    tokio::time::sleep(Duration::from_secs(refresh_interval)).await;
197                }
198            });
199        }
200
201        result
202    }
203}
204
205impl std::fmt::Debug for AsyncRedisClient {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        match self {
208            AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
209            AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
210        }
211    }
212}
213
214/// A connection to either a single Redis instance or a Redis cluster.
215///
216/// This enum provides a unified interface for Redis operations, abstracting away the
217/// differences between single-instance and cluster connections. It implements the
218/// [`redis::aio::ConnectionLike`] trait, allowing it to be used with Redis commands
219/// regardless of the underlying connection type.
220pub enum AsyncRedisConnection {
221    /// A connection to a Redis cluster.
222    Cluster(pool::CustomClusterConnection),
223    /// A connection to a single Redis instance.
224    Single(pool::CustomSingleConnection),
225}
226
227impl std::fmt::Debug for AsyncRedisConnection {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        let name = match self {
230            Self::Cluster(_) => "Cluster",
231            Self::Single(_) => "Single",
232        };
233        f.debug_tuple(name).finish()
234    }
235}
236
237impl redis::aio::ConnectionLike for AsyncRedisConnection {
238    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
239        match self {
240            Self::Cluster(conn) => conn.req_packed_command(cmd),
241            Self::Single(conn) => conn.req_packed_command(cmd),
242        }
243    }
244
245    fn req_packed_commands<'a>(
246        &'a mut self,
247        cmd: &'a Pipeline,
248        offset: usize,
249        count: usize,
250    ) -> RedisFuture<'a, Vec<Value>> {
251        match self {
252            Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
253            Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
254        }
255    }
256
257    fn get_db(&self) -> i64 {
258        match self {
259            Self::Cluster(conn) => conn.get_db(),
260            Self::Single(conn) => conn.get_db(),
261        }
262    }
263}