relay_server/services/projects/source/
mod.rs1use relay_base_schema::project::ProjectKey;
2use relay_config::{Config, RelayMode};
3#[cfg(feature = "processing")]
4use relay_redis::RedisClients;
5use relay_system::{Addr, ServiceSpawn, ServiceSpawnExt as _};
6use std::convert::Infallible;
7use std::sync::Arc;
8
9pub mod local;
10#[cfg(feature = "processing")]
11pub mod redis;
12pub mod upstream;
13
14use crate::services::projects::project::{ProjectState, Revision};
15use crate::services::upstream::UpstreamRelay;
16
17use self::local::{LocalProjectSource, LocalProjectSourceService};
18#[cfg(feature = "processing")]
19use self::redis::RedisProjectSource;
20use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
21
22#[derive(Clone, Debug)]
24pub struct ProjectSource {
25 config: Arc<Config>,
26 local_source: Addr<LocalProjectSource>,
27 upstream_source: Addr<UpstreamProjectSource>,
28 #[cfg(feature = "processing")]
29 redis_source: Option<RedisProjectSource>,
30}
31
32impl ProjectSource {
33 pub async fn start_in(
35 services: &dyn ServiceSpawn,
36 config: Arc<Config>,
37 upstream_relay: Addr<UpstreamRelay>,
38 #[cfg(feature = "processing")] _redis: Option<RedisClients>,
39 ) -> Self {
40 let local_source = services.start(LocalProjectSourceService::new(config.clone()));
41 let upstream_source = services.start(UpstreamProjectSourceService::new(
42 config.clone(),
43 upstream_relay,
44 ));
45
46 #[cfg(feature = "processing")]
47 let redis_source =
48 _redis.map(|pool| RedisProjectSource::new(config.clone(), pool.project_configs));
49
50 Self {
51 config,
52 local_source,
53 upstream_source,
54 #[cfg(feature = "processing")]
55 redis_source,
56 }
57 }
58
59 pub async fn fetch(
63 self,
64 project_key: ProjectKey,
65 no_cache: bool,
66 current_revision: Revision,
67 ) -> Result<SourceProjectState, ProjectSourceError> {
68 let state_opt = self
69 .local_source
70 .send(FetchOptionalProjectState { project_key })
71 .await?;
72
73 if let Some(state) = state_opt {
74 return Ok(state.into());
75 }
76
77 match self.config.relay_mode() {
78 RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()),
79 RelayMode::Static => return Ok(ProjectState::Disabled.into()),
80 RelayMode::Capture => return Ok(ProjectState::new_allowed().into()),
81 RelayMode::Managed => (), }
83
84 #[cfg(feature = "processing")]
85 if let Some(redis_source) = self.redis_source {
86 let current_revision = current_revision.clone();
87
88 let state_fetch_result = redis_source
89 .get_config_if_changed(project_key, current_revision)
90 .await;
91
92 match state_fetch_result {
93 Ok(SourceProjectState::New(state)) => {
97 let state = state.sanitized();
98 if !state.is_pending() {
99 return Ok(state.into());
100 }
101 }
102 Ok(SourceProjectState::NotModified) => return Ok(SourceProjectState::NotModified),
105 Err(error) => {
106 relay_log::error!(
107 error = &error as &dyn std::error::Error,
108 "failed to fetch project from Redis",
109 );
110 }
111 };
112 };
113
114 let state = self
115 .upstream_source
116 .send(FetchProjectState {
117 project_key,
118 current_revision,
119 no_cache,
120 })
121 .await?;
122
123 Ok(match state {
124 SourceProjectState::New(state) => SourceProjectState::New(state.sanitized()),
125 SourceProjectState::NotModified => SourceProjectState::NotModified,
126 })
127 }
128}
129
130#[derive(Debug, thiserror::Error)]
131pub enum ProjectSourceError {
132 #[error("redis permit error {0}")]
133 RedisPermit(#[from] tokio::sync::AcquireError),
134 #[error("redis join error {0}")]
135 RedisJoin(#[from] tokio::task::JoinError),
136 #[error("upstream error {0}")]
137 Upstream(#[from] relay_system::SendError),
138}
139
140impl From<Infallible> for ProjectSourceError {
141 fn from(value: Infallible) -> Self {
142 match value {}
143 }
144}
145
146#[derive(Clone, Debug)]
147pub struct FetchProjectState {
148 pub project_key: ProjectKey,
150
151 pub current_revision: Revision,
157
158 pub no_cache: bool,
160}
161
162#[derive(Clone, Debug)]
163pub struct FetchOptionalProjectState {
164 project_key: ProjectKey,
165}
166
167impl FetchOptionalProjectState {
168 pub fn project_key(&self) -> ProjectKey {
169 self.project_key
170 }
171}
172
173#[derive(Debug, Clone)]
176pub enum SourceProjectState {
177 New(ProjectState),
179 NotModified,
181}
182
183impl From<ProjectState> for SourceProjectState {
184 fn from(value: ProjectState) -> Self {
185 Self::New(value)
186 }
187}