relay_server/services/projects/cache/
service.rs

1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::future::BoxFuture;
5use relay_base_schema::project::ProjectKey;
6use relay_config::Config;
7use relay_statsd::metric;
8use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
9use tokio::sync::broadcast;
10
11use crate::services::projects::cache::handle::ProjectCacheHandle;
12use crate::services::projects::cache::state::{
13    self, CompletedFetch, Eviction, Fetch, ProjectStore, Refresh,
14};
15use crate::services::projects::project::ProjectState;
16use crate::services::projects::source::{ProjectSource, ProjectSourceError, upstream};
17use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
18use crate::utils::FuturesScheduled;
19
/// Size of the broadcast channel for project events.
///
/// This is set to a value which theoretically should never be reachable,
/// the number of events is approximately bounded by the amount of projects
/// receiving events.
///
/// It is set to such a large amount because receivers of events currently
/// do not deal with lags in the channel gracefully.
// NOTE(review): a subscriber that falls more than this many events behind
// would observe a lag error from the broadcast channel — confirm receivers
// tolerate that before shrinking this value.
const PROJECT_EVENTS_CHANNEL_SIZE: usize = 512_000;
29
/// A cache for projects, which allows concurrent access to the cached projects.
///
/// This is the message interface of [`ProjectCacheService`]; variant names are
/// also used as metric tags via [`Self::variant`].
#[derive(Debug)]
pub enum ProjectCache {
    /// Schedules another fetch or update for the specified project.
    ///
    /// A project which is not fetched will eventually expire and be evicted
    /// from the cache. Fetches for an already cached project ensure the project
    /// is always up to date and not evicted.
    Fetch(ProjectKey),
}
40
41impl ProjectCache {
42    fn variant(&self) -> &'static str {
43        match self {
44            Self::Fetch(_) => "fetch",
45        }
46    }
47}
48
// Marks `ProjectCache` as the message type of the service (see
// `ProjectCacheService::Interface` below).
impl relay_system::Interface for ProjectCache {}

impl relay_system::FromMessage<Self> for ProjectCache {
    // No response: `Fetch` is fire-and-forget; completion is observed through
    // the broadcast `ProjectChange` events instead.
    type Response = relay_system::NoResponse;

    fn from_message(message: Self, _: ()) -> Self {
        message
    }
}
58
/// Project life-cycle changes produced by the project cache.
///
/// Published on the service's broadcast channel (`project_events_tx`), which
/// is shared with [`ProjectCacheHandle`].
#[derive(Debug, Copy, Clone)]
pub enum ProjectChange {
    /// A project was successfully fetched and is now ready to use.
    Ready(ProjectKey),
    /// A project expired from the cache and was evicted.
    Evicted(ProjectKey),
}
67
/// A service implementing the [`ProjectCache`] interface.
pub struct ProjectCacheService {
    /// Holds the cached projects and drives evictions/refreshes via `poll`.
    store: ProjectStore,
    /// Source used to fetch project states in `schedule_fetch`.
    source: ProjectSource,
    /// Relay configuration, also shared with every [`ProjectCacheHandle`].
    config: Arc<Config>,

    /// In-flight fetches, scheduled to run at the time given by `Fetch::when`.
    scheduled_fetches: FuturesScheduled<BoxFuture<'static, CompletedFetch>>,

    /// Broadcast sender for [`ProjectChange`] life-cycle events.
    project_events_tx: broadcast::Sender<ProjectChange>,
}
78
79impl ProjectCacheService {
80    /// Creates a new [`ProjectCacheService`].
81    pub fn new(config: Arc<Config>, source: ProjectSource) -> Self {
82        let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0;
83
84        Self {
85            store: ProjectStore::new(&config),
86            source,
87            config,
88            scheduled_fetches: FuturesScheduled::default(),
89            project_events_tx,
90        }
91    }
92
93    /// Consumes and starts a [`ProjectCacheService`].
94    ///
95    /// Returns a [`ProjectCacheHandle`] to access the cache concurrently.
96    pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
97        let (addr, addr_rx) = relay_system::channel(Self::name());
98
99        let handle = ProjectCacheHandle {
100            shared: self.store.shared(),
101            config: Arc::clone(&self.config),
102            service: addr,
103            project_changes: self.project_events_tx.clone(),
104        };
105
106        services.start_with(self, addr_rx);
107
108        handle
109    }
110
111    /// Schedules a new [`Fetch`] in [`Self::scheduled_fetches`].
112    fn schedule_fetch(&mut self, fetch: Fetch) {
113        let source = self.source.clone();
114
115        let when = fetch.when();
116        let task = async move {
117            let state = match source
118                .fetch(fetch.project_key(), false, fetch.revision())
119                .await
120            {
121                Ok(result) => result,
122                Err(ProjectSourceError::Upstream(upstream::Error::DeadlineExceeded)) => {
123                    // Somewhat of an expected error which is already logged on the upstream side.
124                    //
125                    // -> we can just go into our usual pending handling.
126                    ProjectState::Pending.into()
127                }
128                Err(err @ ProjectSourceError::FatalUpstream) => {
129                    relay_log::error!(
130                        tags.project_key = fetch.project_key().as_str(),
131                        tags.has_revision = fetch.revision().as_str().is_some(),
132                        error = &err as &dyn std::error::Error,
133                        "failed to fetch project from source: {fetch:?}"
134                    );
135
136                    ProjectState::Pending.into()
137                }
138            };
139
140            fetch.complete(state)
141        };
142        self.scheduled_fetches.schedule(when, Box::pin(task));
143
144        metric!(counter(RelayCounters::ProjectCacheSchedule) += 1);
145        metric!(
146            gauge(RelayGauges::ProjectCacheScheduledFetches) = self.scheduled_fetches.len() as u64
147        );
148    }
149}
150
151/// All [`ProjectCacheService`] message handlers.
152impl ProjectCacheService {
153    fn handle_fetch(&mut self, project_key: ProjectKey) {
154        if let Some(fetch) = self.store.try_begin_fetch(project_key) {
155            self.schedule_fetch(fetch);
156        }
157    }
158
159    fn handle_completed_fetch(&mut self, fetch: CompletedFetch) {
160        let project_key = fetch.project_key();
161
162        if let Some(fetch) = self.store.complete_fetch(fetch) {
163            relay_log::trace!(
164                project_key = fetch.project_key().as_str(),
165                "re-scheduling project fetch: {fetch:?}"
166            );
167            self.schedule_fetch(fetch);
168            return;
169        }
170
171        let _ = self
172            .project_events_tx
173            .send(ProjectChange::Ready(project_key));
174
175        metric!(
176            gauge(RelayGauges::ProjectCacheNotificationChannel) =
177                self.project_events_tx.len() as u64
178        );
179    }
180
181    fn handle_eviction(&mut self, eviction: Eviction) {
182        let project_key = eviction.project_key();
183
184        self.store.evict(eviction);
185
186        let _ = self
187            .project_events_tx
188            .send(ProjectChange::Evicted(project_key));
189
190        relay_log::trace!(tags.project_key = project_key.as_str(), "project evicted");
191        metric!(counter(RelayCounters::EvictingStaleProjectCaches) += 1);
192    }
193
194    fn handle_refresh(&mut self, refresh: Refresh) {
195        let project_key = refresh.project_key();
196
197        if let Some(fetch) = self.store.refresh(refresh) {
198            self.schedule_fetch(fetch);
199        }
200
201        relay_log::trace!(tags.project_key = project_key.as_str(), "project refreshed");
202        metric!(counter(RelayCounters::RefreshStaleProjectCaches) += 1);
203    }
204
205    fn handle_message(&mut self, message: ProjectCache) {
206        match message {
207            ProjectCache::Fetch(project_key) => self.handle_fetch(project_key),
208        }
209    }
210}
211
impl relay_system::Service for ProjectCacheService {
    type Interface = ProjectCache;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Times each handler invocation, tagged with the task name, so slow
        // tasks show up in the `ProjectCacheTaskDuration` timer.
        macro_rules! timed {
            ($task:expr, $body:expr) => {{
                let task_name = $task;
                metric!(
                    timer(RelayTimers::ProjectCacheTaskDuration),
                    task = task_name,
                    { $body }
                )
            }};
        }

        loop {
            tokio::select! {
                // `biased` polls branches in declaration order instead of
                // randomly: completed fetches are drained first, then incoming
                // messages, then store actions (evictions/refreshes).
                biased;

                Some(fetch) = self.scheduled_fetches.next() => timed!(
                    "completed_fetch",
                    self.handle_completed_fetch(fetch)
                ),
                Some(message) = rx.recv() => timed!(
                    message.variant(),
                    self.handle_message(message)
                ),
                Some(action) = self.store.poll() => match action {
                    state::Action::Eviction(eviction) => timed!(
                        "eviction",
                        self.handle_eviction(eviction)
                    ),
                    state::Action::Refresh(refresh) => timed!(
                        "refresh",
                        self.handle_refresh(refresh)
                    ),
                }
            }
        }
    }
}
252}