relay_server/services/projects/source/
mod.rsuse relay_base_schema::project::ProjectKey;
use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::RedisPools;
use relay_system::{Addr, ServiceRunner};
use std::convert::Infallible;
use std::sync::Arc;
pub mod local;
#[cfg(feature = "processing")]
pub mod redis;
pub mod upstream;
use crate::services::projects::project::{ProjectState, Revision};
use crate::services::upstream::UpstreamRelay;
use self::local::{LocalProjectSource, LocalProjectSourceService};
#[cfg(feature = "processing")]
use self::redis::RedisProjectSource;
use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
#[derive(Clone, Debug)]
pub struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
}
impl ProjectSource {
pub async fn start_in(
runner: &mut ServiceRunner,
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")] _redis: Option<RedisPools>,
) -> Self {
let local_source = runner.start(LocalProjectSourceService::new(config.clone()));
let upstream_source = runner.start(UpstreamProjectSourceService::new(
config.clone(),
upstream_relay,
));
#[cfg(feature = "processing")]
let redis_source =
_redis.map(|pool| RedisProjectSource::new(config.clone(), pool.project_configs));
Self {
config,
local_source,
upstream_source,
#[cfg(feature = "processing")]
redis_source,
}
}
pub async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
current_revision: Revision,
) -> Result<SourceProjectState, ProjectSourceError> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
.await?;
if let Some(state) = state_opt {
return Ok(state.into());
}
match self.config.relay_mode() {
RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()),
RelayMode::Static => return Ok(ProjectState::Disabled.into()),
RelayMode::Capture => return Ok(ProjectState::new_allowed().into()),
RelayMode::Managed => (), }
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let current_revision = current_revision.clone();
let state_fetch_result = redis_source
.get_config_if_changed(project_key, current_revision)
.await;
match state_fetch_result {
Ok(SourceProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(state.into());
}
}
Ok(SourceProjectState::NotModified) => return Ok(SourceProjectState::NotModified),
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to fetch project from Redis",
);
}
};
};
let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await?;
Ok(match state {
SourceProjectState::New(state) => SourceProjectState::New(state.sanitized()),
SourceProjectState::NotModified => SourceProjectState::NotModified,
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum ProjectSourceError {
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
}
impl From<Infallible> for ProjectSourceError {
fn from(value: Infallible) -> Self {
match value {}
}
}
#[derive(Clone, Debug)]
pub struct FetchProjectState {
pub project_key: ProjectKey,
pub current_revision: Revision,
pub no_cache: bool,
}
#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
}
impl FetchOptionalProjectState {
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
}
#[derive(Debug, Clone)]
pub enum SourceProjectState {
New(ProjectState),
NotModified,
}
impl From<ProjectState> for SourceProjectState {
fn from(value: ProjectState) -> Self {
Self::New(value)
}
}