relay_server/services/projects/cache/
handle.rs1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_system::Addr;
8use tokio::sync::broadcast;
9
10use super::state::Shared;
11use crate::services::projects::cache::service::ProjectChange;
12use crate::services::projects::cache::{Project, ProjectCache};
13
14#[derive(Clone)]
19pub struct ProjectCacheHandle {
20 pub(super) shared: Arc<Shared>,
21 pub(super) config: Arc<Config>,
22 pub(super) service: Addr<ProjectCache>,
23 pub(super) project_changes: broadcast::Sender<ProjectChange>,
24}
25
26impl ProjectCacheHandle {
27 pub fn get(&self, project_key: ProjectKey) -> Project<'_> {
29 let project = self.shared.get_or_create(project_key);
30 self.fetch(project_key);
32
33 Project::new(project, &self.config)
34 }
35
36 pub async fn ready(&self, project_key: ProjectKey, timeout: Duration) -> Option<Project<'_>> {
40 let project = self.get(project_key);
41 if !project.state().is_pending() {
42 return Some(project);
43 }
44
45 tokio::time::timeout(timeout, self.ready_inner(project_key))
46 .await
47 .ok()
48 }
49
50 async fn ready_inner(&self, project_key: ProjectKey) -> Project<'_> {
51 loop {
52 let project = self.shared.get_or_create(project_key);
53 let change_listener = project.outdated();
60 if !project.project_state().is_pending() {
61 drop(change_listener);
62 return Project::new(project, &self.config);
63 }
64 change_listener.await;
65 }
66 }
67
68 pub fn fetch(&self, project_key: ProjectKey) {
70 self.service.send(ProjectCache::Fetch(project_key));
71 }
72
73 pub fn changes(&self) -> broadcast::Receiver<ProjectChange> {
78 self.project_changes.subscribe()
79 }
80}
81
82impl fmt::Debug for ProjectCacheHandle {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("ProjectCacheHandle")
85 .field("shared", &self.shared)
86 .finish()
87 }
88}
89
90#[cfg(test)]
91mod test {
92 use super::*;
93 use crate::services::projects::project::ProjectState;
94
95 impl ProjectCacheHandle {
96 pub fn for_test() -> Self {
100 Self {
101 shared: Default::default(),
102 config: Default::default(),
103 service: Addr::dummy(),
104 project_changes: broadcast::channel(999_999).0,
105 }
106 }
107
108 pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
112 let is_pending = state.is_pending();
113 self.shared.test_set_project_state(project_key, state);
114 if is_pending {
115 let _ = self
116 .project_changes
117 .send(ProjectChange::Evicted(project_key));
118 } else {
119 let _ = self.project_changes.send(ProjectChange::Ready(project_key));
120 }
121 }
122
123 pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
127 self.shared.test_has_project_created(project_key)
128 }
129
130 pub fn test_num_fetches(&self) -> u64 {
134 self.service.len()
135 }
136 }
137}