relay_server/services/projects/cache/
service.rs

1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::future::BoxFuture;
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_statsd::metric;
8use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
9use tokio::sync::broadcast;
10
11use crate::services::projects::cache::handle::ProjectCacheHandle;
12use crate::services::projects::cache::state::{
13    self, CompletedFetch, Eviction, Fetch, ProjectStore, Refresh,
14};
15use crate::services::projects::project::ProjectState;
16use crate::services::projects::source::ProjectSource;
17use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
18use crate::utils::FuturesScheduled;
19
/// Capacity of the broadcast channel used to publish project events.
///
/// The number of in-flight events is approximately bounded by the number of
/// projects receiving events, so in theory this capacity should never be
/// reached.
///
/// The value is deliberately this large because receivers of these events
/// currently do not handle lagging behind in the channel gracefully.
const PROJECT_EVENTS_CHANNEL_SIZE: usize = 512_000;
29
30/// A cache for projects, which allows concurrent access to the cached projects.
31#[derive(Debug)]
32pub enum ProjectCache {
33    /// Schedules another fetch or update for the specified project.
34    ///
35    /// A project which is not fetched will eventually expire and be evicted
36    /// from the cache. Fetches for an already cached project ensure the project
37    /// is always up to date and not evicted.
38    Fetch(ProjectKey),
39}
40
41impl ProjectCache {
42    fn variant(&self) -> &'static str {
43        match self {
44            Self::Fetch(_) => "fetch",
45        }
46    }
47}
48
49impl relay_system::Interface for ProjectCache {}
50
51impl relay_system::FromMessage<Self> for ProjectCache {
52    type Response = relay_system::NoResponse;
53
54    fn from_message(message: Self, _: ()) -> Self {
55        message
56    }
57}
58
59/// Project life-cycle changes produced by the project cache.
60#[derive(Debug, Copy, Clone)]
61pub enum ProjectChange {
62    /// A project was successfully fetched and is now ready to use.
63    Ready(ProjectKey),
64    /// A project expired from the cache and was evicted.
65    Evicted(ProjectKey),
66}
67
68/// A service implementing the [`ProjectCache`] interface.
69pub struct ProjectCacheService {
70    store: ProjectStore,
71    source: ProjectSource,
72    config: Arc<Config>,
73
74    scheduled_fetches: FuturesScheduled<BoxFuture<'static, CompletedFetch>>,
75
76    project_events_tx: broadcast::Sender<ProjectChange>,
77}
78
79impl ProjectCacheService {
80    /// Creates a new [`ProjectCacheService`].
81    pub fn new(config: Arc<Config>, source: ProjectSource) -> Self {
82        let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0;
83
84        Self {
85            store: ProjectStore::new(&config),
86            source,
87            config,
88            scheduled_fetches: FuturesScheduled::default(),
89            project_events_tx,
90        }
91    }
92
93    /// Consumes and starts a [`ProjectCacheService`].
94    ///
95    /// Returns a [`ProjectCacheHandle`] to access the cache concurrently.
96    pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
97        let (addr, addr_rx) = relay_system::channel(Self::name());
98
99        let handle = ProjectCacheHandle {
100            shared: self.store.shared(),
101            config: Arc::clone(&self.config),
102            service: addr,
103            project_changes: self.project_events_tx.clone(),
104        };
105
106        services.start_with(self, addr_rx);
107
108        handle
109    }
110
111    /// Schedules a new [`Fetch`] in [`Self::scheduled_fetches`].
112    fn schedule_fetch(&mut self, fetch: Fetch) {
113        let source = self.source.clone();
114
115        let when = fetch.when();
116        let task = async move {
117            let state = match source
118                .fetch(fetch.project_key(), false, fetch.revision())
119                .await
120            {
121                Ok(result) => result,
122                Err(err) => {
123                    relay_log::error!(
124                        tags.project_key = fetch.project_key().as_str(),
125                        tags.has_revision = fetch.revision().as_str().is_some(),
126                        error = &err as &dyn std::error::Error,
127                        "failed to fetch project from source: {fetch:?}"
128                    );
129
130                    ProjectState::Pending.into()
131                }
132            };
133
134            fetch.complete(state)
135        };
136        self.scheduled_fetches.schedule(when, Box::pin(task));
137
138        metric!(counter(RelayCounters::ProjectCacheSchedule) += 1);
139        metric!(
140            gauge(RelayGauges::ProjectCacheScheduledFetches) = self.scheduled_fetches.len() as u64
141        );
142    }
143}
144
145/// All [`ProjectCacheService`] message handlers.
146impl ProjectCacheService {
147    fn handle_fetch(&mut self, project_key: ProjectKey) {
148        if let Some(fetch) = self.store.try_begin_fetch(project_key) {
149            self.schedule_fetch(fetch);
150        }
151    }
152
153    fn handle_completed_fetch(&mut self, fetch: CompletedFetch) {
154        let project_key = fetch.project_key();
155
156        if let Some(fetch) = self.store.complete_fetch(fetch) {
157            relay_log::trace!(
158                project_key = fetch.project_key().as_str(),
159                "re-scheduling project fetch: {fetch:?}"
160            );
161            self.schedule_fetch(fetch);
162            return;
163        }
164
165        let _ = self
166            .project_events_tx
167            .send(ProjectChange::Ready(project_key));
168
169        metric!(
170            gauge(RelayGauges::ProjectCacheNotificationChannel) =
171                self.project_events_tx.len() as u64
172        );
173    }
174
175    fn handle_eviction(&mut self, eviction: Eviction) {
176        let project_key = eviction.project_key();
177
178        self.store.evict(eviction);
179
180        let _ = self
181            .project_events_tx
182            .send(ProjectChange::Evicted(project_key));
183
184        relay_log::trace!(tags.project_key = project_key.as_str(), "project evicted");
185        metric!(counter(RelayCounters::EvictingStaleProjectCaches) += 1);
186    }
187
188    fn handle_refresh(&mut self, refresh: Refresh) {
189        let project_key = refresh.project_key();
190
191        if let Some(fetch) = self.store.refresh(refresh) {
192            self.schedule_fetch(fetch);
193        }
194
195        relay_log::trace!(tags.project_key = project_key.as_str(), "project refreshed");
196        metric!(counter(RelayCounters::RefreshStaleProjectCaches) += 1);
197    }
198
199    fn handle_message(&mut self, message: ProjectCache) {
200        match message {
201            ProjectCache::Fetch(project_key) => self.handle_fetch(project_key),
202        }
203    }
204}
205
206impl relay_system::Service for ProjectCacheService {
207    type Interface = ProjectCache;
208
209    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
210        macro_rules! timed {
211            ($task:expr, $body:expr) => {{
212                let task_name = $task;
213                metric!(
214                    timer(RelayTimers::ProjectCacheTaskDuration),
215                    task = task_name,
216                    { $body }
217                )
218            }};
219        }
220
221        loop {
222            tokio::select! {
223                biased;
224
225                Some(fetch) = self.scheduled_fetches.next() => timed!(
226                    "completed_fetch",
227                    self.handle_completed_fetch(fetch)
228                ),
229                Some(message) = rx.recv() => timed!(
230                    message.variant(),
231                    self.handle_message(message)
232                ),
233                Some(action) = self.store.poll() => match action {
234                    state::Action::Eviction(eviction) => timed!(
235                        "eviction",
236                        self.handle_eviction(eviction)
237                    ),
238                    state::Action::Refresh(refresh) => timed!(
239                        "refresh",
240                        self.handle_refresh(refresh)
241                    ),
242                }
243            }
244        }
245    }
246}