relay_server/services/projects/source/
local.rs1use std::collections::HashMap;
2use std::ffi::OsStr;
3use std::path::Path;
4use std::sync::Arc;
5
6use relay_base_schema::project::{ProjectId, ProjectKey};
7use relay_config::Config;
8use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Service};
9use tokio::sync::mpsc;
10use tokio::time::Instant;
11
12use crate::services::projects::project::{ParsedProjectState, ProjectState};
13use crate::services::projects::source::FetchOptionalProjectState;
14
15#[derive(Debug)]
17pub struct LocalProjectSource(FetchOptionalProjectState, Sender<Option<ProjectState>>);
18
19impl Interface for LocalProjectSource {}
20
21impl FromMessage<FetchOptionalProjectState> for LocalProjectSource {
22 type Response = AsyncResponse<Option<ProjectState>>;
23 fn from_message(
24 message: FetchOptionalProjectState,
25 sender: Sender<Option<ProjectState>>,
26 ) -> Self {
27 Self(message, sender)
28 }
29}
30
31#[derive(Debug)]
33pub struct LocalProjectSourceService {
34 config: Arc<Config>,
35 local_states: HashMap<ProjectKey, ProjectState>,
36}
37
38impl LocalProjectSourceService {
39 pub fn new(config: Arc<Config>) -> Self {
40 Self {
41 config,
42 local_states: HashMap::new(),
43 }
44 }
45
46 fn handle_message(&mut self, message: LocalProjectSource) {
47 let LocalProjectSource(message, sender) = message;
48 let states = self.local_states.get(&message.project_key()).cloned();
49 sender.send(states);
50 }
51}
52
53fn get_project_id(path: &Path) -> Option<ProjectId> {
54 path.file_stem()
55 .and_then(OsStr::to_str)
56 .and_then(|stem| stem.parse().ok())
57}
58
59fn parse_file(
60 path: std::path::PathBuf,
61) -> tokio::io::Result<(std::path::PathBuf, ParsedProjectState)> {
62 let file = std::fs::File::open(&path)?;
63 let reader = std::io::BufReader::new(file);
64 let state = serde_json::from_reader(reader)?;
65 Ok((path, state))
66}
67
68async fn load_local_states(
69 projects_path: &Path,
70) -> tokio::io::Result<HashMap<ProjectKey, ProjectState>> {
71 let mut states = HashMap::new();
72
73 let mut directory = match tokio::fs::read_dir(projects_path).await {
74 Ok(directory) => directory,
75 Err(error) => {
76 return match error.kind() {
77 tokio::io::ErrorKind::NotFound => Ok(states),
78 _ => Err(error),
79 };
80 }
81 };
82
83 relay_log::debug!(directory = ?projects_path, "loading local states from file system");
85
86 while let Some(entry) = directory.next_entry().await? {
87 let path = entry.path();
88
89 let metadata = entry.metadata().await?;
90 if !(metadata.is_file() || metadata.is_symlink()) {
91 relay_log::warn!(?path, "skipping file, not a file");
92 continue;
93 }
94
95 if path.extension().map(|x| x != "json").unwrap_or(true) {
96 relay_log::warn!(?path, "skipping file, file extension must be .json");
97 continue;
98 }
99
100 let (path, mut state) = tokio::task::spawn_blocking(move || parse_file(path)).await??;
102
103 if state.info.project_id.is_none() {
104 if let Some(project_id) = get_project_id(&path) {
105 state.info.project_id = Some(project_id);
106 } else {
107 relay_log::warn!(?path, "skipping file, filename is not a valid project id");
108 continue;
109 }
110 }
111
112 let keys = std::mem::take(&mut state.info.public_keys);
114 if keys.is_empty() {
115 relay_log::warn!(
116 ?path,
117 "skipping file, project config is missing public keys"
118 );
119 }
120
121 for key in keys {
122 let mut state = state.clone();
123 state.info.public_keys = smallvec::smallvec![key.clone()];
124 states.insert(key.public_key, ProjectState::from(state).sanitized());
125 }
126 }
127
128 Ok(states)
129}
130
131async fn poll_local_states(path: &Path, tx: &mpsc::Sender<HashMap<ProjectKey, ProjectState>>) {
132 let states = load_local_states(path).await;
133 match states {
134 Ok(states) => {
135 let res = tx.send(states).await;
136 if res.is_err() {
137 relay_log::error!("failed to store static project configs");
138 }
139 }
140 Err(error) => relay_log::error!(
141 error = &error as &dyn std::error::Error,
142 "failed to load static project configs",
143 ),
144 };
145}
146
147async fn spawn_poll_local_states(
148 config: &Config,
149 tx: mpsc::Sender<HashMap<ProjectKey, ProjectState>>,
150) {
151 let project_path = config.project_configs_path();
152 let period = config.local_cache_interval();
153
154 poll_local_states(&project_path, &tx).await;
157
158 relay_system::spawn!(async move {
160 let start_at = Instant::now() + period;
162 let mut ticker = tokio::time::interval_at(start_at, period);
163
164 loop {
165 ticker.tick().await;
166 poll_local_states(&project_path, &tx).await;
167 }
168 });
169}
170
171impl Service for LocalProjectSourceService {
172 type Interface = LocalProjectSource;
173
174 async fn run(mut self, mut rx: Receiver<Self::Interface>) {
175 let (state_tx, mut state_rx) = mpsc::channel(1);
178
179 relay_log::info!("project local cache started");
180
181 spawn_poll_local_states(&self.config, state_tx).await;
183
184 loop {
185 tokio::select! {
186 biased;
187 Some(message) = rx.recv() => self.handle_message(message),
188 Some(states) = state_rx.recv() => self.local_states = states,
189
190 else => break,
191 }
192 }
193 relay_log::info!("project local cache stopped");
194 }
195}
196
197#[cfg(not(target_os = "windows"))]
199#[cfg(test)]
200mod tests {
201 use std::str::FromStr;
202
203 use super::*;
204 use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
205
206 #[tokio::test]
208 async fn test_symlinked_projects() {
209 let temp1 = tempfile::tempdir().unwrap();
210 let temp2 = tempfile::tempdir().unwrap();
211
212 let tmp_project_file = "111111.json";
213 let project_key = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();
214
215 let mut tmp_project_state = ProjectInfo::default();
216 tmp_project_state.public_keys.push(PublicKeyConfig {
217 public_key: project_key,
218 numeric_id: None,
219 });
220
221 let project_state = serde_json::to_string(&tmp_project_state).unwrap();
223 tokio::fs::write(
224 temp1.path().join(tmp_project_file),
225 project_state.as_bytes(),
226 )
227 .await
228 .unwrap();
229
230 tokio::fs::symlink(
231 temp1.path().join(tmp_project_file),
232 temp2.path().join(tmp_project_file),
233 )
234 .await
235 .unwrap();
236
237 let extracted_project_state = load_local_states(temp2.path()).await.unwrap();
238 let project_info = extracted_project_state
239 .get(&project_key)
240 .unwrap()
241 .clone()
242 .enabled()
243 .unwrap();
244
245 assert_eq!(
246 project_info.project_id,
247 Some(ProjectId::from_str("111111").unwrap())
248 );
249
250 assert_eq!(
251 project_info.public_keys.first().unwrap().public_key,
252 project_key,
253 )
254 }
255
256 #[tokio::test]
257 async fn test_multi_pub_static_config() {
258 let temp = tempfile::tempdir().unwrap();
259
260 let tmp_project_file = "111111.json";
261 let project_key1 = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();
262 let project_key2 = ProjectKey::parse("55bbb2d96256bb9983bb39890bb457bb").unwrap();
263
264 let mut tmp_project_state = ProjectInfo::default();
265 tmp_project_state.public_keys.extend(vec![
266 PublicKeyConfig {
267 public_key: project_key1,
268 numeric_id: None,
269 },
270 PublicKeyConfig {
271 public_key: project_key2,
272 numeric_id: None,
273 },
274 ]);
275
276 let project_state = serde_json::to_string(&tmp_project_state).unwrap();
278 tokio::fs::write(temp.path().join(tmp_project_file), project_state.as_bytes())
279 .await
280 .unwrap();
281
282 let extracted_project_state = load_local_states(temp.path()).await.unwrap();
283
284 assert_eq!(extracted_project_state.len(), 2);
285 assert!(extracted_project_state.contains_key(&project_key1));
286 assert!(extracted_project_state.contains_key(&project_key2));
287 }
288}