1use std::borrow::Cow;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use relay_config::Config;
18use relay_config::RelayMode;
19use relay_dynamic_config::GlobalConfig;
20use relay_statsd::metric;
21use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Service};
22use reqwest::Method;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{mpsc, watch};
25use tokio::time::Instant;
26
27use crate::services::upstream::{
28 RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
29};
30use crate::statsd::{RelayCounters, RelayTimers};
31use crate::utils::SleepHandle;
32
33type UpstreamQueryResult =
36 Result<Result<GetGlobalConfigResponse, UpstreamRequestError>, relay_system::SendError>;
37
38#[derive(Debug, Deserialize, Serialize)]
40#[serde(rename_all = "camelCase")]
41struct GetGlobalConfigResponse {
42 global: Option<GlobalConfig>,
43 global_status: Option<StatusResponse>,
46}
47
48#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
50#[serde(rename_all = "lowercase")]
51pub enum StatusResponse {
52 Ready,
53 Pending,
54}
55
56impl StatusResponse {
57 pub fn is_ready(self) -> bool {
58 matches!(self, Self::Ready)
59 }
60}
61
62#[derive(Debug, Deserialize, Serialize)]
64#[serde(rename_all = "camelCase")]
65struct GetGlobalConfig {
66 global: bool,
67 public_keys: Vec<()>,
69}
70
71impl GetGlobalConfig {
72 fn new() -> GetGlobalConfig {
73 GetGlobalConfig {
74 global: true,
75 public_keys: vec![],
76 }
77 }
78}
79
80impl UpstreamQuery for GetGlobalConfig {
81 type Response = GetGlobalConfigResponse;
82
83 fn method(&self) -> reqwest::Method {
84 Method::POST
85 }
86
87 fn path(&self) -> std::borrow::Cow<'static, str> {
88 Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
89 }
90
91 fn retry() -> bool {
92 false
93 }
94
95 fn priority() -> super::upstream::RequestPriority {
96 RequestPriority::High
97 }
98
99 fn route(&self) -> &'static str {
100 "global_config"
101 }
102}
103
104pub struct Get;
106
107pub enum GlobalConfigManager {
113 Get(relay_system::Sender<Status>),
115}
116
117impl Interface for GlobalConfigManager {}
118
119impl FromMessage<Get> for GlobalConfigManager {
120 type Response = AsyncResponse<Status>;
121
122 fn from_message(_: Get, sender: relay_system::Sender<Status>) -> Self {
123 Self::Get(sender)
124 }
125}
126
127#[derive(Debug, Clone, Default)]
129pub enum Status {
130 Ready(Arc<GlobalConfig>),
136 #[default]
140 Pending,
141}
142
143impl Status {
144 pub fn is_ready(&self) -> bool {
146 matches!(self, Self::Ready(_))
147 }
148}
149
150#[derive(Clone)]
151pub struct GlobalConfigHandle {
152 watch: watch::Receiver<Status>,
153}
154
155impl GlobalConfigHandle {
156 #[cfg(test)]
158 pub fn fixed(config: GlobalConfig) -> Self {
159 let (_, watch) = watch::channel(Status::Ready(Arc::new(config)));
160 Self { watch }
161 }
162
163 pub fn current(&self) -> Arc<GlobalConfig> {
168 match &*self.watch.borrow() {
169 Status::Ready(config) => Arc::clone(config),
170 Status::Pending => Default::default(),
171 }
172 }
173}
174
175impl fmt::Debug for GlobalConfigHandle {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 f.debug_tuple("GlobalConfigHandle")
178 .field(&*self.watch.borrow())
179 .finish()
180 }
181}
182
183#[derive(Debug)]
185pub struct GlobalConfigService {
186 config: Arc<Config>,
187 global_config_watch: watch::Sender<Status>,
189 internal_tx: mpsc::Sender<UpstreamQueryResult>,
191 internal_rx: mpsc::Receiver<UpstreamQueryResult>,
193 upstream: Addr<UpstreamRelay>,
195 fetch_handle: SleepHandle,
197 last_fetched: Instant,
199 upstream_failure_interval: Duration,
201 shutdown: bool,
203}
204
205impl GlobalConfigService {
206 pub fn new(
208 config: Arc<Config>,
209 upstream: Addr<UpstreamRelay>,
210 ) -> (Self, watch::Receiver<Status>) {
211 let (internal_tx, internal_rx) = mpsc::channel(1);
212 let (global_config_watch, rx) = watch::channel(Status::Pending);
213
214 (
215 Self {
216 config,
217 global_config_watch,
218 internal_tx,
219 internal_rx,
220 upstream,
221 fetch_handle: SleepHandle::idle(),
222 last_fetched: Instant::now(),
223 upstream_failure_interval: Duration::from_secs(35),
224 shutdown: false,
225 },
226 rx,
227 )
228 }
229
230 pub fn handle(&self) -> GlobalConfigHandle {
233 GlobalConfigHandle {
234 watch: self.global_config_watch.subscribe(),
235 }
236 }
237
238 fn handle_message(&mut self, message: GlobalConfigManager) {
240 match message {
241 GlobalConfigManager::Get(sender) => {
242 sender.send(self.global_config_watch.borrow().clone());
243 }
244 }
245 }
246
247 fn schedule_fetch(&mut self) {
249 if !self.shutdown && self.fetch_handle.is_idle() {
250 self.fetch_handle
251 .set(self.config.global_config_fetch_interval());
252 }
253 }
254
255 fn request_global_config(&mut self) {
260 self.fetch_handle.reset();
262
263 let upstream_relay = self.upstream.clone();
264 let internal_tx = self.internal_tx.clone();
265
266 relay_system::spawn!(async move {
267 metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
268 let query = GetGlobalConfig::new();
269 let res = upstream_relay.send(SendQuery(query)).await;
270 internal_tx.send(res).await.ok();
273 });
274 });
275 }
276
277 fn handle_result(&mut self, result: UpstreamQueryResult) {
285 match result {
286 Ok(Ok(response)) => {
287 let mut success = false;
288 let is_ready = response.global_status.is_none_or(|stat| stat.is_ready());
291
292 match response.global {
293 Some(global_config) if is_ready => {
294 if !self.global_config_watch.borrow().is_ready() {
296 relay_log::info!("received global config from upstream");
297 }
298
299 self.global_config_watch
300 .send_replace(Status::Ready(Arc::new(global_config)));
301 success = true;
302 self.last_fetched = Instant::now();
303 }
304 Some(_) => relay_log::info!("global config from upstream is not yet ready"),
305 None => relay_log::error!("global config missing in upstream response"),
306 }
307 metric!(
308 counter(RelayCounters::GlobalConfigFetched) += 1,
309 success = if success { "true" } else { "false" },
310 );
311 }
312 Ok(Err(e)) => {
313 if self.last_fetched.elapsed() >= self.upstream_failure_interval {
314 relay_log::error!(
315 error = &e as &dyn std::error::Error,
316 "failed to fetch global config from upstream"
317 );
318 }
319 }
320 Err(e) => relay_log::error!(
321 error = &e as &dyn std::error::Error,
322 "failed to send request to upstream"
323 ),
324 }
325
326 self.schedule_fetch();
328 }
329
330 fn handle_shutdown(&mut self) {
331 self.shutdown = true;
332 self.fetch_handle.reset();
333 }
334}
335
336impl Service for GlobalConfigService {
337 type Interface = GlobalConfigManager;
338
339 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
340 let mut shutdown_handle = Controller::shutdown_handle();
341
342 relay_log::info!("global config service starting");
343 if self.config.relay_mode() == RelayMode::Managed {
344 relay_log::info!("requesting global config from upstream");
345 self.request_global_config();
346 } else {
347 match GlobalConfig::load(self.config.path()) {
348 Ok(Some(from_file)) => {
349 relay_log::info!("serving static global config loaded from file");
350 self.global_config_watch
351 .send_replace(Status::Ready(Arc::new(from_file)));
352 }
353 Ok(None) => {
354 relay_log::info!(
355 "serving default global configs due to lacking static global config file"
356 );
357 self.global_config_watch
358 .send_replace(Status::Ready(Arc::default()));
359 }
360 Err(e) => {
361 relay_log::error!("failed to load global config from file: {}", e);
362 relay_log::info!(
363 "serving default global configs due to failure to load global config from file"
364 );
365 self.global_config_watch
366 .send_replace(Status::Ready(Arc::default()));
367 }
368 }
369 };
370
371 loop {
372 tokio::select! {
373 biased;
374
375 () = &mut self.fetch_handle => self.request_global_config(),
376 Some(result) = self.internal_rx.recv() => self.handle_result(result),
377 Some(message) = rx.recv() => self.handle_message(message),
378 _ = shutdown_handle.notified() => self.handle_shutdown(),
379
380 else => break,
381 }
382 }
383 relay_log::info!("global config service stopped");
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use std::sync::Arc;
390 use std::time::Duration;
391
392 use relay_config::{Config, RelayMode};
393 use relay_system::{Controller, Service, ShutdownMode};
394 use relay_test::mock_service;
395
396 use crate::services::global_config::{Get, GlobalConfigService};
397
398 #[tokio::test]
401 async fn shutdown_service() {
402 relay_test::setup();
403 tokio::time::pause();
404
405 let (upstream, _) = mock_service("upstream", 0, |state, _| {
406 *state += 1;
407
408 if *state > 1 {
409 panic!("should not receive requests after shutdown");
410 }
411 });
412
413 Controller::start(Duration::from_secs(1));
414 let mut config = Config::default();
415 config.regenerate_credentials(false).unwrap();
416 let fetch_interval = config.global_config_fetch_interval();
417
418 let service = GlobalConfigService::new(Arc::new(config), upstream)
419 .0
420 .start_detached();
421
422 assert!(service.send(Get).await.is_ok());
423
424 Controller::shutdown(ShutdownMode::Immediate);
425 tokio::time::sleep(fetch_interval * 2).await;
426
427 assert!(service.send(Get).await.is_ok());
428 }
429
430 #[tokio::test]
431 #[should_panic]
432 async fn managed_relay_makes_upstream_request() {
433 relay_test::setup();
434 tokio::time::pause();
435
436 let (upstream, handle) = mock_service("upstream", (), |(), _| {
437 panic!();
438 });
439
440 let mut config = Config::from_json_value(serde_json::json!({
441 "relay": {
442 "mode": RelayMode::Managed
443 }
444 }))
445 .unwrap();
446 config.regenerate_credentials(false).unwrap();
447
448 let fetch_interval = config.global_config_fetch_interval();
449 let service = GlobalConfigService::new(Arc::new(config), upstream)
450 .0
451 .start_detached();
452 service.send(Get).await.unwrap();
453
454 tokio::time::sleep(fetch_interval * 2).await;
455 handle.await.unwrap();
456 }
457
458 #[tokio::test]
459 async fn proxy_relay_does_not_make_upstream_request() {
460 relay_test::setup();
461 tokio::time::pause();
462
463 let (upstream, _) = mock_service("upstream", (), |(), _| {
464 panic!("upstream should not be called outside of managed mode");
465 });
466
467 let config = Config::from_json_value(serde_json::json!({
468 "relay": {
469 "mode": RelayMode::Proxy
470 }
471 }))
472 .unwrap();
473
474 let fetch_interval = config.global_config_fetch_interval();
475
476 let service = GlobalConfigService::new(Arc::new(config), upstream)
477 .0
478 .start_detached();
479 service.send(Get).await.unwrap();
480
481 tokio::time::sleep(fetch_interval * 2).await;
482 }
483}