relay_server/services/projects/cache/
service.rs1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::future::BoxFuture;
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_statsd::metric;
8use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
9use tokio::sync::broadcast;
10
11use crate::services::projects::cache::handle::ProjectCacheHandle;
12use crate::services::projects::cache::state::{
13 self, CompletedFetch, Eviction, Fetch, ProjectStore, Refresh,
14};
15use crate::services::projects::project::ProjectState;
16use crate::services::projects::source::ProjectSource;
17use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
18use crate::utils::FuturesScheduled;
19
/// Capacity passed to the broadcast channel that carries [`ProjectChange`] events.
///
/// NOTE(review): the reasoning behind this specific value is not visible in
/// this file; confirm before changing it.
const PROJECT_EVENTS_CHANNEL_SIZE: usize = 512_000;
29
30#[derive(Debug)]
32pub enum ProjectCache {
33 Fetch(ProjectKey),
39}
40
41impl ProjectCache {
42 fn variant(&self) -> &'static str {
43 match self {
44 Self::Fetch(_) => "fetch",
45 }
46 }
47}
48
49impl relay_system::Interface for ProjectCache {}
50
51impl relay_system::FromMessage<Self> for ProjectCache {
52 type Response = relay_system::NoResponse;
53
54 fn from_message(message: Self, _: ()) -> Self {
55 message
56 }
57}
58
59#[derive(Debug, Copy, Clone)]
61pub enum ProjectChange {
62 Ready(ProjectKey),
64 Evicted(ProjectKey),
66}
67
68pub struct ProjectCacheService {
70 store: ProjectStore,
71 source: ProjectSource,
72 config: Arc<Config>,
73
74 scheduled_fetches: FuturesScheduled<BoxFuture<'static, CompletedFetch>>,
75
76 project_events_tx: broadcast::Sender<ProjectChange>,
77}
78
79impl ProjectCacheService {
80 pub fn new(config: Arc<Config>, source: ProjectSource) -> Self {
82 let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0;
83
84 Self {
85 store: ProjectStore::new(&config),
86 source,
87 config,
88 scheduled_fetches: FuturesScheduled::default(),
89 project_events_tx,
90 }
91 }
92
93 pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
97 let (addr, addr_rx) = relay_system::channel(Self::name());
98
99 let handle = ProjectCacheHandle {
100 shared: self.store.shared(),
101 config: Arc::clone(&self.config),
102 service: addr,
103 project_changes: self.project_events_tx.clone(),
104 };
105
106 services.start_with(self, addr_rx);
107
108 handle
109 }
110
111 fn schedule_fetch(&mut self, fetch: Fetch) {
113 let source = self.source.clone();
114
115 let when = fetch.when();
116 let task = async move {
117 let state = match source
118 .fetch(fetch.project_key(), false, fetch.revision())
119 .await
120 {
121 Ok(result) => result,
122 Err(err) => {
123 relay_log::error!(
124 tags.project_key = fetch.project_key().as_str(),
125 tags.has_revision = fetch.revision().as_str().is_some(),
126 error = &err as &dyn std::error::Error,
127 "failed to fetch project from source: {fetch:?}"
128 );
129
130 ProjectState::Pending.into()
131 }
132 };
133
134 fetch.complete(state)
135 };
136 self.scheduled_fetches.schedule(when, Box::pin(task));
137
138 metric!(counter(RelayCounters::ProjectCacheSchedule) += 1);
139 metric!(
140 gauge(RelayGauges::ProjectCacheScheduledFetches) = self.scheduled_fetches.len() as u64
141 );
142 }
143}
144
145impl ProjectCacheService {
147 fn handle_fetch(&mut self, project_key: ProjectKey) {
148 if let Some(fetch) = self.store.try_begin_fetch(project_key) {
149 self.schedule_fetch(fetch);
150 }
151 }
152
153 fn handle_completed_fetch(&mut self, fetch: CompletedFetch) {
154 let project_key = fetch.project_key();
155
156 if let Some(fetch) = self.store.complete_fetch(fetch) {
157 relay_log::trace!(
158 project_key = fetch.project_key().as_str(),
159 "re-scheduling project fetch: {fetch:?}"
160 );
161 self.schedule_fetch(fetch);
162 return;
163 }
164
165 let _ = self
166 .project_events_tx
167 .send(ProjectChange::Ready(project_key));
168
169 metric!(
170 gauge(RelayGauges::ProjectCacheNotificationChannel) =
171 self.project_events_tx.len() as u64
172 );
173 }
174
175 fn handle_eviction(&mut self, eviction: Eviction) {
176 let project_key = eviction.project_key();
177
178 self.store.evict(eviction);
179
180 let _ = self
181 .project_events_tx
182 .send(ProjectChange::Evicted(project_key));
183
184 relay_log::trace!(tags.project_key = project_key.as_str(), "project evicted");
185 metric!(counter(RelayCounters::EvictingStaleProjectCaches) += 1);
186 }
187
188 fn handle_refresh(&mut self, refresh: Refresh) {
189 let project_key = refresh.project_key();
190
191 if let Some(fetch) = self.store.refresh(refresh) {
192 self.schedule_fetch(fetch);
193 }
194
195 relay_log::trace!(tags.project_key = project_key.as_str(), "project refreshed");
196 metric!(counter(RelayCounters::RefreshStaleProjectCaches) += 1);
197 }
198
199 fn handle_message(&mut self, message: ProjectCache) {
200 match message {
201 ProjectCache::Fetch(project_key) => self.handle_fetch(project_key),
202 }
203 }
204}
205
206impl relay_system::Service for ProjectCacheService {
207 type Interface = ProjectCache;
208
209 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
210 macro_rules! timed {
211 ($task:expr, $body:expr) => {{
212 let task_name = $task;
213 metric!(
214 timer(RelayTimers::ProjectCacheTaskDuration),
215 task = task_name,
216 { $body }
217 )
218 }};
219 }
220
221 loop {
222 tokio::select! {
223 biased;
224
225 Some(fetch) = self.scheduled_fetches.next() => timed!(
226 "completed_fetch",
227 self.handle_completed_fetch(fetch)
228 ),
229 Some(message) = rx.recv() => timed!(
230 message.variant(),
231 self.handle_message(message)
232 ),
233 Some(action) = self.store.poll() => match action {
234 state::Action::Eviction(eviction) => timed!(
235 "eviction",
236 self.handle_eviction(eviction)
237 ),
238 state::Action::Refresh(refresh) => timed!(
239 "refresh",
240 self.handle_refresh(refresh)
241 ),
242 }
243 }
244 }
245 }
246}