relay_server/services/
global_config.rs

//! This module implements the Global Config service.
//!
//! The global config service is a Relay service to manage [`GlobalConfig`]s,
//! from fetching to forwarding. In managed mode, once the service is started,
//! it periodically requests the global config from upstream and provides it to
//! the rest of Relay. In all other modes, it serves a static global config
//! loaded from file, or the default global config if no file is present or it
//! fails to load.
//!
//! There are two ways to interact with this service: requesting the current
//! global config once through [`GlobalConfigManager`], or observing updates
//! through a [`GlobalConfigHandle`] obtained from [`GlobalConfigService::handle`].
11
12use std::borrow::Cow;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use relay_config::Config;
18use relay_config::RelayMode;
19use relay_dynamic_config::GlobalConfig;
20use relay_statsd::metric;
21use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Service};
22use reqwest::Method;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{mpsc, watch};
25use tokio::time::Instant;
26
27use crate::services::upstream::{
28    RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
29};
30use crate::statsd::{RelayCounters, RelayTimers};
31use crate::utils::SleepHandle;
32
33/// The result of sending a global config query to upstream.
34/// It can fail both in sending it, and in the response.
35type UpstreamQueryResult =
36    Result<Result<GetGlobalConfigResponse, UpstreamRequestError>, relay_system::SendError>;
37
38/// The response of a fetch of a global config from upstream.
39#[derive(Debug, Deserialize, Serialize)]
40#[serde(rename_all = "camelCase")]
41struct GetGlobalConfigResponse {
42    global: Option<GlobalConfig>,
43    // Instead of using [`Status`], we use StatusResponse as a separate field in order to not
44    // make breaking changes to the api.
45    global_status: Option<StatusResponse>,
46}
47
48/// A mirror of [`Status`] without the associated data for use in serialization.
49#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
50#[serde(rename_all = "lowercase")]
51pub enum StatusResponse {
52    Ready,
53    Pending,
54}
55
56impl StatusResponse {
57    pub fn is_ready(self) -> bool {
58        matches!(self, Self::Ready)
59    }
60}
61
62/// The request to fetch a global config from upstream.
63#[derive(Debug, Deserialize, Serialize)]
64#[serde(rename_all = "camelCase")]
65struct GetGlobalConfig {
66    global: bool,
67    // Dummy variable - upstream expects a list of public keys.
68    public_keys: Vec<()>,
69}
70
71impl GetGlobalConfig {
72    fn new() -> GetGlobalConfig {
73        GetGlobalConfig {
74            global: true,
75            public_keys: vec![],
76        }
77    }
78}
79
80impl UpstreamQuery for GetGlobalConfig {
81    type Response = GetGlobalConfigResponse;
82
83    fn method(&self) -> reqwest::Method {
84        Method::POST
85    }
86
87    fn path(&self) -> std::borrow::Cow<'static, str> {
88        Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
89    }
90
91    fn retry() -> bool {
92        false
93    }
94
95    fn priority() -> super::upstream::RequestPriority {
96        RequestPriority::High
97    }
98
99    fn route(&self) -> &'static str {
100        "global_config"
101    }
102}
103
104/// The message for requesting the most recent global config from [`GlobalConfigService`].
105pub struct Get;
106
107/// An interface to get [`GlobalConfig`]s through [`GlobalConfigService`].
108///
109/// For a one-off update, [`GlobalConfigService`] responds to
110/// [`GlobalConfigManager::Get`] messages with the latest instance of the
111/// [`GlobalConfig`].
112pub enum GlobalConfigManager {
113    /// Returns the most recent global config.
114    Get(relay_system::Sender<Status>),
115}
116
117impl Interface for GlobalConfigManager {}
118
119impl FromMessage<Get> for GlobalConfigManager {
120    type Response = AsyncResponse<Status>;
121
122    fn from_message(_: Get, sender: relay_system::Sender<Status>) -> Self {
123        Self::Get(sender)
124    }
125}
126
127/// Describes the current fetching status of the [`GlobalConfig`] from the upstream.
128#[derive(Debug, Clone, Default)]
129pub enum Status {
130    /// Global config ready to be used by other services.
131    ///
132    /// This variant implies different things in different circumstances. In managed mode, it means
133    /// that we have received a config from upstream. In other modes the config is either
134    /// from a file or the default global config.
135    Ready(Arc<GlobalConfig>),
136    /// The global config is requested from the upstream but it has not arrived yet.
137    ///
138    /// This variant should never be sent after the first `Ready` has occured.
139    #[default]
140    Pending,
141}
142
143impl Status {
144    /// Returns `true` if the global config is ready to be read.
145    pub fn is_ready(&self) -> bool {
146        matches!(self, Self::Ready(_))
147    }
148}
149
150#[derive(Clone)]
151pub struct GlobalConfigHandle {
152    watch: watch::Receiver<Status>,
153}
154
155impl GlobalConfigHandle {
156    /// Creates a new global config handle with a fixed global config.
157    #[cfg(test)]
158    pub fn fixed(config: GlobalConfig) -> Self {
159        let (_, watch) = watch::channel(Status::Ready(Arc::new(config)));
160        Self { watch }
161    }
162
163    /// Returns the currently loaded or a default global config.
164    ///
165    /// When no global config has been received from upstream yet,
166    /// this will return a default global config.
167    pub fn current(&self) -> Arc<GlobalConfig> {
168        match &*self.watch.borrow() {
169            Status::Ready(config) => Arc::clone(config),
170            Status::Pending => Default::default(),
171        }
172    }
173}
174
175impl fmt::Debug for GlobalConfigHandle {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        f.debug_tuple("GlobalConfigHandle")
178            .field(&*self.watch.borrow())
179            .finish()
180    }
181}
182
183/// Service implementing the [`GlobalConfigManager`] interface.
184#[derive(Debug)]
185pub struct GlobalConfigService {
186    config: Arc<Config>,
187    /// Sender of the [`watch`] channel for the subscribers of the service.
188    global_config_watch: watch::Sender<Status>,
189    /// Sender of the internal channel to forward global configs from upstream.
190    internal_tx: mpsc::Sender<UpstreamQueryResult>,
191    /// Receiver of the internal channel to forward global configs from upstream.
192    internal_rx: mpsc::Receiver<UpstreamQueryResult>,
193    /// Upstream service to request global configs from.
194    upstream: Addr<UpstreamRelay>,
195    /// Handle to avoid multiple outgoing requests.
196    fetch_handle: SleepHandle,
197    /// Last instant the global config was successfully fetched in.
198    last_fetched: Instant,
199    /// Interval of upstream fetching failures before reporting such errors.
200    upstream_failure_interval: Duration,
201    /// Disables the upstream fetch loop.
202    shutdown: bool,
203}
204
205impl GlobalConfigService {
206    /// Creates a new [`GlobalConfigService`].
207    pub fn new(
208        config: Arc<Config>,
209        upstream: Addr<UpstreamRelay>,
210    ) -> (Self, watch::Receiver<Status>) {
211        let (internal_tx, internal_rx) = mpsc::channel(1);
212        let (global_config_watch, rx) = watch::channel(Status::Pending);
213
214        (
215            Self {
216                config,
217                global_config_watch,
218                internal_tx,
219                internal_rx,
220                upstream,
221                fetch_handle: SleepHandle::idle(),
222                last_fetched: Instant::now(),
223                upstream_failure_interval: Duration::from_secs(35),
224                shutdown: false,
225            },
226            rx,
227        )
228    }
229
230    /// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state
231    /// of the global config at any time.
232    pub fn handle(&self) -> GlobalConfigHandle {
233        GlobalConfigHandle {
234            watch: self.global_config_watch.subscribe(),
235        }
236    }
237
238    /// Handles messages from external services.
239    fn handle_message(&mut self, message: GlobalConfigManager) {
240        match message {
241            GlobalConfigManager::Get(sender) => {
242                sender.send(self.global_config_watch.borrow().clone());
243            }
244        }
245    }
246
247    /// Schedules the next global config request.
248    fn schedule_fetch(&mut self) {
249        if !self.shutdown && self.fetch_handle.is_idle() {
250            self.fetch_handle
251                .set(self.config.global_config_fetch_interval());
252        }
253    }
254
255    /// Requests a new global config from upstream.
256    ///
257    /// We check if we have credentials before sending,
258    /// otherwise we would log an [`UpstreamRequestError::NoCredentials`] error.
259    fn request_global_config(&mut self) {
260        // Disable upstream requests timer until we receive result of query.
261        self.fetch_handle.reset();
262
263        let upstream_relay = self.upstream.clone();
264        let internal_tx = self.internal_tx.clone();
265
266        relay_system::spawn!(async move {
267            metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
268                let query = GetGlobalConfig::new();
269                let res = upstream_relay.send(SendQuery(query)).await;
270                // Internal forwarding should only fail when the internal
271                // receiver is closed.
272                internal_tx.send(res).await.ok();
273            });
274        });
275    }
276
277    /// Handles the response of an attempt to fetch the global config from
278    /// upstream.
279    ///
280    /// This function checks two levels of results:
281    /// 1. Whether the request to the upstream was successful.
282    /// 2. If the request was successful, it then checks whether the returned
283    ///    global config is valid and contains the expected data.
284    fn handle_result(&mut self, result: UpstreamQueryResult) {
285        match result {
286            Ok(Ok(response)) => {
287                let mut success = false;
288                // Older relays won't send a global status, in that case, we will pretend like the
289                // default global config is an up to date one, because that was the old behaviour.
290                let is_ready = response.global_status.is_none_or(|stat| stat.is_ready());
291
292                match response.global {
293                    Some(mut global_config) if is_ready => {
294                        // Log the first time we receive a global config from upstream.
295                        if !self.global_config_watch.borrow().is_ready() {
296                            relay_log::info!("received global config from upstream");
297                        }
298
299                        global_config.normalize();
300
301                        self.global_config_watch
302                            .send_replace(Status::Ready(Arc::new(global_config)));
303                        success = true;
304                        self.last_fetched = Instant::now();
305                    }
306                    Some(_) => relay_log::info!("global config from upstream is not yet ready"),
307                    None => relay_log::error!("global config missing in upstream response"),
308                }
309                metric!(
310                    counter(RelayCounters::GlobalConfigFetched) += 1,
311                    success = if success { "true" } else { "false" },
312                );
313            }
314            Ok(Err(e)) => {
315                if self.last_fetched.elapsed() >= self.upstream_failure_interval {
316                    relay_log::error!(
317                        error = &e as &dyn std::error::Error,
318                        "failed to fetch global config from upstream"
319                    );
320                }
321            }
322            Err(e) => relay_log::error!(
323                error = &e as &dyn std::error::Error,
324                "failed to send request to upstream"
325            ),
326        }
327
328        // Enable upstream requests timer for global configs.
329        self.schedule_fetch();
330    }
331
332    fn handle_shutdown(&mut self) {
333        self.shutdown = true;
334        self.fetch_handle.reset();
335    }
336}
337
338impl Service for GlobalConfigService {
339    type Interface = GlobalConfigManager;
340
341    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
342        let mut shutdown_handle = Controller::shutdown_handle();
343
344        relay_log::info!("global config service starting");
345        if self.config.relay_mode() == RelayMode::Managed {
346            relay_log::info!("requesting global config from upstream");
347            self.request_global_config();
348        } else {
349            match GlobalConfig::load(self.config.path()) {
350                Ok(Some(from_file)) => {
351                    relay_log::info!("serving static global config loaded from file");
352                    self.global_config_watch
353                        .send_replace(Status::Ready(Arc::new(from_file)));
354                }
355                Ok(None) => {
356                    relay_log::info!(
357                        "serving default global configs due to lacking static global config file"
358                    );
359                    self.global_config_watch
360                        .send_replace(Status::Ready(Arc::default()));
361                }
362                Err(e) => {
363                    relay_log::error!("failed to load global config from file: {}", e);
364                    relay_log::info!(
365                        "serving default global configs due to failure to load global config from file"
366                    );
367                    self.global_config_watch
368                        .send_replace(Status::Ready(Arc::default()));
369                }
370            }
371        };
372
373        loop {
374            tokio::select! {
375                biased;
376
377                () = &mut self.fetch_handle => self.request_global_config(),
378                Some(result) = self.internal_rx.recv() => self.handle_result(result),
379                Some(message) = rx.recv() => self.handle_message(message),
380                _ = shutdown_handle.notified() => self.handle_shutdown(),
381
382                else => break,
383            }
384        }
385        relay_log::info!("global config service stopped");
386    }
387}
388
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::sync::Arc;
    use std::time::Duration;

    use relay_config::{Config, RelayMode};
    use relay_system::{Controller, Service, ShutdownMode};
    use relay_test::mock_service;

    use crate::services::global_config::{Get, GlobalConfigService};

    /// Asserts that the task of a mock upstream is still running after waiting for `wait`.
    ///
    /// The mock upstreams in these tests panic on unexpected requests. A panic inside a spawned
    /// task does not fail the test by itself, so the task handle has to be checked explicitly.
    /// The service under test keeps the upstream address alive while it runs, so a mock task
    /// that has terminated has most likely panicked.
    async fn assert_upstream_alive(upstream_task: impl Future, wait: Duration) {
        let finished = tokio::time::timeout(wait, upstream_task).await.is_ok();
        assert!(
            !finished,
            "upstream mock terminated, most likely because it received an unexpected request"
        );
    }

    /// Tests that the service can still handle requests after sending a
    /// shutdown signal, and that it stops requesting from upstream.
    #[tokio::test]
    async fn shutdown_service() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, upstream_task) = mock_service("upstream", 0, |state, _| {
            *state += 1;

            if *state > 1 {
                panic!("should not receive requests after shutdown");
            }
        });

        Controller::start(Duration::from_secs(1));
        let mut config = Config::default();
        config.regenerate_credentials(false).unwrap();
        let fetch_interval = config.global_config_fetch_interval();

        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();

        assert!(service.send(Get).await.is_ok());

        Controller::shutdown(ShutdownMode::Immediate);
        tokio::time::sleep(fetch_interval * 2).await;

        assert!(service.send(Get).await.is_ok());
        assert_upstream_alive(upstream_task, fetch_interval).await;
    }

    /// Tests that a managed relay requests the global config from upstream: the mock panics on
    /// the first request, and awaiting its task propagates that panic.
    #[tokio::test]
    #[should_panic]
    async fn managed_relay_makes_upstream_request() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, handle) = mock_service("upstream", (), |(), _| {
            panic!();
        });

        let mut config = Config::from_json_value(serde_json::json!({
            "relay": {
                "mode":  RelayMode::Managed
            }
        }))
        .unwrap();
        config.regenerate_credentials(false).unwrap();

        let fetch_interval = config.global_config_fetch_interval();
        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();
        service.send(Get).await.unwrap();

        tokio::time::sleep(fetch_interval * 2).await;
        handle.await.unwrap();
    }

    /// Tests that a proxy relay never contacts the upstream.
    #[tokio::test]
    async fn proxy_relay_does_not_make_upstream_request() {
        relay_test::setup();
        tokio::time::pause();

        let (upstream, upstream_task) = mock_service("upstream", (), |(), _| {
            panic!("upstream should not be called outside of managed mode");
        });

        let config = Config::from_json_value(serde_json::json!({
            "relay": {
                "mode":  RelayMode::Proxy
            }
        }))
        .unwrap();

        let fetch_interval = config.global_config_fetch_interval();

        let service = GlobalConfigService::new(Arc::new(config), upstream)
            .0
            .start_detached();
        service.send(Get).await.unwrap();

        tokio::time::sleep(fetch_interval * 2).await;
        assert_upstream_alive(upstream_task, fetch_interval).await;
    }
}