relay_server/services/projects/source/
redis.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_redis::{AsyncRedisConnection, RedisError};
use relay_statsd::metric;
use std::fmt::Debug;
use std::sync::Arc;

use crate::services::projects::project::{ParsedProjectState, ProjectState, Revision};
use crate::services::projects::source::SourceProjectState;
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use relay_redis::redis::cmd;

/// Project config source that reads project states from Redis.
///
/// Bundles the Relay configuration (used to derive the Redis key prefix for project configs)
/// with the asynchronous Redis connection the lookups are issued on.
#[derive(Clone, Debug)]
pub struct RedisProjectSource {
    /// Relay configuration; supplies the project config cache key prefix.
    config: Arc<Config>,
    /// Asynchronous Redis connection used for all queries of this source.
    redis: AsyncRedisConnection,
}

/// Errors that can occur while fetching a project config from Redis.
#[derive(Debug, thiserror::Error)]
pub enum RedisProjectError {
    /// The payload stored in Redis could not be deserialized as a JSON project config.
    #[error("failed to parse projectconfig from redis")]
    Parsing(#[from] serde_json::Error),

    /// The Redis query itself failed.
    #[error("failed to talk to redis")]
    Redis(#[from] RedisError),
}

/// Parses a raw Redis payload into a [`ParsedProjectState`].
///
/// The payload is first treated as zstd-compressed data. If decompression succeeds, the
/// compressed and decompressed sizes are reported as histograms and the decompressed bytes are
/// parsed as JSON. If decompression fails, the payload is assumed to be uncompressed JSON and
/// is parsed as-is. The decompression attempt is timed in either case.
///
/// # Errors
///
/// Returns [`RedisProjectError::Parsing`] if the (possibly decompressed) payload is not a valid
/// JSON project state.
fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, RedisProjectError> {
    let decompressed = metric!(timer(RelayTimers::ProjectStateDecompression), {
        zstd::decode_all(raw_response)
    });

    let payload = if let Ok(bytes) = decompressed.as_deref() {
        // Sizes are only meaningful when the payload actually was compressed.
        metric!(
            histogram(RelayHistograms::ProjectStateSizeBytesCompressed) =
                raw_response.len() as f64
        );
        metric!(
            histogram(RelayHistograms::ProjectStateSizeBytesDecompressed) = bytes.len() as f64
        );
        bytes
    } else {
        // Not a valid zstd frame: fall back to interpreting the raw bytes as plain JSON.
        raw_response
    };

    let state = serde_json::from_slice(payload)?;
    Ok(state)
}

impl RedisProjectSource {
    /// Creates a new project source from the Relay configuration and a Redis connection.
    pub fn new(config: Arc<Config>, redis: AsyncRedisConnection) -> Self {
        Self { config, redis }
    }

    /// Fetches a project config from Redis, unless it still matches the passed `revision`.
    ///
    /// Returns [`SourceProjectState::NotModified`] if the revision stored in Redis, or the
    /// revision of the loaded project config, equals `revision`. Otherwise the freshly loaded
    /// state is returned as [`SourceProjectState::New`].
    ///
    /// The returned project state is [`ProjectState::Pending`] if the requested project config
    /// is not stored in Redis.
    ///
    /// # Errors
    ///
    /// Returns [`RedisProjectError::Redis`] if a Redis query fails and
    /// [`RedisProjectError::Parsing`] if the stored project config cannot be parsed.
    pub async fn get_config_if_changed(
        &self,
        key: ProjectKey,
        revision: Revision,
    ) -> Result<SourceProjectState, RedisProjectError> {
        // The cheap revision lookup only makes sense if the caller told us what it already has.
        if let Some(revision) = revision.as_str() {
            let mut rev_cmd = cmd("GET");
            rev_cmd.arg(self.get_redis_rev_key(key));
            let current_revision: Option<String> = self.redis.query_async(rev_cmd).await?;

            relay_log::trace!(
                "Redis revision {current_revision:?}, requested revision {revision:?}"
            );
            if matches!(current_revision.as_deref(), Some(current) if current == revision) {
                metric!(
                    counter(RelayCounters::ProjectStateRedis) += 1,
                    hit = "revision",
                );
                return Ok(SourceProjectState::NotModified);
            }
        }

        let mut config_cmd = cmd("GET");
        config_cmd.arg(self.get_redis_project_config_key(key));
        let stored: Option<Vec<u8>> = self.redis.query_async(config_cmd).await?;

        let payload = match stored {
            Some(bytes) => bytes,
            None => {
                // Nothing stored for this key: report a cache miss and a pending state.
                metric!(
                    counter(RelayCounters::ProjectStateRedis) += 1,
                    hit = "false"
                );
                return Ok(SourceProjectState::New(ProjectState::Pending));
            }
        };

        let state = ProjectState::from(parse_redis_response(&payload)?);

        // If we were passed a revision, check whether we just loaded that very revision.
        //
        // Keeping the old revision alive is preferable whenever possible, because the already
        // loaded version has already initialized its caches.
        //
        // While this case is theoretically possible, it should always have been handled by the
        // revision check above, which uses the additional Redis key.
        if state.revision() == revision {
            metric!(
                counter(RelayCounters::ProjectStateRedis) += 1,
                hit = "project_config_revision"
            );
            return Ok(SourceProjectState::NotModified);
        }

        metric!(
            counter(RelayCounters::ProjectStateRedis) += 1,
            hit = "project_config"
        );
        Ok(SourceProjectState::New(state))
    }

    /// Returns the Redis key under which the project config for `key` is stored.
    fn get_redis_project_config_key(&self, key: ProjectKey) -> String {
        format!("{}:{}", self.config.projectconfig_cache_prefix(), key)
    }

    /// Returns the Redis key under which the revision of the project config for `key` is stored.
    fn get_redis_rev_key(&self, key: ProjectKey) -> String {
        format!("{}:{}.rev", self.config.projectconfig_cache_prefix(), key)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A plain JSON payload is accepted without compression.
    #[test]
    fn test_parse_redis_response() {
        let parsed = parse_redis_response(b"{}");
        assert!(parsed.is_ok());
    }

    /// A zstd-compressed JSON payload is accepted as well.
    #[test]
    fn test_parse_redis_response_compressed() {
        // Compressed `{}`, as dumped by the Python zstandard library.
        let compressed = b"(\xb5/\xfd \x02\x11\x00\x00{}";
        let parsed = parse_redis_response(compressed);
        assert!(parsed.is_ok(), "{parsed:?}");
    }
}