relay_server/services/projects/source/
redis.rsuse relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_redis::{AsyncRedisConnection, RedisError};
use relay_statsd::metric;
use std::fmt::Debug;
use std::sync::Arc;
use crate::services::projects::project::{ParsedProjectState, ProjectState, Revision};
use crate::services::projects::source::SourceProjectState;
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use relay_redis::redis::cmd;
#[derive(Clone, Debug)]
pub struct RedisProjectSource {
config: Arc<Config>,
redis: AsyncRedisConnection,
}
#[derive(Debug, thiserror::Error)]
pub enum RedisProjectError {
#[error("failed to parse projectconfig from redis")]
Parsing(#[from] serde_json::Error),
#[error("failed to talk to redis")]
Redis(#[from] RedisError),
}
fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, RedisProjectError> {
let decompression_result = metric!(timer(RelayTimers::ProjectStateDecompression), {
zstd::decode_all(raw_response)
});
let decoded_response = match &decompression_result {
Ok(decoded) => {
metric!(
histogram(RelayHistograms::ProjectStateSizeBytesCompressed) =
raw_response.len() as f64
);
metric!(
histogram(RelayHistograms::ProjectStateSizeBytesDecompressed) =
decoded.len() as f64
);
decoded.as_slice()
}
Err(_) => raw_response,
};
Ok(serde_json::from_slice(decoded_response)?)
}
impl RedisProjectSource {
pub fn new(config: Arc<Config>, redis: AsyncRedisConnection) -> Self {
RedisProjectSource { config, redis }
}
pub async fn get_config_if_changed(
&self,
key: ProjectKey,
revision: Revision,
) -> Result<SourceProjectState, RedisProjectError> {
if let Some(revision) = revision.as_str() {
let mut cmd = cmd("GET");
cmd.arg(self.get_redis_rev_key(key));
let current_revision: Option<String> = self.redis.query_async(cmd).await?;
relay_log::trace!(
"Redis revision {current_revision:?}, requested revision {revision:?}"
);
if current_revision.as_deref() == Some(revision) {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "revision",
);
return Ok(SourceProjectState::NotModified);
}
}
let mut raw_cmd = cmd("GET");
raw_cmd.arg(self.get_redis_project_config_key(key));
let raw_response_opt: Option<Vec<u8>> = self.redis.query_async(raw_cmd).await?;
let Some(response) = raw_response_opt else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "false"
);
return Ok(SourceProjectState::New(ProjectState::Pending));
};
let response = ProjectState::from(parse_redis_response(response.as_slice())?);
if response.revision() == revision {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config_revision"
);
Ok(SourceProjectState::NotModified)
} else {
metric!(
counter(RelayCounters::ProjectStateRedis) += 1,
hit = "project_config"
);
Ok(SourceProjectState::New(response))
}
}
fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}")
}
fn get_redis_rev_key(&self, key: ProjectKey) -> String {
let prefix = self.config.projectconfig_cache_prefix();
format!("{prefix}:{key}.rev")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_redis_response() {
let raw_response = b"{}";
let result = parse_redis_response(raw_response);
assert!(result.is_ok());
}
#[test]
fn test_parse_redis_response_compressed() {
let raw_response = b"(\xb5/\xfd \x02\x11\x00\x00{}"; let result = parse_redis_response(raw_response);
assert!(result.is_ok(), "{result:?}");
}
}