relay_server/services/projects/source/
redis.rs

1use relay_base_schema::project::ProjectKey;
2use relay_config::Config;
3use relay_redis::{AsyncRedisClient, RedisError};
4use relay_statsd::metric;
5use std::fmt::Debug;
6use std::sync::Arc;
7
8use crate::services::projects::project::{ParsedProjectState, ProjectState, Revision};
9use crate::services::projects::source::SourceProjectState;
10use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
11use relay_redis::redis::cmd;
12
13#[derive(Clone, Debug)]
14pub struct RedisProjectSource {
15    config: Arc<Config>,
16    redis: AsyncRedisClient,
17}
18
19#[derive(Debug, thiserror::Error)]
20pub enum RedisProjectError {
21    #[error("failed to parse projectconfig from redis")]
22    Parsing(#[from] serde_json::Error),
23
24    #[error("failed to talk to redis")]
25    Redis(#[from] RedisError),
26}
27
28fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, RedisProjectError> {
29    let decompression_result = metric!(timer(RelayTimers::ProjectStateDecompression), {
30        zstd::decode_all(raw_response)
31    });
32
33    let decoded_response = match &decompression_result {
34        Ok(decoded) => {
35            metric!(
36                histogram(RelayHistograms::ProjectStateSizeBytesCompressed) =
37                    raw_response.len() as f64
38            );
39            metric!(
40                histogram(RelayHistograms::ProjectStateSizeBytesDecompressed) =
41                    decoded.len() as f64
42            );
43            decoded.as_slice()
44        }
45        // If decoding fails, assume uncompressed payload and try again
46        Err(_) => raw_response,
47    };
48
49    Ok(serde_json::from_slice(decoded_response)?)
50}
51
52impl RedisProjectSource {
53    pub fn new(config: Arc<Config>, redis: AsyncRedisClient) -> Self {
54        RedisProjectSource { config, redis }
55    }
56
57    /// Fetches a project config from Redis.
58    ///
59    /// The returned project state is [`ProjectState::Pending`] if the requested project config is not
60    /// stored in Redis.
61    pub async fn get_config_if_changed(
62        &self,
63        key: ProjectKey,
64        revision: Revision,
65    ) -> Result<SourceProjectState, RedisProjectError> {
66        let mut connection = self.redis.get_connection();
67        // Only check for the revision if we were passed a revision.
68        if let Some(revision) = revision.as_str() {
69            let current_revision: Option<String> = cmd("GET")
70                .arg(self.get_redis_rev_key(key))
71                .query_async(&mut connection)
72                .await
73                .map_err(RedisError::Redis)?;
74
75            relay_log::trace!(
76                "Redis revision {current_revision:?}, requested revision {revision:?}"
77            );
78            if current_revision.as_deref() == Some(revision) {
79                metric!(
80                    counter(RelayCounters::ProjectStateRedis) += 1,
81                    hit = "revision",
82                );
83                return Ok(SourceProjectState::NotModified);
84            }
85        }
86
87        let raw_response_opt: Option<Vec<u8>> = cmd("GET")
88            .arg(self.get_redis_project_config_key(key))
89            .query_async(&mut connection)
90            .await
91            .map_err(RedisError::Redis)?;
92
93        let Some(response) = raw_response_opt else {
94            metric!(
95                counter(RelayCounters::ProjectStateRedis) += 1,
96                hit = "false"
97            );
98            return Ok(SourceProjectState::New(ProjectState::Pending));
99        };
100
101        let response = ProjectState::from(parse_redis_response(response.as_slice())?);
102
103        // If we were passed a revision, check if we just loaded the same revision from Redis.
104        //
105        // We always want to keep the old revision alive if possible, since the already loaded
106        // version has already initialized caches.
107        //
108        // While this is theoretically possible this should always been handled using the above revision
109        // check using the additional Redis key.
110        if response.revision() == revision {
111            metric!(
112                counter(RelayCounters::ProjectStateRedis) += 1,
113                hit = "project_config_revision"
114            );
115            Ok(SourceProjectState::NotModified)
116        } else {
117            metric!(
118                counter(RelayCounters::ProjectStateRedis) += 1,
119                hit = "project_config"
120            );
121            Ok(SourceProjectState::New(response))
122        }
123    }
124
125    fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
126        let prefix = self.config.projectconfig_cache_prefix();
127        format!("{prefix}:{key}")
128    }
129
130    fn get_redis_rev_key(&self, key: ProjectKey) -> String {
131        let prefix = self.config.projectconfig_cache_prefix();
132        format!("{prefix}:{key}.rev")
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[test]
141    fn test_parse_redis_response() {
142        let raw_response = b"{}";
143        let result = parse_redis_response(raw_response);
144        assert!(result.is_ok());
145    }
146
147    #[test]
148    fn test_parse_redis_response_compressed() {
149        let raw_response = b"(\xb5/\xfd \x02\x11\x00\x00{}"; // As dumped by python zstandard library
150        let result = parse_redis_response(raw_response);
151        assert!(result.is_ok(), "{result:?}");
152    }
153}