1use std::borrow::Cow;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use relay_config::Config;
18use relay_config::RelayMode;
19use relay_dynamic_config::GlobalConfig;
20use relay_statsd::metric;
21use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Service};
22use reqwest::Method;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{mpsc, watch};
25use tokio::time::Instant;
26
27use crate::services::upstream::{
28 RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
29};
30use crate::statsd::{RelayCounters, RelayTimers};
31use crate::utils::SleepHandle;
32
/// Outcome of a global config request sent through the upstream service.
///
/// The outer `Result` fails with [`relay_system::SendError`] if the request could not be
/// sent to the upstream service. The inner `Result` carries either the upstream's
/// response or the error of the request itself.
type UpstreamQueryResult =
    Result<Result<GetGlobalConfigResponse, UpstreamRequestError>, relay_system::SendError>;
37
/// Response payload of a [`GetGlobalConfig`] query.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetGlobalConfigResponse {
    /// The global config, if the upstream included one in its response.
    global: Option<GlobalConfig>,
    /// Readiness of the returned global config.
    ///
    /// When this field is absent, the service treats the returned config as ready.
    global_status: Option<StatusResponse>,
}
47
/// Readiness of the global config as reported in an upstream response.
///
/// Variants are (de)serialized as lowercase strings.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum StatusResponse {
    /// The global config in the response is ready to be used.
    Ready,
    /// The global config in the response is not yet ready.
    Pending,
}
55
56impl StatusResponse {
57 pub fn is_ready(self) -> bool {
58 matches!(self, Self::Ready)
59 }
60}
61
/// Request payload used to query the global config from the upstream.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetGlobalConfig {
    /// Whether the global config is requested; `new` always sets this to `true`.
    global: bool,
    /// Always sent as an empty list by `new`.
    // NOTE(review): presumably a placeholder required by the upstream endpoint's request
    // schema — confirm against the upstream API.
    public_keys: Vec<()>,
}
70
71impl GetGlobalConfig {
72 fn new() -> GetGlobalConfig {
73 GetGlobalConfig {
74 global: true,
75 public_keys: vec![],
76 }
77 }
78}
79
80impl UpstreamQuery for GetGlobalConfig {
81 type Response = GetGlobalConfigResponse;
82
83 fn method(&self) -> reqwest::Method {
84 Method::POST
85 }
86
87 fn path(&self) -> std::borrow::Cow<'static, str> {
88 Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
89 }
90
91 fn retry() -> bool {
92 false
93 }
94
95 fn priority() -> super::upstream::RequestPriority {
96 RequestPriority::High
97 }
98
99 fn route(&self) -> &'static str {
100 "global_config"
101 }
102}
103
/// Message requesting the current global config [`Status`] from the [`GlobalConfigService`].
pub struct Get;
106
/// Message interface of the [`GlobalConfigService`].
pub enum GlobalConfigManager {
    /// Requests the current [`Status`]; the reply is sent through the contained sender.
    Get(relay_system::Sender<Status>),
}
116
// Marks `GlobalConfigManager` as a service interface; the trait is implemented without
// any items.
impl Interface for GlobalConfigManager {}
118
119impl FromMessage<Get> for GlobalConfigManager {
120 type Response = AsyncResponse<Status>;
121
122 fn from_message(_: Get, sender: relay_system::Sender<Status>) -> Self {
123 Self::Get(sender)
124 }
125}
126
/// State of the global config as published by the [`GlobalConfigService`].
///
/// Defaults to [`Status::Pending`].
#[derive(Debug, Clone, Default)]
pub enum Status {
    /// A global config is available.
    Ready(Arc<GlobalConfig>),
    /// No global config is available yet.
    #[default]
    Pending,
}
142
143impl Status {
144 pub fn is_ready(&self) -> bool {
146 matches!(self, Self::Ready(_))
147 }
148}
149
/// Cloneable handle for reading the most recent global config [`Status`].
#[derive(Clone)]
pub struct GlobalConfigHandle {
    /// Receiving side of the watch channel on which the [`Status`] is published.
    watch: watch::Receiver<Status>,
}
154
155impl GlobalConfigHandle {
156 #[cfg(test)]
158 pub fn fixed(config: GlobalConfig) -> Self {
159 let (_, watch) = watch::channel(Status::Ready(Arc::new(config)));
160 Self { watch }
161 }
162
163 pub fn current(&self) -> Arc<GlobalConfig> {
168 match &*self.watch.borrow() {
169 Status::Ready(config) => Arc::clone(config),
170 Status::Pending => Default::default(),
171 }
172 }
173}
174
175impl fmt::Debug for GlobalConfigHandle {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 f.debug_tuple("GlobalConfigHandle")
178 .field(&*self.watch.borrow())
179 .finish()
180 }
181}
182
/// Service that provides the global config, either fetched periodically from the
/// upstream (managed mode) or loaded once from a file or defaults (other modes).
#[derive(Debug)]
pub struct GlobalConfigService {
    /// Relay configuration; read for the relay mode, the config path and the fetch interval.
    config: Arc<Config>,
    /// Sender of the watch channel on which the current [`Status`] is published.
    global_config_watch: watch::Sender<Status>,
    /// Sender cloned into spawned request tasks to report results back to the service.
    internal_tx: mpsc::Sender<UpstreamQueryResult>,
    /// Receiver for the results of spawned upstream requests.
    internal_rx: mpsc::Receiver<UpstreamQueryResult>,
    /// Address of the upstream service used to send global config requests.
    upstream: Addr<UpstreamRelay>,
    /// Timer whose completion triggers the next upstream request.
    fetch_handle: SleepHandle,
    /// Time of the last successful fetch; initialized to the time the service was created.
    last_fetched: Instant,
    /// Upstream request errors are only logged once at least this much time has passed
    /// since `last_fetched`.
    upstream_failure_interval: Duration,
    /// Set once shutdown was requested; prevents further fetches from being scheduled.
    shutdown: bool,
}
204
205impl GlobalConfigService {
206 pub fn new(
208 config: Arc<Config>,
209 upstream: Addr<UpstreamRelay>,
210 ) -> (Self, watch::Receiver<Status>) {
211 let (internal_tx, internal_rx) = mpsc::channel(1);
212 let (global_config_watch, rx) = watch::channel(Status::Pending);
213
214 (
215 Self {
216 config,
217 global_config_watch,
218 internal_tx,
219 internal_rx,
220 upstream,
221 fetch_handle: SleepHandle::idle(),
222 last_fetched: Instant::now(),
223 upstream_failure_interval: Duration::from_secs(35),
224 shutdown: false,
225 },
226 rx,
227 )
228 }
229
230 pub fn handle(&self) -> GlobalConfigHandle {
233 GlobalConfigHandle {
234 watch: self.global_config_watch.subscribe(),
235 }
236 }
237
238 fn handle_message(&mut self, message: GlobalConfigManager) {
240 match message {
241 GlobalConfigManager::Get(sender) => {
242 sender.send(self.global_config_watch.borrow().clone());
243 }
244 }
245 }
246
247 fn schedule_fetch(&mut self) {
249 if !self.shutdown && self.fetch_handle.is_idle() {
250 self.fetch_handle
251 .set(self.config.global_config_fetch_interval());
252 }
253 }
254
255 fn request_global_config(&mut self) {
260 self.fetch_handle.reset();
262
263 let upstream_relay = self.upstream.clone();
264 let internal_tx = self.internal_tx.clone();
265
266 relay_system::spawn!(async move {
267 metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
268 let query = GetGlobalConfig::new();
269 let res = upstream_relay.send(SendQuery(query)).await;
270 internal_tx.send(res).await.ok();
273 });
274 });
275 }
276
277 fn handle_result(&mut self, result: UpstreamQueryResult) {
285 match result {
286 Ok(Ok(response)) => {
287 let mut success = false;
288 let is_ready = response.global_status.is_none_or(|stat| stat.is_ready());
291
292 match response.global {
293 Some(mut global_config) if is_ready => {
294 if !self.global_config_watch.borrow().is_ready() {
296 relay_log::info!("received global config from upstream");
297 }
298
299 global_config.normalize();
300
301 self.global_config_watch
302 .send_replace(Status::Ready(Arc::new(global_config)));
303 success = true;
304 self.last_fetched = Instant::now();
305 }
306 Some(_) => relay_log::info!("global config from upstream is not yet ready"),
307 None => relay_log::error!("global config missing in upstream response"),
308 }
309 metric!(
310 counter(RelayCounters::GlobalConfigFetched) += 1,
311 success = if success { "true" } else { "false" },
312 );
313 }
314 Ok(Err(e)) => {
315 if self.last_fetched.elapsed() >= self.upstream_failure_interval {
316 relay_log::error!(
317 error = &e as &dyn std::error::Error,
318 "failed to fetch global config from upstream"
319 );
320 }
321 }
322 Err(e) => relay_log::error!(
323 error = &e as &dyn std::error::Error,
324 "failed to send request to upstream"
325 ),
326 }
327
328 self.schedule_fetch();
330 }
331
332 fn handle_shutdown(&mut self) {
333 self.shutdown = true;
334 self.fetch_handle.reset();
335 }
336}
337
338impl Service for GlobalConfigService {
339 type Interface = GlobalConfigManager;
340
341 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
342 let mut shutdown_handle = Controller::shutdown_handle();
343
344 relay_log::info!("global config service starting");
345 if self.config.relay_mode() == RelayMode::Managed {
346 relay_log::info!("requesting global config from upstream");
347 self.request_global_config();
348 } else {
349 match GlobalConfig::load(self.config.path()) {
350 Ok(Some(from_file)) => {
351 relay_log::info!("serving static global config loaded from file");
352 self.global_config_watch
353 .send_replace(Status::Ready(Arc::new(from_file)));
354 }
355 Ok(None) => {
356 relay_log::info!(
357 "serving default global configs due to lacking static global config file"
358 );
359 self.global_config_watch
360 .send_replace(Status::Ready(Arc::default()));
361 }
362 Err(e) => {
363 relay_log::error!("failed to load global config from file: {}", e);
364 relay_log::info!(
365 "serving default global configs due to failure to load global config from file"
366 );
367 self.global_config_watch
368 .send_replace(Status::Ready(Arc::default()));
369 }
370 }
371 };
372
373 loop {
374 tokio::select! {
375 biased;
376
377 () = &mut self.fetch_handle => self.request_global_config(),
378 Some(result) = self.internal_rx.recv() => self.handle_result(result),
379 Some(message) = rx.recv() => self.handle_message(message),
380 _ = shutdown_handle.notified() => self.handle_shutdown(),
381
382 else => break,
383 }
384 }
385 relay_log::info!("global config service stopped");
386 }
387}
388
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use relay_config::{Config, RelayMode};
    use relay_system::{Controller, Service, ShutdownMode};
    use relay_test::mock_service;

    use crate::services::global_config::{Get, GlobalConfigService};

    /// Checks that the service still answers `Get` messages after a shutdown was
    /// triggered.
    ///
    /// The upstream mock panics when it receives more than one message.
    #[tokio::test]
    async fn shutdown_service() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, _) = mock_service("upstream", 0, |state, _| {
            *state += 1;

            if *state > 1 {
                panic!("should not receive requests after shutdown");
            }
        });

        Controller::start(Duration::from_secs(1));
        let mut config = Config::default();
        config.regenerate_credentials(false).unwrap();
        let fetch_interval = config.global_config_fetch_interval();

        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();

        assert!(service.send(Get).await.is_ok());

        Controller::shutdown(ShutdownMode::Immediate);
        // Wait for longer than one fetch interval after the shutdown.
        tokio::time::sleep(fetch_interval * 2).await;

        assert!(service.send(Get).await.is_ok());
    }

    /// Checks that a relay in managed mode sends a request to the upstream.
    ///
    /// The upstream mock panics on any message, and the test is expected to panic.
    // NOTE(review): presumably `handle` is the mock's join handle, so unwrapping it
    // surfaces the mock's panic in the test — confirm against `relay_test::mock_service`.
    #[tokio::test]
    #[should_panic]
    async fn managed_relay_makes_upstream_request() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, handle) = mock_service("upstream", (), |(), _| {
            panic!();
        });

        let mut config = Config::from_json_value(serde_json::json!({
            "relay": {
                "mode": RelayMode::Managed
            }
        }))
        .unwrap();
        config.regenerate_credentials(false).unwrap();

        let fetch_interval = config.global_config_fetch_interval();
        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();
        service.send(Get).await.unwrap();

        tokio::time::sleep(fetch_interval * 2).await;
        handle.await.unwrap();
    }

    /// Checks that a relay in proxy mode does not send requests to the upstream.
    ///
    /// The upstream mock panics if it receives any message.
    #[tokio::test]
    async fn proxy_relay_does_not_make_upstream_request() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, _) = mock_service("upstream", (), |(), _| {
            panic!("upstream should not be called outside of managed mode");
        });

        let config = Config::from_json_value(serde_json::json!({
            "relay": {
                "mode": RelayMode::Proxy
            }
        }))
        .unwrap();

        let fetch_interval = config.global_config_fetch_interval();

        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();
        service.send(Get).await.unwrap();

        // Wait for longer than one fetch interval.
        tokio::time::sleep(fetch_interval * 2).await;
    }
}