relay_server/services/projects/source/
redis.rs1use relay_base_schema::project::ProjectKey;
2use relay_config::Config;
3use relay_redis::{AsyncRedisClient, RedisError};
4use relay_statsd::metric;
5use std::fmt::Debug;
6use std::sync::Arc;
7
8use crate::services::projects::project::{ParsedProjectState, ProjectState, Revision};
9use crate::services::projects::source::SourceProjectState;
10use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
11use relay_redis::redis::cmd;
12
13#[derive(Clone, Debug)]
14pub struct RedisProjectSource {
15 config: Arc<Config>,
16 redis: AsyncRedisClient,
17}
18
19#[derive(Debug, thiserror::Error)]
20pub enum RedisProjectError {
21 #[error("failed to parse projectconfig from redis")]
22 Parsing(#[from] serde_json::Error),
23
24 #[error("failed to talk to redis")]
25 Redis(#[from] RedisError),
26}
27
28fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, RedisProjectError> {
29 let decompression_result = metric!(timer(RelayTimers::ProjectStateDecompression), {
30 zstd::decode_all(raw_response)
31 });
32
33 let decoded_response = match &decompression_result {
34 Ok(decoded) => {
35 metric!(
36 histogram(RelayHistograms::ProjectStateSizeBytesCompressed) =
37 raw_response.len() as f64
38 );
39 metric!(
40 histogram(RelayHistograms::ProjectStateSizeBytesDecompressed) =
41 decoded.len() as f64
42 );
43 decoded.as_slice()
44 }
45 Err(_) => raw_response,
47 };
48
49 Ok(serde_json::from_slice(decoded_response)?)
50}
51
52impl RedisProjectSource {
53 pub fn new(config: Arc<Config>, redis: AsyncRedisClient) -> Self {
54 RedisProjectSource { config, redis }
55 }
56
57 pub async fn get_config_if_changed(
62 &self,
63 key: ProjectKey,
64 revision: Revision,
65 ) -> Result<SourceProjectState, RedisProjectError> {
66 let mut connection = self.redis.get_connection();
67 if let Some(revision) = revision.as_str() {
69 let current_revision: Option<String> = cmd("GET")
70 .arg(self.get_redis_rev_key(key))
71 .query_async(&mut connection)
72 .await
73 .map_err(RedisError::Redis)?;
74
75 relay_log::trace!(
76 "Redis revision {current_revision:?}, requested revision {revision:?}"
77 );
78 if current_revision.as_deref() == Some(revision) {
79 metric!(
80 counter(RelayCounters::ProjectStateRedis) += 1,
81 hit = "revision",
82 );
83 return Ok(SourceProjectState::NotModified);
84 }
85 }
86
87 let raw_response_opt: Option<Vec<u8>> = cmd("GET")
88 .arg(self.get_redis_project_config_key(key))
89 .query_async(&mut connection)
90 .await
91 .map_err(RedisError::Redis)?;
92
93 let Some(response) = raw_response_opt else {
94 metric!(
95 counter(RelayCounters::ProjectStateRedis) += 1,
96 hit = "false"
97 );
98 return Ok(SourceProjectState::New(ProjectState::Pending));
99 };
100
101 let response = ProjectState::from(parse_redis_response(response.as_slice())?);
102
103 if response.revision() == revision {
111 metric!(
112 counter(RelayCounters::ProjectStateRedis) += 1,
113 hit = "project_config_revision"
114 );
115 Ok(SourceProjectState::NotModified)
116 } else {
117 metric!(
118 counter(RelayCounters::ProjectStateRedis) += 1,
119 hit = "project_config"
120 );
121 Ok(SourceProjectState::New(response))
122 }
123 }
124
125 fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
126 let prefix = self.config.projectconfig_cache_prefix();
127 format!("{prefix}:{key}")
128 }
129
130 fn get_redis_rev_key(&self, key: ProjectKey) -> String {
131 let prefix = self.config.projectconfig_cache_prefix();
132 format!("{prefix}:{key}.rev")
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// An uncompressed JSON payload is parsed directly.
    #[test]
    fn test_parse_redis_response() {
        assert!(parse_redis_response(b"{}").is_ok());
    }

    /// A zstd frame wrapping `{}` is decompressed before parsing.
    #[test]
    fn test_parse_redis_response_compressed() {
        let parsed = parse_redis_response(b"(\xb5/\xfd \x02\x11\x00\x00{}");
        assert!(parsed.is_ok(), "{parsed:?}");
    }
}