relay_server/services/projects/source/
mod.rs

1use relay_base_schema::project::ProjectKey;
2use relay_config::{Config, RelayMode};
3#[cfg(feature = "processing")]
4use relay_redis::RedisClients;
5use relay_system::{Addr, ServiceSpawn, ServiceSpawnExt as _};
6use std::convert::Infallible;
7use std::sync::Arc;
8
9pub mod local;
10#[cfg(feature = "processing")]
11pub mod redis;
12pub mod upstream;
13
14use crate::services::projects::project::{ProjectState, Revision};
15use crate::services::upstream::UpstreamRelay;
16
17use self::local::{LocalProjectSource, LocalProjectSourceService};
18#[cfg(feature = "processing")]
19use self::redis::RedisProjectSource;
20use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
21
22/// Helper type that contains all configured sources for project cache fetching.
23#[derive(Clone, Debug)]
24pub struct ProjectSource {
25    config: Arc<Config>,
26    local_source: Addr<LocalProjectSource>,
27    upstream_source: Addr<UpstreamProjectSource>,
28    #[cfg(feature = "processing")]
29    redis_source: Option<RedisProjectSource>,
30}
31
32impl ProjectSource {
33    /// Starts all project source services in the given [`ServiceSpawn`].
34    pub async fn start_in(
35        services: &dyn ServiceSpawn,
36        config: Arc<Config>,
37        upstream_relay: Addr<UpstreamRelay>,
38        #[cfg(feature = "processing")] _redis: Option<RedisClients>,
39    ) -> Self {
40        let local_source = services.start(LocalProjectSourceService::new(config.clone()));
41        let upstream_source = services.start(UpstreamProjectSourceService::new(
42            config.clone(),
43            upstream_relay,
44        ));
45
46        #[cfg(feature = "processing")]
47        let redis_source =
48            _redis.map(|pool| RedisProjectSource::new(config.clone(), pool.project_configs));
49
50        Self {
51            config,
52            local_source,
53            upstream_source,
54            #[cfg(feature = "processing")]
55            redis_source,
56        }
57    }
58
59    /// Fetches a project with `project_key` from the configured sources.
60    ///
61    /// Returns a fully sanitized project.
62    pub async fn fetch(
63        self,
64        project_key: ProjectKey,
65        no_cache: bool,
66        current_revision: Revision,
67    ) -> Result<SourceProjectState, ProjectSourceError> {
68        let state_opt = self
69            .local_source
70            .send(FetchOptionalProjectState { project_key })
71            .await?;
72
73        if let Some(state) = state_opt {
74            return Ok(state.into());
75        }
76
77        match self.config.relay_mode() {
78            RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()),
79            RelayMode::Static => return Ok(ProjectState::Disabled.into()),
80            RelayMode::Capture => return Ok(ProjectState::new_allowed().into()),
81            RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
82        }
83
84        #[cfg(feature = "processing")]
85        if let Some(redis_source) = self.redis_source {
86            let current_revision = current_revision.clone();
87
88            let state_fetch_result = redis_source
89                .get_config_if_changed(project_key, current_revision)
90                .await;
91
92            match state_fetch_result {
93                // New state fetched from Redis, possibly pending.
94                //
95                // If it is pending, we must fallback to fetching from the upstream.
96                Ok(SourceProjectState::New(state)) => {
97                    let state = state.sanitized();
98                    if !state.is_pending() {
99                        return Ok(state.into());
100                    }
101                }
102                // Redis reported that we're holding an up-to-date version of the state already,
103                // refresh the state and return the old cached state again.
104                Ok(SourceProjectState::NotModified) => return Ok(SourceProjectState::NotModified),
105                Err(error) => {
106                    relay_log::error!(
107                        error = &error as &dyn std::error::Error,
108                        "failed to fetch project from Redis",
109                    );
110                }
111            };
112        };
113
114        let state = self
115            .upstream_source
116            .send(FetchProjectState {
117                project_key,
118                current_revision,
119                no_cache,
120            })
121            .await?;
122
123        Ok(match state {
124            SourceProjectState::New(state) => SourceProjectState::New(state.sanitized()),
125            SourceProjectState::NotModified => SourceProjectState::NotModified,
126        })
127    }
128}
129
130#[derive(Debug, thiserror::Error)]
131pub enum ProjectSourceError {
132    #[error("redis permit error {0}")]
133    RedisPermit(#[from] tokio::sync::AcquireError),
134    #[error("redis join error {0}")]
135    RedisJoin(#[from] tokio::task::JoinError),
136    #[error("upstream error {0}")]
137    Upstream(#[from] relay_system::SendError),
138}
139
140impl From<Infallible> for ProjectSourceError {
141    fn from(value: Infallible) -> Self {
142        match value {}
143    }
144}
145
146#[derive(Clone, Debug)]
147pub struct FetchProjectState {
148    /// The public key to fetch the project by.
149    pub project_key: ProjectKey,
150
151    /// Currently cached revision if available.
152    ///
153    /// The upstream is allowed to omit full project configs
154    /// for requests for which the requester already has the most
155    /// recent revision.
156    pub current_revision: Revision,
157
158    /// If true, all caches should be skipped and a fresh state should be computed.
159    pub no_cache: bool,
160}
161
162#[derive(Clone, Debug)]
163pub struct FetchOptionalProjectState {
164    project_key: ProjectKey,
165}
166
167impl FetchOptionalProjectState {
168    pub fn project_key(&self) -> ProjectKey {
169        self.project_key
170    }
171}
172
173/// Response indicating whether a project state needs to be updated
174/// or the upstream does not have a newer version.
175#[derive(Debug, Clone)]
176pub enum SourceProjectState {
177    /// The upstream sent a [`ProjectState`].
178    New(ProjectState),
179    /// The upstream indicated that there is no newer version of the state available.
180    NotModified,
181}
182
183impl From<ProjectState> for SourceProjectState {
184    fn from(value: ProjectState) -> Self {
185        Self::New(value)
186    }
187}