relay_server/services/projects/cache/
service.rs1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::future::BoxFuture;
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_statsd::metric;
8use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
9use tokio::sync::broadcast;
10
11use crate::services::projects::cache::handle::ProjectCacheHandle;
12use crate::services::projects::cache::state::{
13 self, CompletedFetch, Eviction, Fetch, ProjectStore, Refresh,
14};
15use crate::services::projects::project::ProjectState;
16use crate::services::projects::source::{ProjectSource, ProjectSourceError, upstream};
17use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
18use crate::utils::FuturesScheduled;
19
20const PROJECT_EVENTS_CHANNEL_SIZE: usize = 512_000;
29
30#[derive(Debug)]
32pub enum ProjectCache {
33 Fetch(ProjectKey),
39}
40
41impl ProjectCache {
42 fn variant(&self) -> &'static str {
43 match self {
44 Self::Fetch(_) => "fetch",
45 }
46 }
47}
48
49impl relay_system::Interface for ProjectCache {}
50
51impl relay_system::FromMessage<Self> for ProjectCache {
52 type Response = relay_system::NoResponse;
53
54 fn from_message(message: Self, _: ()) -> Self {
55 message
56 }
57}
58
59#[derive(Debug, Copy, Clone)]
61pub enum ProjectChange {
62 Ready(ProjectKey),
64 Evicted(ProjectKey),
66}
67
68pub struct ProjectCacheService {
70 store: ProjectStore,
71 source: ProjectSource,
72 config: Arc<Config>,
73
74 scheduled_fetches: FuturesScheduled<BoxFuture<'static, CompletedFetch>>,
75
76 project_events_tx: broadcast::Sender<ProjectChange>,
77}
78
79impl ProjectCacheService {
80 pub fn new(config: Arc<Config>, source: ProjectSource) -> Self {
82 let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0;
83
84 Self {
85 store: ProjectStore::new(&config),
86 source,
87 config,
88 scheduled_fetches: FuturesScheduled::default(),
89 project_events_tx,
90 }
91 }
92
93 pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
97 let (addr, addr_rx) = relay_system::channel(Self::name());
98
99 let handle = ProjectCacheHandle {
100 shared: self.store.shared(),
101 config: Arc::clone(&self.config),
102 service: addr,
103 project_changes: self.project_events_tx.clone(),
104 };
105
106 services.start_with(self, addr_rx);
107
108 handle
109 }
110
111 fn schedule_fetch(&mut self, fetch: Fetch) {
113 let source = self.source.clone();
114
115 let when = fetch.when();
116 let task = async move {
117 let state = match source
118 .fetch(fetch.project_key(), false, fetch.revision())
119 .await
120 {
121 Ok(result) => result,
122 Err(ProjectSourceError::Upstream(upstream::Error::DeadlineExceeded)) => {
123 ProjectState::Pending.into()
127 }
128 Err(err @ ProjectSourceError::FatalUpstream) => {
129 relay_log::error!(
130 tags.project_key = fetch.project_key().as_str(),
131 tags.has_revision = fetch.revision().as_str().is_some(),
132 error = &err as &dyn std::error::Error,
133 "failed to fetch project from source: {fetch:?}"
134 );
135
136 ProjectState::Pending.into()
137 }
138 };
139
140 fetch.complete(state)
141 };
142 self.scheduled_fetches.schedule(when, Box::pin(task));
143
144 metric!(counter(RelayCounters::ProjectCacheSchedule) += 1);
145 metric!(
146 gauge(RelayGauges::ProjectCacheScheduledFetches) = self.scheduled_fetches.len() as u64
147 );
148 }
149}
150
151impl ProjectCacheService {
153 fn handle_fetch(&mut self, project_key: ProjectKey) {
154 if let Some(fetch) = self.store.try_begin_fetch(project_key) {
155 self.schedule_fetch(fetch);
156 }
157 }
158
159 fn handle_completed_fetch(&mut self, fetch: CompletedFetch) {
160 let project_key = fetch.project_key();
161
162 if let Some(fetch) = self.store.complete_fetch(fetch) {
163 relay_log::trace!(
164 project_key = fetch.project_key().as_str(),
165 "re-scheduling project fetch: {fetch:?}"
166 );
167 self.schedule_fetch(fetch);
168 return;
169 }
170
171 let _ = self
172 .project_events_tx
173 .send(ProjectChange::Ready(project_key));
174
175 metric!(
176 gauge(RelayGauges::ProjectCacheNotificationChannel) =
177 self.project_events_tx.len() as u64
178 );
179 }
180
181 fn handle_eviction(&mut self, eviction: Eviction) {
182 let project_key = eviction.project_key();
183
184 self.store.evict(eviction);
185
186 let _ = self
187 .project_events_tx
188 .send(ProjectChange::Evicted(project_key));
189
190 relay_log::trace!(tags.project_key = project_key.as_str(), "project evicted");
191 metric!(counter(RelayCounters::EvictingStaleProjectCaches) += 1);
192 }
193
194 fn handle_refresh(&mut self, refresh: Refresh) {
195 let project_key = refresh.project_key();
196
197 if let Some(fetch) = self.store.refresh(refresh) {
198 self.schedule_fetch(fetch);
199 }
200
201 relay_log::trace!(tags.project_key = project_key.as_str(), "project refreshed");
202 metric!(counter(RelayCounters::RefreshStaleProjectCaches) += 1);
203 }
204
205 fn handle_message(&mut self, message: ProjectCache) {
206 match message {
207 ProjectCache::Fetch(project_key) => self.handle_fetch(project_key),
208 }
209 }
210}
211
212impl relay_system::Service for ProjectCacheService {
213 type Interface = ProjectCache;
214
215 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
216 macro_rules! timed {
217 ($task:expr, $body:expr) => {{
218 let task_name = $task;
219 metric!(
220 timer(RelayTimers::ProjectCacheTaskDuration),
221 task = task_name,
222 { $body }
223 )
224 }};
225 }
226
227 loop {
228 tokio::select! {
229 biased;
230
231 Some(fetch) = self.scheduled_fetches.next() => timed!(
232 "completed_fetch",
233 self.handle_completed_fetch(fetch)
234 ),
235 Some(message) = rx.recv() => timed!(
236 message.variant(),
237 self.handle_message(message)
238 ),
239 Some(action) = self.store.poll() => match action {
240 state::Action::Eviction(eviction) => timed!(
241 "eviction",
242 self.handle_eviction(eviction)
243 ),
244 state::Action::Refresh(refresh) => timed!(
245 "refresh",
246 self.handle_refresh(refresh)
247 ),
248 }
249 }
250 }
251 }
252}