relay_server/services/projects/source/
mod.rs1use relay_base_schema::project::ProjectKey;
2use relay_config::{Config, RelayMode};
3#[cfg(feature = "processing")]
4use relay_redis::RedisClients;
5use relay_system::{Addr, ServiceSpawn, ServiceSpawnExt as _};
6use std::convert::Infallible;
7use std::sync::Arc;
8
9#[cfg(feature = "processing")]
10pub mod redis;
11pub mod upstream;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::upstream::UpstreamRelay;
15
16#[cfg(feature = "processing")]
17use self::redis::RedisProjectSource;
18use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
19
20#[derive(Clone, Debug)]
22pub struct ProjectSource {
23 config: Arc<Config>,
24 upstream_source: Addr<UpstreamProjectSource>,
25 #[cfg(feature = "processing")]
26 redis_source: Option<RedisProjectSource>,
27}
28
29impl ProjectSource {
30 pub async fn start_in(
32 services: &dyn ServiceSpawn,
33 config: Arc<Config>,
34 upstream_relay: Addr<UpstreamRelay>,
35 #[cfg(feature = "processing")] _redis: Option<RedisClients>,
36 ) -> Self {
37 let upstream_source = services.start(UpstreamProjectSourceService::new(
38 config.clone(),
39 upstream_relay,
40 ));
41
42 #[cfg(feature = "processing")]
43 let redis_source =
44 _redis.map(|pool| RedisProjectSource::new(config.clone(), pool.project_configs));
45
46 Self {
47 config,
48 upstream_source,
49 #[cfg(feature = "processing")]
50 redis_source,
51 }
52 }
53
54 pub async fn fetch(
58 self,
59 project_key: ProjectKey,
60 no_cache: bool,
61 current_revision: Revision,
62 ) -> Result<SourceProjectState, ProjectSourceError> {
63 match self.config.relay_mode() {
64 RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()),
65 RelayMode::Managed => (), }
67
68 #[cfg(feature = "processing")]
69 if let Some(redis_source) = self.redis_source {
70 let current_revision = current_revision.clone();
71
72 let state_fetch_result = redis_source
73 .get_config_if_changed(project_key, current_revision)
74 .await;
75
76 match state_fetch_result {
77 Ok(SourceProjectState::New(state)) => {
81 let state = state.sanitized();
82 if !state.is_pending() {
83 return Ok(state.into());
84 }
85 }
86 Ok(SourceProjectState::NotModified) => return Ok(SourceProjectState::NotModified),
89 Err(error) => {
90 relay_log::error!(
91 error = &error as &dyn std::error::Error,
92 "failed to fetch project from Redis",
93 );
94 }
95 };
96 };
97
98 let state = self
99 .upstream_source
100 .send(FetchProjectState {
101 project_key,
102 current_revision,
103 no_cache,
104 })
105 .await?;
106
107 Ok(match state {
108 SourceProjectState::New(state) => SourceProjectState::New(state.sanitized()),
109 SourceProjectState::NotModified => SourceProjectState::NotModified,
110 })
111 }
112}
113
114#[derive(Debug, thiserror::Error)]
115pub enum ProjectSourceError {
116 #[error("redis permit error {0}")]
117 RedisPermit(#[from] tokio::sync::AcquireError),
118 #[error("redis join error {0}")]
119 RedisJoin(#[from] tokio::task::JoinError),
120 #[error("upstream error {0}")]
121 Upstream(#[from] relay_system::SendError),
122}
123
124impl From<Infallible> for ProjectSourceError {
125 fn from(value: Infallible) -> Self {
126 match value {}
127 }
128}
129
130#[derive(Clone, Debug)]
131pub struct FetchProjectState {
132 pub project_key: ProjectKey,
134
135 pub current_revision: Revision,
141
142 pub no_cache: bool,
144}
145
146#[derive(Debug, Clone)]
149pub enum SourceProjectState {
150 New(ProjectState),
152 NotModified,
154}
155
156impl From<ProjectState> for SourceProjectState {
157 fn from(value: ProjectState) -> Self {
158 Self::New(value)
159 }
160}