relay_server/services/projects/cache/
handle.rs

1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_system::Addr;
8use tokio::sync::broadcast;
9
10use super::state::Shared;
11use crate::services::projects::cache::service::ProjectChange;
12use crate::services::projects::cache::{Project, ProjectCache};
13
/// A synchronous handle to the [`ProjectCache`].
///
/// The handle allows lock free access to cached projects. It also acts as an interface
/// to the [`ProjectCacheService`](super::ProjectCacheService).
#[derive(Clone)]
pub struct ProjectCacheHandle {
    /// Shared project storage from which projects are looked up, and created on first access.
    pub(super) shared: Arc<Shared>,
    /// Configuration handed to every [`Project`] returned by this handle.
    pub(super) config: Arc<Config>,
    /// Address of the project cache service, used to send fetch requests.
    pub(super) service: Addr<ProjectCache>,
    /// Sending half of the project change broadcast; subscriptions are created from it.
    pub(super) project_changes: broadcast::Sender<ProjectChange>,
}
25
26impl ProjectCacheHandle {
27    /// Returns the current project state for the `project_key`.
28    pub fn get(&self, project_key: ProjectKey) -> Project<'_> {
29        let project = self.shared.get_or_create(project_key);
30        // Always trigger a fetch after retrieving the project to make sure the state is up to date.
31        self.fetch(project_key);
32
33        Project::new(project, &self.config)
34    }
35
36    /// Awaits until the given project state becomes ready (enabled or disabled).
37    ///
38    /// Returns [`None`] if the project config cannot be resolved in the given time.
39    pub async fn ready(&self, project_key: ProjectKey, timeout: Duration) -> Option<Project<'_>> {
40        let project = self.get(project_key);
41        if !project.state().is_pending() {
42            return Some(project);
43        }
44
45        tokio::time::timeout(timeout, self.ready_inner(project_key))
46            .await
47            .ok()
48    }
49
50    async fn ready_inner(&self, project_key: ProjectKey) -> Project<'_> {
51        loop {
52            let project = self.shared.get_or_create(project_key);
53            // Create the `Notified` before checking the project_state, to prevent missing
54            // an update between the check and the registration of the listener.
55            //
56            // From [`tokio::sync::futures::Notified::enabled`]:
57            // > notifications sent using notify_waiters [...] are received
58            // > as long as they happen after the creation of the Notified
59            let change_listener = project.outdated();
60            if !project.project_state().is_pending() {
61                drop(change_listener);
62                return Project::new(project, &self.config);
63            }
64            change_listener.await;
65        }
66    }
67
68    /// Triggers a fetch/update check in the project cache for the supplied project.
69    pub fn fetch(&self, project_key: ProjectKey) {
70        self.service.send(ProjectCache::Fetch(project_key));
71    }
72
73    /// Returns a subscription to all [`ProjectChange`]'s.
74    ///
75    /// This stream notifies the subscriber about project state changes in the project cache.
76    /// Events may arrive in arbitrary order and be delivered multiple times.
77    pub fn changes(&self) -> broadcast::Receiver<ProjectChange> {
78        self.project_changes.subscribe()
79    }
80}
81
82impl fmt::Debug for ProjectCacheHandle {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("ProjectCacheHandle")
85            .field("shared", &self.shared)
86            .finish()
87    }
88}
89
#[cfg(test)]
mod test {
    use super::*;
    use crate::services::projects::project::ProjectState;

    impl ProjectCacheHandle {
        /// Builds a [`ProjectCacheHandle`] for use in tests only.
        ///
        /// A handle built this way is backed by a dummy service address and therefore
        /// does not require a running service to function.
        pub fn for_test() -> Self {
            // Only the sending half is kept; the initial receiver is discarded right away.
            let (project_changes, _) = broadcast::channel(999_999);

            Self {
                shared: Default::default(),
                config: Default::default(),
                service: Addr::dummy(),
                project_changes,
            }
        }

        /// Overrides the project state stored for `project_key`.
        ///
        /// This emulates a project cache update in tests: a pending `state` is announced
        /// as [`ProjectChange::Evicted`], any other state as [`ProjectChange::Ready`].
        pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
            // Pick the event up front, `state` is handed over to the shared storage below.
            let change = if state.is_pending() {
                ProjectChange::Evicted(project_key)
            } else {
                ProjectChange::Ready(project_key)
            };

            self.shared.test_set_project_state(project_key, state);
            // The send result is deliberately ignored.
            let _ = self.project_changes.send(change);
        }

        /// Returns `true` if a project has been created for this `project_key`.
        ///
        /// Projects are created automatically on access via [`Self::get`].
        pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
            let shared = &self.shared;
            shared.test_has_project_created(project_key)
        }

        /// The number of fetches triggered for projects.
        ///
        /// Both [`Self::get`] and [`Self::fetch`] trigger a fetch.
        pub fn test_num_fetches(&self) -> u64 {
            let service = &self.service;
            service.len()
        }
    }
}