relay_server/services/projects/source/
mod.rs

1use relay_base_schema::project::ProjectKey;
2use relay_config::{Config, RelayMode};
3#[cfg(feature = "processing")]
4use relay_redis::RedisClients;
5use relay_system::{Addr, ServiceSpawn, ServiceSpawnExt as _};
6use std::convert::Infallible;
7use std::sync::Arc;
8
9#[cfg(feature = "processing")]
10pub mod redis;
11pub mod upstream;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::upstream::UpstreamRelay;
15
16#[cfg(feature = "processing")]
17use self::redis::RedisProjectSource;
18use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
19
20/// Helper type that contains all configured sources for project cache fetching.
21#[derive(Clone, Debug)]
22pub struct ProjectSource {
23    config: Arc<Config>,
24    upstream_source: Addr<UpstreamProjectSource>,
25    #[cfg(feature = "processing")]
26    redis_source: Option<RedisProjectSource>,
27}
28
29impl ProjectSource {
30    /// Starts all project source services in the given [`ServiceSpawn`].
31    pub async fn start_in(
32        services: &dyn ServiceSpawn,
33        config: Arc<Config>,
34        upstream_relay: Addr<UpstreamRelay>,
35        #[cfg(feature = "processing")] _redis: Option<RedisClients>,
36    ) -> Self {
37        let upstream_source = services.start(UpstreamProjectSourceService::new(
38            config.clone(),
39            upstream_relay,
40        ));
41
42        #[cfg(feature = "processing")]
43        let redis_source =
44            _redis.map(|pool| RedisProjectSource::new(config.clone(), pool.project_configs));
45
46        Self {
47            config,
48            upstream_source,
49            #[cfg(feature = "processing")]
50            redis_source,
51        }
52    }
53
54    /// Fetches a project with `project_key` from the configured sources.
55    ///
56    /// Returns a fully sanitized project.
57    pub async fn fetch(
58        self,
59        project_key: ProjectKey,
60        no_cache: bool,
61        current_revision: Revision,
62    ) -> Result<SourceProjectState, ProjectSourceError> {
63        match self.config.relay_mode() {
64            RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()),
65            RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
66        }
67
68        #[cfg(feature = "processing")]
69        if let Some(redis_source) = self.redis_source {
70            let current_revision = current_revision.clone();
71
72            let state_fetch_result = redis_source
73                .get_config_if_changed(project_key, current_revision)
74                .await;
75
76            match state_fetch_result {
77                // New state fetched from Redis, possibly pending.
78                //
79                // If it is pending, we must fallback to fetching from the upstream.
80                Ok(SourceProjectState::New(state)) => {
81                    let state = state.sanitized();
82                    if !state.is_pending() {
83                        return Ok(state.into());
84                    }
85                }
86                // Redis reported that we're holding an up-to-date version of the state already,
87                // refresh the state and return the old cached state again.
88                Ok(SourceProjectState::NotModified) => return Ok(SourceProjectState::NotModified),
89                Err(error) => {
90                    relay_log::error!(
91                        error = &error as &dyn std::error::Error,
92                        "failed to fetch project from Redis",
93                    );
94                }
95            };
96        };
97
98        let state = self
99            .upstream_source
100            .send(FetchProjectState {
101                project_key,
102                current_revision,
103                no_cache,
104            })
105            .await?;
106
107        Ok(match state {
108            SourceProjectState::New(state) => SourceProjectState::New(state.sanitized()),
109            SourceProjectState::NotModified => SourceProjectState::NotModified,
110        })
111    }
112}
113
114#[derive(Debug, thiserror::Error)]
115pub enum ProjectSourceError {
116    #[error("redis permit error {0}")]
117    RedisPermit(#[from] tokio::sync::AcquireError),
118    #[error("redis join error {0}")]
119    RedisJoin(#[from] tokio::task::JoinError),
120    #[error("upstream error {0}")]
121    Upstream(#[from] relay_system::SendError),
122}
123
124impl From<Infallible> for ProjectSourceError {
125    fn from(value: Infallible) -> Self {
126        match value {}
127    }
128}
129
130#[derive(Clone, Debug)]
131pub struct FetchProjectState {
132    /// The public key to fetch the project by.
133    pub project_key: ProjectKey,
134
135    /// Currently cached revision if available.
136    ///
137    /// The upstream is allowed to omit full project configs
138    /// for requests for which the requester already has the most
139    /// recent revision.
140    pub current_revision: Revision,
141
142    /// If true, all caches should be skipped and a fresh state should be computed.
143    pub no_cache: bool,
144}
145
146/// Response indicating whether a project state needs to be updated
147/// or the upstream does not have a newer version.
148#[derive(Debug, Clone)]
149pub enum SourceProjectState {
150    /// The upstream sent a [`ProjectState`].
151    New(ProjectState),
152    /// The upstream indicated that there is no newer version of the state available.
153    NotModified,
154}
155
156impl From<ProjectState> for SourceProjectState {
157    fn from(value: ProjectState) -> Self {
158        Self::New(value)
159    }
160}