relay_server/services/projects/source/
local.rs

1use std::collections::HashMap;
2use std::ffi::OsStr;
3use std::path::Path;
4use std::sync::Arc;
5
6use relay_base_schema::project::{ProjectId, ProjectKey};
7use relay_config::Config;
8use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Service};
9use tokio::sync::mpsc;
10use tokio::time::Instant;
11
12use crate::services::projects::project::{ParsedProjectState, ProjectState};
13use crate::services::projects::source::FetchOptionalProjectState;
14
15/// Service interface of the local project source.
16#[derive(Debug)]
17pub struct LocalProjectSource(FetchOptionalProjectState, Sender<Option<ProjectState>>);
18
19impl Interface for LocalProjectSource {}
20
21impl FromMessage<FetchOptionalProjectState> for LocalProjectSource {
22    type Response = AsyncResponse<Option<ProjectState>>;
23    fn from_message(
24        message: FetchOptionalProjectState,
25        sender: Sender<Option<ProjectState>>,
26    ) -> Self {
27        Self(message, sender)
28    }
29}
30
31/// A service which periodically loads project states from disk.
32#[derive(Debug)]
33pub struct LocalProjectSourceService {
34    config: Arc<Config>,
35    local_states: HashMap<ProjectKey, ProjectState>,
36}
37
38impl LocalProjectSourceService {
39    pub fn new(config: Arc<Config>) -> Self {
40        Self {
41            config,
42            local_states: HashMap::new(),
43        }
44    }
45
46    fn handle_message(&mut self, message: LocalProjectSource) {
47        let LocalProjectSource(message, sender) = message;
48        let states = self.local_states.get(&message.project_key()).cloned();
49        sender.send(states);
50    }
51}
52
53fn get_project_id(path: &Path) -> Option<ProjectId> {
54    path.file_stem()
55        .and_then(OsStr::to_str)
56        .and_then(|stem| stem.parse().ok())
57}
58
59fn parse_file(
60    path: std::path::PathBuf,
61) -> tokio::io::Result<(std::path::PathBuf, ParsedProjectState)> {
62    let file = std::fs::File::open(&path)?;
63    let reader = std::io::BufReader::new(file);
64    let state = serde_json::from_reader(reader)?;
65    Ok((path, state))
66}
67
68async fn load_local_states(
69    projects_path: &Path,
70) -> tokio::io::Result<HashMap<ProjectKey, ProjectState>> {
71    let mut states = HashMap::new();
72
73    let mut directory = match tokio::fs::read_dir(projects_path).await {
74        Ok(directory) => directory,
75        Err(error) => {
76            return match error.kind() {
77                tokio::io::ErrorKind::NotFound => Ok(states),
78                _ => Err(error),
79            };
80        }
81    };
82
83    // only printed when directory even exists.
84    relay_log::debug!(directory = ?projects_path, "loading local states from file system");
85
86    while let Some(entry) = directory.next_entry().await? {
87        let path = entry.path();
88
89        let metadata = entry.metadata().await?;
90        if !(metadata.is_file() || metadata.is_symlink()) {
91            relay_log::warn!(?path, "skipping file, not a file");
92            continue;
93        }
94
95        if path.extension().map(|x| x != "json").unwrap_or(true) {
96            relay_log::warn!(?path, "skipping file, file extension must be .json");
97            continue;
98        }
99
100        // serde_json is not async, so spawn a blocking task here:
101        let (path, mut state) = tokio::task::spawn_blocking(move || parse_file(path)).await??;
102
103        if state.info.project_id.is_none() {
104            if let Some(project_id) = get_project_id(&path) {
105                state.info.project_id = Some(project_id);
106            } else {
107                relay_log::warn!(?path, "skipping file, filename is not a valid project id");
108                continue;
109            }
110        }
111
112        // Keep a separate project state per key.
113        let keys = std::mem::take(&mut state.info.public_keys);
114        if keys.is_empty() {
115            relay_log::warn!(
116                ?path,
117                "skipping file, project config is missing public keys"
118            );
119        }
120
121        for key in keys {
122            let mut state = state.clone();
123            state.info.public_keys = smallvec::smallvec![key.clone()];
124            states.insert(key.public_key, ProjectState::from(state).sanitized());
125        }
126    }
127
128    Ok(states)
129}
130
131async fn poll_local_states(path: &Path, tx: &mpsc::Sender<HashMap<ProjectKey, ProjectState>>) {
132    let states = load_local_states(path).await;
133    match states {
134        Ok(states) => {
135            let res = tx.send(states).await;
136            if res.is_err() {
137                relay_log::error!("failed to store static project configs");
138            }
139        }
140        Err(error) => relay_log::error!(
141            error = &error as &dyn std::error::Error,
142            "failed to load static project configs",
143        ),
144    };
145}
146
147async fn spawn_poll_local_states(
148    config: &Config,
149    tx: mpsc::Sender<HashMap<ProjectKey, ProjectState>>,
150) {
151    let project_path = config.project_configs_path();
152    let period = config.local_cache_interval();
153
154    // Poll local states once before handling any message, such that the projects are
155    // populated.
156    poll_local_states(&project_path, &tx).await;
157
158    // Start a background loop that polls periodically:
159    relay_system::spawn!(async move {
160        // To avoid running two load tasks simultaneously at startup, we delay the interval by one period:
161        let start_at = Instant::now() + period;
162        let mut ticker = tokio::time::interval_at(start_at, period);
163
164        loop {
165            ticker.tick().await;
166            poll_local_states(&project_path, &tx).await;
167        }
168    });
169}
170
171impl Service for LocalProjectSourceService {
172    type Interface = LocalProjectSource;
173
174    async fn run(mut self, mut rx: Receiver<Self::Interface>) {
175        // Use a channel with size 1. If the channel is full because the consumer does not
176        // collect the result, the producer will block, which is acceptable.
177        let (state_tx, mut state_rx) = mpsc::channel(1);
178
179        relay_log::info!("project local cache started");
180
181        // Start the background task that periodically reloads projects from disk:
182        spawn_poll_local_states(&self.config, state_tx).await;
183
184        loop {
185            tokio::select! {
186                biased;
187                Some(message) = rx.recv() => self.handle_message(message),
188                Some(states) = state_rx.recv() => self.local_states = states,
189
190                else => break,
191            }
192        }
193        relay_log::info!("project local cache stopped");
194    }
195}
196
197/// This works only on Unix systems.
198#[cfg(not(target_os = "windows"))]
199#[cfg(test)]
200mod tests {
201    use std::str::FromStr;
202
203    use super::*;
204    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
205
206    /// Tests that we can follow the symlinks and read the project file properly.
207    #[tokio::test]
208    async fn test_symlinked_projects() {
209        let temp1 = tempfile::tempdir().unwrap();
210        let temp2 = tempfile::tempdir().unwrap();
211
212        let tmp_project_file = "111111.json";
213        let project_key = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();
214
215        let mut tmp_project_state = ProjectInfo::default();
216        tmp_project_state.public_keys.push(PublicKeyConfig {
217            public_key: project_key,
218            numeric_id: None,
219        });
220
221        // create the project file
222        let project_state = serde_json::to_string(&tmp_project_state).unwrap();
223        tokio::fs::write(
224            temp1.path().join(tmp_project_file),
225            project_state.as_bytes(),
226        )
227        .await
228        .unwrap();
229
230        tokio::fs::symlink(
231            temp1.path().join(tmp_project_file),
232            temp2.path().join(tmp_project_file),
233        )
234        .await
235        .unwrap();
236
237        let extracted_project_state = load_local_states(temp2.path()).await.unwrap();
238        let project_info = extracted_project_state
239            .get(&project_key)
240            .unwrap()
241            .clone()
242            .enabled()
243            .unwrap();
244
245        assert_eq!(
246            project_info.project_id,
247            Some(ProjectId::from_str("111111").unwrap())
248        );
249
250        assert_eq!(
251            project_info.public_keys.first().unwrap().public_key,
252            project_key,
253        )
254    }
255
256    #[tokio::test]
257    async fn test_multi_pub_static_config() {
258        let temp = tempfile::tempdir().unwrap();
259
260        let tmp_project_file = "111111.json";
261        let project_key1 = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();
262        let project_key2 = ProjectKey::parse("55bbb2d96256bb9983bb39890bb457bb").unwrap();
263
264        let mut tmp_project_state = ProjectInfo::default();
265        tmp_project_state.public_keys.extend(vec![
266            PublicKeyConfig {
267                public_key: project_key1,
268                numeric_id: None,
269            },
270            PublicKeyConfig {
271                public_key: project_key2,
272                numeric_id: None,
273            },
274        ]);
275
276        // create the project file
277        let project_state = serde_json::to_string(&tmp_project_state).unwrap();
278        tokio::fs::write(temp.path().join(tmp_project_file), project_state.as_bytes())
279            .await
280            .unwrap();
281
282        let extracted_project_state = load_local_states(temp.path()).await.unwrap();
283
284        assert_eq!(extracted_project_state.len(), 2);
285        assert!(extracted_project_state.contains_key(&project_key1));
286        assert!(extracted_project_state.contains_key(&project_key2));
287    }
288}