use std::borrow::Cow;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use relay_config::Config;
use relay_config::RelayMode;
use relay_dynamic_config::GlobalConfig;
use relay_statsd::metric;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Service};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;
use crate::services::upstream::{
RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::SleepHandle;
/// Outcome of a global config query against the upstream.
///
/// Two layers can fail independently: the outer `Result` reports a
/// `relay_system::SendError` from handing the query to the upstream service, the inner
/// `Result` reports an `UpstreamRequestError` for the request itself.
type UpstreamQueryResult =
Result<Result<GetGlobalConfigResponse, UpstreamRequestError>, relay_system::SendError>;
/// Response payload of a [`GetGlobalConfig`] query.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetGlobalConfigResponse {
/// The global config returned by the upstream, if any.
global: Option<GlobalConfig>,
/// Readiness of the returned config. The service treats a missing status as ready.
global_status: Option<StatusResponse>,
}
/// Readiness marker carried in the upstream response, (de)serialized as a lowercase string.
///
/// Unlike [`Status`], the variants carry no data.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum StatusResponse {
/// The accompanying global config can be used.
Ready,
/// The global config of the upstream is not ready yet.
Pending,
}
impl StatusResponse {
    /// Returns `true` for [`StatusResponse::Ready`] and `false` for
    /// [`StatusResponse::Pending`].
    pub fn is_ready(self) -> bool {
        match self {
            Self::Ready => true,
            Self::Pending => false,
        }
    }
}
/// Request payload used to query the global config from the upstream.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetGlobalConfig {
/// Whether the global config is requested; `new` always sets this to `true`.
global: bool,
/// Always sent as an empty list by `new`.
// NOTE(review): presumably only present because the endpoint expects the field — confirm
// against the upstream API.
public_keys: Vec<()>,
}
impl GetGlobalConfig {
fn new() -> GetGlobalConfig {
GetGlobalConfig {
global: true,
public_keys: vec![],
}
}
}
impl UpstreamQuery for GetGlobalConfig {
type Response = GetGlobalConfigResponse;
fn method(&self) -> reqwest::Method {
Method::POST
}
fn path(&self) -> std::borrow::Cow<'static, str> {
Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
}
fn retry() -> bool {
false
}
fn priority() -> super::upstream::RequestPriority {
RequestPriority::High
}
fn route(&self) -> &'static str {
"global_config"
}
}
/// Message requesting the most recent global config status from the global config service.
///
/// The service answers with a clone of the status it currently publishes.
#[derive(Debug)]
pub struct Get;
/// Messages understood by [`GlobalConfigService`].
pub enum GlobalConfigManager {
/// Request for the current [`Status`]; the response is delivered through the contained sender.
Get(relay_system::Sender<Status>),
}
// Marks `GlobalConfigManager` as a service interface; it is used as
// `Service::Interface` of `GlobalConfigService`.
impl Interface for GlobalConfigManager {}
impl FromMessage<Get> for GlobalConfigManager {
type Response = AsyncResponse<Status>;
fn from_message(_: Get, sender: relay_system::Sender<Status>) -> Self {
Self::Get(sender)
}
}
/// Availability of the [`GlobalConfig`] as published by [`GlobalConfigService`].
#[derive(Debug, Clone, Default)]
pub enum Status {
/// A global config is available. The service publishes this after a successful upstream
/// fetch or, outside of managed mode, with the config loaded from file or the default one.
Ready(Arc<GlobalConfig>),
/// No global config is available yet. This is the default and the initial state of the
/// service.
#[default]
Pending,
}
impl Status {
    /// Returns `true` if this status holds a global config, i.e. it is [`Status::Ready`].
    pub fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending => false,
        }
    }
}
/// Cloneable handle for reading the most recently published global config [`Status`].
#[derive(Clone)]
pub struct GlobalConfigHandle {
/// Receiving side of the watch channel the status is published on.
watch: watch::Receiver<Status>,
}
impl GlobalConfigHandle {
    /// Creates a handle that always reports the given `config` as ready.
    ///
    /// The sending half of the channel is dropped right away, so the value never changes.
    #[cfg(test)]
    pub fn fixed(config: GlobalConfig) -> Self {
        let initial = Status::Ready(Arc::new(config));
        let (_, watch) = watch::channel(initial);
        Self { watch }
    }

    /// Returns the current global config, or a default one while the status is still
    /// [`Status::Pending`].
    pub fn current(&self) -> Arc<GlobalConfig> {
        let status = self.watch.borrow();
        if let Status::Ready(config) = &*status {
            Arc::clone(config)
        } else {
            Arc::default()
        }
    }
}
/// Formats the handle as a tuple struct wrapping the currently observed [`Status`].
impl fmt::Debug for GlobalConfigHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let status = self.watch.borrow();
        let mut tuple = f.debug_tuple("GlobalConfigHandle");
        tuple.field(&*status);
        tuple.finish()
    }
}
/// Service implementing the [`GlobalConfigManager`] interface.
///
/// It publishes the global config [`Status`] on a watch channel and, in managed mode,
/// periodically refreshes the config from the upstream.
#[derive(Debug)]
pub struct GlobalConfigService {
/// Relay configuration; provides the relay mode, the fetch interval and the config path.
config: Arc<Config>,
/// Sending side of the watch channel the current [`Status`] is published on.
global_config_watch: watch::Sender<Status>,
/// Sender handed to spawned request tasks to report upstream results back to the service.
internal_tx: mpsc::Sender<UpstreamQueryResult>,
/// Receiver of upstream results, polled in the service loop.
internal_rx: mpsc::Receiver<UpstreamQueryResult>,
/// Address of the upstream service the queries are sent to.
upstream: Addr<UpstreamRelay>,
/// Timer that triggers the next upstream request when it elapses.
fetch_handle: SleepHandle,
/// Instant of the last successful fetch; initialized to the creation time of the service.
last_fetched: Instant,
/// Time that must have passed since `last_fetched` before upstream fetch errors are logged.
upstream_failure_interval: Duration,
/// Set on shutdown; prevents further fetches from being scheduled.
shutdown: bool,
}
impl GlobalConfigService {
pub fn new(
config: Arc<Config>,
upstream: Addr<UpstreamRelay>,
) -> (Self, watch::Receiver<Status>) {
let (internal_tx, internal_rx) = mpsc::channel(1);
let (global_config_watch, rx) = watch::channel(Status::Pending);
(
Self {
config,
global_config_watch,
internal_tx,
internal_rx,
upstream,
fetch_handle: SleepHandle::idle(),
last_fetched: Instant::now(),
upstream_failure_interval: Duration::from_secs(35),
shutdown: false,
},
rx,
)
}
pub fn handle(&self) -> GlobalConfigHandle {
GlobalConfigHandle {
watch: self.global_config_watch.subscribe(),
}
}
fn handle_message(&mut self, message: GlobalConfigManager) {
match message {
GlobalConfigManager::Get(sender) => {
sender.send(self.global_config_watch.borrow().clone());
}
}
}
fn schedule_fetch(&mut self) {
if !self.shutdown && self.fetch_handle.is_idle() {
self.fetch_handle
.set(self.config.global_config_fetch_interval());
}
}
fn request_global_config(&mut self) {
self.fetch_handle.reset();
let upstream_relay = self.upstream.clone();
let internal_tx = self.internal_tx.clone();
relay_system::spawn!(async move {
metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
let query = GetGlobalConfig::new();
let res = upstream_relay.send(SendQuery(query)).await;
internal_tx.send(res).await.ok();
});
});
}
fn handle_result(&mut self, result: UpstreamQueryResult) {
match result {
Ok(Ok(response)) => {
let mut success = false;
let is_ready = response.global_status.map_or(true, |stat| stat.is_ready());
match response.global {
Some(mut global_config) if is_ready => {
if !self.global_config_watch.borrow().is_ready() {
relay_log::info!("received global config from upstream");
}
global_config.normalize();
self.global_config_watch
.send_replace(Status::Ready(Arc::new(global_config)));
success = true;
self.last_fetched = Instant::now();
}
Some(_) => relay_log::info!("global config from upstream is not yet ready"),
None => relay_log::error!("global config missing in upstream response"),
}
metric!(
counter(RelayCounters::GlobalConfigFetched) += 1,
success = if success { "true" } else { "false" },
);
}
Ok(Err(e)) => {
if self.last_fetched.elapsed() >= self.upstream_failure_interval {
relay_log::error!(
error = &e as &dyn std::error::Error,
"failed to fetch global config from upstream"
);
}
}
Err(e) => relay_log::error!(
error = &e as &dyn std::error::Error,
"failed to send request to upstream"
),
}
self.schedule_fetch();
}
fn handle_shutdown(&mut self) {
self.shutdown = true;
self.fetch_handle.reset();
}
}
impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;
async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
let mut shutdown_handle = Controller::shutdown_handle();
relay_log::info!("global config service starting");
if self.config.relay_mode() == RelayMode::Managed {
relay_log::info!("requesting global config from upstream");
self.request_global_config();
} else {
match GlobalConfig::load(self.config.path()) {
Ok(Some(from_file)) => {
relay_log::info!("serving static global config loaded from file");
self.global_config_watch
.send_replace(Status::Ready(Arc::new(from_file)));
}
Ok(None) => {
relay_log::info!(
"serving default global configs due to lacking static global config file"
);
self.global_config_watch
.send_replace(Status::Ready(Arc::default()));
}
Err(e) => {
relay_log::error!("failed to load global config from file: {}", e);
relay_log::info!(
"serving default global configs due to failure to load global config from file"
);
self.global_config_watch
.send_replace(Status::Ready(Arc::default()));
}
}
};
loop {
tokio::select! {
biased;
() = &mut self.fetch_handle => self.request_global_config(),
Some(result) = self.internal_rx.recv() => self.handle_result(result),
Some(message) = rx.recv() => self.handle_message(message),
_ = shutdown_handle.notified() => self.handle_shutdown(),
else => break,
}
}
relay_log::info!("global config service stopped");
}
}
// Tests for the upstream request behavior of `GlobalConfigService`. All tests run with
// paused tokio time so that sleeping for multiples of the fetch interval is instantaneous.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use relay_config::{Config, RelayMode};
use relay_system::{Controller, Service, ShutdownMode};
use relay_test::mock_service;
use crate::services::global_config::{Get, GlobalConfigService};
// After an immediate shutdown, the service must not send further upstream requests, but it
// must still answer `Get` messages.
#[tokio::test]
async fn shutdown_service() {
relay_test::setup();
tokio::time::pause();
// The mock upstream counts the messages it receives and panics on the second one.
let (upstream, _) = mock_service("upstream", 0, |state, _| {
*state += 1;
if *state > 1 {
panic!("should not receive requests after shutdown");
}
});
Controller::start(Duration::from_secs(1));
let mut config = Config::default();
config.regenerate_credentials(false).unwrap();
let fetch_interval = config.global_config_fetch_interval();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start_detached();
assert!(service.send(Get).await.is_ok());
Controller::shutdown(ShutdownMode::Immediate);
// Wait long enough for a fetch to have been due if the timer were still running.
tokio::time::sleep(fetch_interval * 2).await;
assert!(service.send(Get).await.is_ok());
}
// In managed mode the service contacts the upstream. The mock panics as soon as it receives
// a message, which is what `should_panic` expects.
#[tokio::test]
#[should_panic]
async fn managed_relay_makes_upstream_request() {
relay_test::setup();
tokio::time::pause();
let (upstream, handle) = mock_service("upstream", (), |(), _| {
panic!();
});
let mut config = Config::from_json_value(serde_json::json!({
"relay": {
"mode": RelayMode::Managed
}
}))
.unwrap();
config.regenerate_credentials(false).unwrap();
let fetch_interval = config.global_config_fetch_interval();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start_detached();
service.send(Get).await.unwrap();
tokio::time::sleep(fetch_interval * 2).await;
// NOTE(review): presumably `handle` is the join handle of the mock service, so unwrapping
// it surfaces the panic raised inside the mock — confirm against `relay_test`.
handle.await.unwrap();
}
// In proxy mode the service must never contact the upstream; the mock panics if it is
// invoked.
#[tokio::test]
async fn proxy_relay_does_not_make_upstream_request() {
relay_test::setup();
tokio::time::pause();
let (upstream, _) = mock_service("upstream", (), |(), _| {
panic!("upstream should not be called outside of managed mode");
});
let config = Config::from_json_value(serde_json::json!({
"relay": {
"mode": RelayMode::Proxy
}
}))
.unwrap();
let fetch_interval = config.global_config_fetch_interval();
let service = GlobalConfigService::new(Arc::new(config), upstream)
.0
.start_detached();
service.send(Get).await.unwrap();
tokio::time::sleep(fetch_interval * 2).await;
}
}