Skip to main content

relay_server/services/projects/cache/
state.rs

1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Notify;
6use tokio::sync::futures::Notified;
7use tokio::time::Instant;
8
9use arc_swap::ArcSwap;
10use relay_base_schema::project::ProjectKey;
11use relay_quotas::CachedRateLimits;
12use relay_statsd::metric;
13
14use crate::services::projects::project::{ProjectState, Revision};
15use crate::services::projects::source::SourceProjectState;
16use crate::statsd::{RelayDistributions, RelayTimers};
17use crate::utils::{RetryBackoff, UniqueScheduledQueue};
18
19/// The backing storage for a project cache.
20///
21/// Exposes the only interface to delete from [`Shared`], guaranteed by
22/// requiring exclusive/mutable access to [`ProjectStore`].
23///
24/// [`Shared`] can be extended through [`Shared::get_or_create`], in which case
25/// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger
26/// a fetch to create the private state and keep it updated.
27/// This guarantees that eventually the project state is populated, but for a undetermined,
28/// time it is possible that shared state exists without the respective private state.
29pub struct ProjectStore {
30    config: Config,
31    /// The shared state, which can be accessed concurrently.
32    shared: Arc<Shared>,
33    /// The private, mutably exclusive state, used to maintain the project state.
34    private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
35    /// Scheduled queue tracking all evictions.
36    evictions: UniqueScheduledQueue<ProjectKey>,
37    /// Scheduled queue tracking all refreshes.
38    refreshes: UniqueScheduledQueue<ProjectKey>,
39}
40
41impl ProjectStore {
42    pub fn new(config: &relay_config::Config) -> Self {
43        Self {
44            config: Config::new(config),
45            shared: Default::default(),
46            private: Default::default(),
47            evictions: Default::default(),
48            refreshes: Default::default(),
49        }
50    }
51
52    /// Retrieves a [`Shared`] handle which can be freely shared with multiple consumers.
53    pub fn shared(&self) -> Arc<Shared> {
54        Arc::clone(&self.shared)
55    }
56
57    /// Tries to begin a new fetch for the passed `project_key`.
58    ///
59    /// Returns `None` if no fetch is necessary or there is already a fetch ongoing.
60    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
61    /// [`Self::complete_fetch`].
62    pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
63        self.do_try_begin_fetch(project_key, false)
64    }
65
66    /// Completes a [`CompletedFetch`] started with [`Self::try_begin_fetch`].
67    ///
68    /// Returns a new [`Fetch`] if another fetch must be scheduled. This happens when the fetched
69    /// [`ProjectState`] is still pending or already deemed expired.
70    #[must_use = "an incomplete fetch must be retried"]
71    pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
72        let project_key = fetch.project_key();
73
74        // Eviction is not possible for projects which are currently being fetched.
75        // Hence if there was a started fetch, the project state must always exist at this stage.
76        debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
77        debug_assert!(self.private.get(&project_key).is_some());
78
79        let mut project = self.get_or_create(project_key);
80        // Schedule another fetch if necessary, usually should only happen if
81        // the completed fetch is pending.
82        let new_fetch = match project.complete_fetch(fetch) {
83            FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
84            FetchResult::Done { expiry, refresh } => {
85                self.evictions.schedule(expiry.0, project_key);
86                if let Some(RefreshTime(refresh)) = refresh {
87                    self.refreshes.schedule(refresh, project_key);
88                }
89                None
90            }
91        };
92
93        metric!(
94            distribution(RelayDistributions::ProjectStateCacheSize) =
95                self.shared.projects.len() as u64,
96            storage = "shared"
97        );
98        metric!(
99            distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
100            storage = "private"
101        );
102
103        new_fetch
104    }
105
106    /// Waits for the next scheduled action.
107    ///
108    /// The returned [`Action`] must be immediately turned in using the corresponding handlers,
109    /// [`Self::evict`] or [`Self::refresh`].
110    ///
111    /// The returned future is cancellation safe.
112    pub async fn poll(&mut self) -> Option<Action> {
113        let eviction = self.evictions.next();
114        let refresh = self.refreshes.next();
115
116        tokio::select! {
117            biased;
118
119            Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
120            Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
121            else => None,
122        }
123    }
124
125    /// Refreshes a project using an [`Refresh`] token returned from [`Self::poll`].
126    ///
127    /// Like [`Self::try_begin_fetch`], this returns a [`Fetch`], if there was no fetch
128    /// already started in the meantime.
129    ///
130    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
131    /// [`Self::complete_fetch`].
132    pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
133        self.do_try_begin_fetch(project_key, true)
134    }
135
136    /// Evicts a project using an [`Eviction`] token returned from [`Self::poll`].
137    pub fn evict(&mut self, Eviction(project_key): Eviction) {
138        // Remove the private part.
139        let Some(private) = self.private.remove(&project_key) else {
140            // Not possible if all invariants are upheld.
141            debug_assert!(false, "no private state for eviction");
142            return;
143        };
144
145        debug_assert!(
146            matches!(private.state, FetchState::Complete { .. }),
147            "private state must be completed"
148        );
149
150        // Remove the shared part.
151        let shared = self.shared.projects.pin();
152        let _removed = shared.remove(&project_key);
153        debug_assert!(
154            _removed.is_some(),
155            "an expired project must exist in the shared state"
156        );
157
158        // Cancel next refresh, while not necessary (trying to refresh a project which does not
159        // exist, will do nothing), but we can also spare us the extra work.
160        self.refreshes.remove(&project_key);
161    }
162
163    /// Internal handler to begin a new fetch for the passed `project_key`, which can also handle
164    /// refreshes.
165    fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
166        let fetch = match is_refresh {
167            // A rogue refresh does not need to trigger an actual fetch.
168            // In practice this should never happen, as the refresh time is validated against
169            // the eviction time.
170            // But it may happen due to a race of the eviction and refresh (e.g. when setting them
171            // to close to the same value), in which case we don't want to re-populate the cache.
172            true => self.get(project_key)?,
173            false => self.get_or_create(project_key),
174        }
175        .try_begin_fetch(is_refresh);
176
177        // If there is a new fetch, remove the pending eviction, it will be re-scheduled once the
178        // fetch is completed.
179        if fetch.is_some() {
180            self.evictions.remove(&project_key);
181            // There is no need to clear the refresh here, if it triggers while a fetch is ongoing,
182            // it is simply discarded.
183        }
184
185        fetch
186    }
187
188    /// Get a reference to the current project or create a new project.
189    ///
190    /// For internal use only, a created project must always be fetched immediately.
191    fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
192        let private = self.private.get_mut(&project_key)?;
193
194        // Same invariant as in `get_or_create`, we have exclusive access to the private
195        // project here, there must be a shared project if there is a private project.
196        debug_assert!(self.shared.projects.pin().contains_key(&project_key));
197
198        let shared = self
199            .shared
200            .projects
201            .pin()
202            .get_or_insert_with(project_key, Default::default)
203            .clone();
204
205        Some(ProjectRef {
206            private,
207            shared,
208            config: &self.config,
209        })
210    }
211
212    /// Get a reference to the current project or create a new project.
213    ///
214    /// For internal use only, a created project must always be fetched immediately.
215    fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
216        #[cfg(debug_assertions)]
217        if self.private.contains_key(&project_key) {
218            // We have exclusive access to the private part, there are no concurrent deletions
219            // hence if we have a private state there must always be a shared state as well.
220            //
221            // The opposite is not true, the shared state may have been created concurrently
222            // through the shared access.
223            debug_assert!(self.shared.projects.pin().contains_key(&project_key));
224        }
225
226        let private = self
227            .private
228            .entry(project_key)
229            .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
230
231        let shared = self
232            .shared
233            .projects
234            .pin()
235            .get_or_insert_with(project_key, Default::default)
236            .clone();
237
238        ProjectRef {
239            private,
240            shared,
241            config: &self.config,
242        }
243    }
244}
245
246/// Configuration for a [`ProjectStore`].
247struct Config {
248    /// Expiry timeout for individual project configs.
249    ///
250    /// Note: the total expiry is the sum of the expiry and grace period.
251    expiry: Duration,
252    /// Grace period for a project config.
253    ///
254    /// A project config is considered stale and will be updated asynchronously,
255    /// after reaching the grace period.
256    grace_period: Duration,
257    /// Refresh interval for a single project.
258    ///
259    /// A project will be asynchronously refreshed repeatedly using this interval.
260    ///
261    /// The refresh interval is validated to be between expiration and grace period. An invalid refresh
262    /// time is ignored.
263    refresh_interval: Option<Duration>,
264    /// Maximum backoff for continuously failing project updates.
265    max_retry_backoff: Duration,
266}
267
268impl Config {
269    fn new(config: &relay_config::Config) -> Self {
270        let expiry = config.project_cache_expiry();
271        let grace_period = config.project_grace_period();
272
273        // Make sure the refresh time is:
274        // - at least the expiration, refreshing a non-stale project makes no sense.
275        // - at most the end of the grace period, refreshing an expired project also makes no sense.
276        let refresh_interval = config
277            .project_refresh_interval()
278            .filter(|rt| *rt < (expiry + grace_period))
279            .filter(|rt| *rt > expiry);
280
281        Self {
282            expiry: config.project_cache_expiry(),
283            grace_period: config.project_grace_period(),
284            refresh_interval,
285            max_retry_backoff: config.http_max_retry_interval(),
286        }
287    }
288}
289
290/// The shared and concurrently accessible handle to the project cache.
291#[derive(Default)]
292pub struct Shared {
293    projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
294}
295
296impl Shared {
297    /// Returns the existing project state or creates a new one.
298    ///
299    /// The caller must ensure that the project cache is instructed to
300    /// [`super::ProjectCache::Fetch`] the retrieved project.
301    pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
302        self.get_or_create_inner(project_key).to_shared_project()
303    }
304
305    fn get_or_create_inner(&self, project_key: ProjectKey) -> SharedProjectState {
306        // The fast path, we expect the project to exist.
307        let projects = self.projects.pin();
308        if let Some(project) = projects.get(&project_key) {
309            return project.clone();
310        }
311
312        // The slow path, try to attempt to insert, somebody else may have been faster, but that's okay.
313        match projects.try_insert(project_key, Default::default()) {
314            Ok(inserted) => inserted.clone(),
315            Err(occupied) => occupied.current.clone(),
316        }
317    }
318}
319
320/// TEST ONLY bypass to make the project cache mockable.
321#[cfg(test)]
322impl Shared {
323    /// Updates the project state for a project.
324    ///
325    /// TEST ONLY!
326    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
327        self.projects
328            .pin()
329            .get_or_insert_with(project_key, Default::default)
330            .set_project_state(state);
331    }
332
333    /// Returns `true` if there exists a shared state for the passed `project_key`.
334    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
335        self.projects.pin().contains_key(&project_key)
336    }
337}
338
339impl fmt::Debug for Shared {
340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341        f.debug_struct("Shared")
342            .field("num_projects", &self.projects.len())
343            .finish()
344    }
345}
346
347/// A single project from the [`Shared`] project cache.
348pub struct SharedProject(Arc<SharedProjectStateInner>);
349
350impl SharedProject {
351    /// Returns a reference to the contained [`ProjectState`].
352    pub fn project_state(&self) -> &ProjectState {
353        &self.0.state
354    }
355
356    /// Returns a reference to the contained [`CachedRateLimits`].
357    pub fn cached_rate_limits(&self) -> &CachedRateLimits {
358        // Exposing cached rate limits may be a bad idea, this allows mutation
359        // and caching of rate limits for pending projects, which may or may not be fine.
360        // Although, for now this is fine.
361        //
362        // Read only access is easily achievable if we return only the current rate limits.
363        &self.0.rate_limits
364    }
365
366    /// Waits for the event of a changed project state, triggered by [`SharedProjectState::set_project_state`].
367    ///
368    /// Note that the content of this instance does not change when the event is triggered.
369    pub fn outdated(&self) -> Notified<'_> {
370        self.0.notify.notified()
371    }
372}
373
374/// TEST ONLY bypass to make the project cache mockable.
375#[cfg(test)]
376impl SharedProject {
377    /// Creates a new [`SharedProject`] for testing only.
378    pub fn for_test(state: ProjectState) -> Self {
379        Self(Arc::new(SharedProjectStateInner {
380            state,
381            ..Default::default()
382        }))
383    }
384}
385
386/// Reference to a full project wrapping shared and private state.
387struct ProjectRef<'a> {
388    shared: SharedProjectState,
389    private: &'a mut PrivateProjectState,
390    config: &'a Config,
391}
392
393impl ProjectRef<'_> {
394    fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
395        let now = Instant::now();
396        self.private
397            .try_begin_fetch(now, is_refresh, self.config)
398            .map(|fetch| fetch.with_revision(self.shared.revision()))
399    }
400
401    fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
402        let now = Instant::now();
403
404        if let Some(latency) = fetch.latency() {
405            let delay = match fetch.delay() {
406                Some(delay) if delay.as_secs() <= 15 => "lte15s",
407                Some(delay) if delay.as_secs() <= 30 => "lte30s",
408                Some(delay) if delay.as_secs() <= 60 => "lte60s",
409                Some(delay) if delay.as_secs() <= 120 => "lte120",
410                Some(delay) if delay.as_secs() <= 300 => "lte300s",
411                Some(delay) if delay.as_secs() <= 600 => "lte600s",
412                Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
413                Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
414                Some(_) => "gt3600s",
415                None => "none",
416            };
417            metric!(
418                timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
419                delay = delay
420            );
421        }
422
423        if !fetch.is_pending() {
424            let state = match fetch.state {
425                SourceProjectState::New(_) => "new",
426                SourceProjectState::NotModified => "not_modified",
427            };
428
429            metric!(
430                timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
431                state = state
432            );
433        }
434
435        // Update private and shared state with the new data.
436        let result = self.private.complete_fetch(&fetch, now, self.config);
437        match fetch.state {
438            // Keep the old state around if the current fetch is pending.
439            // It may still be useful to callers.
440            SourceProjectState::New(state) if !state.is_pending() => {
441                self.shared.set_project_state(state);
442            }
443            _ => {}
444        }
445
446        result
447    }
448}
449
450pub enum Action {
451    Eviction(Eviction),
452    Refresh(Refresh),
453}
454
455/// A [`Refresh`] token.
456///
457/// The token must be turned in using [`ProjectStore::refresh`].
458#[derive(Debug)]
459#[must_use = "a refresh must be used"]
460pub struct Refresh(ProjectKey);
461
462impl Refresh {
463    /// Returns the [`ProjectKey`] of the project that needs to be refreshed.
464    pub fn project_key(&self) -> ProjectKey {
465        self.0
466    }
467}
468
469/// A [`Eviction`] token.
470///
471/// The token must be turned in using [`ProjectStore::evict`].
472#[derive(Debug)]
473#[must_use = "an eviction must be used"]
474pub struct Eviction(ProjectKey);
475
476impl Eviction {
477    /// Returns the [`ProjectKey`] of the project that needs to be evicted.
478    pub fn project_key(&self) -> ProjectKey {
479        self.0
480    }
481}
482
483/// A [`Fetch`] token.
484///
485/// When returned it must be executed and completed using [`Self::complete`].
486#[must_use = "a fetch must be executed"]
487#[derive(Debug)]
488pub struct Fetch {
489    project_key: ProjectKey,
490    previous_fetch: Option<Instant>,
491    initiated: Instant,
492    when: Option<Instant>,
493    revision: Revision,
494}
495
496impl Fetch {
497    /// Returns the [`ProjectKey`] of the project to fetch.
498    pub fn project_key(&self) -> ProjectKey {
499        self.project_key
500    }
501
502    /// Returns when the fetch for the project should be scheduled.
503    ///
504    /// This can be now (as soon as possible, indicated by `None`) or a later point in time,
505    /// if the project is currently in a backoff.
506    pub fn when(&self) -> Option<Instant> {
507        self.when
508    }
509
510    /// Returns the revisions of the currently cached project.
511    ///
512    /// If the upstream indicates it does not have a different version of this project
513    /// we do not need to update the local state.
514    pub fn revision(&self) -> Revision {
515        self.revision.clone()
516    }
517
518    /// Completes the fetch with a result and returns a [`CompletedFetch`].
519    pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
520        CompletedFetch { fetch: self, state }
521    }
522
523    fn with_revision(mut self, revision: Revision) -> Self {
524        self.revision = revision;
525        self
526    }
527}
528
529/// The result of an executed [`Fetch`].
530#[must_use = "a completed fetch must be acted upon"]
531#[derive(Debug)]
532pub struct CompletedFetch {
533    fetch: Fetch,
534    state: SourceProjectState,
535}
536
537impl CompletedFetch {
538    /// Returns the [`ProjectKey`] of the project which was fetched.
539    pub fn project_key(&self) -> ProjectKey {
540        self.fetch.project_key()
541    }
542
543    /// Returns the amount of time passed between the last successful fetch for this project and the start of this fetch.
544    ///
545    /// `None` if this is the first fetch.
546    fn delay(&self) -> Option<Duration> {
547        self.fetch
548            .previous_fetch
549            .map(|pf| self.fetch.initiated.duration_since(pf))
550    }
551
552    /// Returns the duration between first initiating the fetch and `now`.
553    fn duration(&self, now: Instant) -> Duration {
554        now.duration_since(self.fetch.initiated)
555    }
556
557    /// Returns the update latency of the fetched project config from the upstream.
558    ///
559    /// Is `None`, when no project config could be fetched, or if this was the first
560    /// fetch of a project config.
561    ///
562    /// Note: this latency is computed on access, it does not use the time when the [`Fetch`]
563    /// was marked as (completed)[`Fetch::complete`].
564    fn latency(&self) -> Option<Duration> {
565        // We're not interested in initial fetches. The latency on the first fetch
566        // has no meaning about how long it takes for an updated project config to be
567        // propagated to a Relay.
568        let is_first_fetch = self.fetch.revision().as_str().is_none();
569        if is_first_fetch {
570            return None;
571        }
572
573        let project_info = match &self.state {
574            SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
575            // Not modified or deleted/disabled -> no latency to track.
576            //
577            // Currently we discard the last changed timestamp for disabled projects,
578            // it would be possible to do so and then also expose a latency for disabled projects.
579            _ => return None,
580        };
581
582        // A matching revision is not an update.
583        if project_info.rev == self.fetch.revision {
584            return None;
585        }
586
587        let elapsed = chrono::Utc::now() - project_info.last_change?;
588        elapsed.to_std().ok()
589    }
590
591    /// Returns `true` if the fetch completed with a pending status.
592    fn is_pending(&self) -> bool {
593        match &self.state {
594            SourceProjectState::New(state) => state.is_pending(),
595            SourceProjectState::NotModified => false,
596        }
597    }
598}
599
600/// The state of a project contained in the [`Shared`] project cache.
601///
602/// This state is interior mutable and allows updates to the project.
603#[derive(Debug, Default, Clone)]
604struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
605
606impl SharedProjectState {
607    /// Updates the project state.
608    fn set_project_state(&self, state: ProjectState) {
609        let prev = self.0.rcu(|stored| SharedProjectStateInner {
610            state: state.clone(),
611            rate_limits: Arc::clone(&stored.rate_limits),
612            notify: Arc::clone(&stored.notify),
613        });
614
615        // Finally, notify listeners:
616        prev.notify.notify_waiters();
617    }
618
619    /// Extracts and clones the revision from the contained project state.
620    fn revision(&self) -> Revision {
621        self.0.as_ref().load().state.revision().clone()
622    }
623
624    /// Transforms this interior mutable handle to an immutable [`SharedProject`].
625    fn to_shared_project(&self) -> SharedProject {
626        SharedProject(self.0.as_ref().load_full())
627    }
628}
629
630/// The data contained in a [`SharedProjectState`].
631///
632/// All fields must be cheap to clone and are ideally just a single `Arc`.
633/// Partial updates to [`SharedProjectState`], are performed using `rcu` cloning all fields.
634#[derive(Debug, Default)]
635struct SharedProjectStateInner {
636    state: ProjectState,
637    rate_limits: Arc<CachedRateLimits>,
638    notify: Arc<Notify>,
639}
640
641/// Current fetch state for a project.
642///
643/// ─────► Pending ◄─────┐
644///           │          │
645///           │          │Backoff
646///           ▼          │
647/// ┌───► InProgress ────┘
648/// │         │
649/// │         │
650/// │         ▼
651/// └───── Complete
652#[derive(Debug)]
653enum FetchState {
654    /// There is a fetch currently in progress.
655    InProgress {
656        /// Whether the current check in progress was triggered from a refresh.
657        ///
658        /// Triggering a non-refresh fetch while a refresh fetch is currently in progress,
659        /// will overwrite this property.
660        is_refresh: bool,
661    },
662    /// A successful fetch is pending.
663    ///
664    /// Projects which have not yet been fetched are in the pending state,
665    /// as well as projects which have a fetch in progress but were notified
666    /// from upstream that the project config is still pending.
667    ///
668    /// If the upstream notifies this instance about a pending config,
669    /// a backoff is applied, before trying again.
670    Pending {
671        /// Instant when the fetch was first initiated.
672        ///
673        /// A state may be transitioned multiple times from [`Self::Pending`] to [`Self::InProgress`]
674        /// and back to [`Self::Pending`]. This timestamp is the first time when the state
675        /// was transitioned from [`Self::Complete`] to [`Self::InProgress`].
676        ///
677        /// Only `None` on first fetch.
678        initiated: Option<Instant>,
679        /// Time when the next fetch should be attempted.
680        ///
681        /// `None` means soon as possible.
682        next_fetch_attempt: Option<Instant>,
683    },
684    /// There was a successful non-pending fetch.
685    Complete {
686        /// Time when the fetch was completed.
687        when: LastFetch,
688    },
689}
690
691/// Contains all mutable state necessary to maintain the project cache.
692struct PrivateProjectState {
693    /// Project key this state belongs to.
694    project_key: ProjectKey,
695
696    /// The current fetch state.
697    state: FetchState,
698    /// The current backoff used for calculating the next fetch attempt.
699    ///
700    /// The backoff is reset after a successful, non-pending fetch.
701    backoff: RetryBackoff,
702
703    /// The last time the state was successfully fetched.
704    ///
705    /// May be `None` when the state has never been successfully fetched.
706    ///
707    /// This is purely informational, all necessary information to make
708    /// state transitions is contained in [`FetchState`].
709    last_fetch: Option<Instant>,
710
711    /// The expiry time of this project.
712    ///
713    /// A refresh of the project, will not push the expiration time.
714    expiry: Option<Instant>,
715}
716
717impl PrivateProjectState {
718    fn new(project_key: ProjectKey, config: &Config) -> Self {
719        Self {
720            project_key,
721            state: FetchState::Pending {
722                initiated: None,
723                next_fetch_attempt: None,
724            },
725            backoff: RetryBackoff::new(config.max_retry_backoff),
726            last_fetch: None,
727            expiry: None,
728        }
729    }
730
731    fn try_begin_fetch(
732        &mut self,
733        now: Instant,
734        is_refresh: bool,
735        config: &Config,
736    ) -> Option<Fetch> {
737        let (initiated, when) = match &mut self.state {
738            FetchState::InProgress {
739                is_refresh: refresh_in_progress,
740            } => {
741                relay_log::trace!(
742                    tags.project_key = self.project_key.as_str(),
743                    "project fetch skipped, fetch in progress"
744                );
745                // Upgrade the refresh status if necessary.
746                *refresh_in_progress = *refresh_in_progress && is_refresh;
747                return None;
748            }
749            FetchState::Pending {
750                initiated,
751                next_fetch_attempt,
752            } => {
753                // Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while.
754                (initiated.unwrap_or(now), *next_fetch_attempt)
755            }
756            FetchState::Complete { when } => {
757                // Sanity check to make sure timestamps do not drift.
758                debug_assert_eq!(Some(when.0), self.last_fetch);
759
760                if when.check_expiry(now, config).is_fresh() {
761                    // The current state is up to date, no need to start another fetch.
762                    relay_log::trace!(
763                        tags.project_key = self.project_key.as_str(),
764                        "project fetch skipped, already up to date"
765                    );
766                    return None;
767                }
768
769                (now, None)
770            }
771        };
772
773        // Mark a current fetch in progress.
774        self.state = FetchState::InProgress { is_refresh };
775
776        relay_log::trace!(
777            tags.project_key = &self.project_key.as_str(),
778            attempts = self.backoff.attempt() + 1,
779            "project state {} scheduled in {:?}",
780            if is_refresh { "refresh" } else { "fetch" },
781            when.unwrap_or(now).saturating_duration_since(now),
782        );
783
784        Some(Fetch {
785            project_key: self.project_key,
786            previous_fetch: self.last_fetch,
787            initiated,
788            when,
789            revision: Revision::default(),
790        })
791    }
792
793    fn complete_fetch(
794        &mut self,
795        fetch: &CompletedFetch,
796        now: Instant,
797        config: &Config,
798    ) -> FetchResult {
799        let FetchState::InProgress { is_refresh } = self.state else {
800            debug_assert!(
801                false,
802                "fetch completed while there was no current fetch registered"
803            );
804            // Be conservative in production.
805            return FetchResult::ReSchedule { refresh: false };
806        };
807
808        if fetch.is_pending() {
809            let next_backoff = self.backoff.next_backoff();
810            let next_fetch_attempt = match next_backoff.is_zero() {
811                false => now.checked_add(next_backoff),
812                true => None,
813            };
814            self.state = FetchState::Pending {
815                next_fetch_attempt,
816                initiated: Some(fetch.fetch.initiated),
817            };
818            relay_log::trace!(
819                tags.project_key = &self.project_key.as_str(),
820                "project state {} completed but still pending",
821                if is_refresh { "refresh" } else { "fetch" },
822            );
823
824            FetchResult::ReSchedule {
825                refresh: is_refresh,
826            }
827        } else {
828            relay_log::trace!(
829                tags.project_key = &self.project_key.as_str(),
830                "project state {} completed with non-pending config",
831                if is_refresh { "refresh" } else { "fetch" },
832            );
833
834            self.backoff.reset();
835            self.last_fetch = Some(now);
836
837            let when = LastFetch(now);
838
839            let refresh = when.refresh_time(config);
840            let expiry = match self.expiry {
841                Some(expiry) if is_refresh => ExpiryTime(expiry),
842                // Only bump/re-compute the expiry time if the fetch was not a refresh,
843                // to not keep refreshed projects forever in the cache.
844                Some(_) | None => when.expiry_time(config),
845            };
846            self.expiry = Some(expiry.0);
847
848            self.state = FetchState::Complete { when };
849            FetchResult::Done { expiry, refresh }
850        }
851    }
852}
853
854/// Result returned when completing a fetch.
855#[derive(Debug)]
856#[must_use = "fetch result must be used"]
857enum FetchResult {
858    /// Another fetch must be scheduled immediately.
859    ReSchedule {
860        /// Whether the fetch should be re-scheduled as a refresh.
861        refresh: bool,
862    },
863    /// The fetch is completed and should be registered for refresh and eviction.
864    Done {
865        /// When the project should be expired.
866        expiry: ExpiryTime,
867        /// When the project should be refreshed.
868        refresh: Option<RefreshTime>,
869    },
870}
871
872/// New type containing the last successful fetch time as an [`Instant`].
873#[derive(Debug, Copy, Clone)]
874struct LastFetch(Instant);
875
876impl LastFetch {
877    /// Returns the [`Expiry`] of the last fetch in relation to `now`.
878    fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
879        let elapsed = now.saturating_duration_since(self.0);
880
881        if elapsed >= config.expiry + config.grace_period {
882            Expiry::Expired
883        } else if elapsed >= config.expiry {
884            Expiry::Stale
885        } else {
886            Expiry::Fresh
887        }
888    }
889
890    /// Returns when the project needs to be queued for a refresh.
891    fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
892        config
893            .refresh_interval
894            .map(|duration| self.0 + duration)
895            .map(RefreshTime)
896    }
897
898    /// Returns when the project is based to expire based on the current [`LastFetch`].
899    fn expiry_time(&self, config: &Config) -> ExpiryTime {
900        ExpiryTime(self.0 + config.grace_period + config.expiry)
901    }
902}
903
/// How outdated a cached project state is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Expiry {
    /// Up to date; no fetch is required.
    Fresh,
    /// Past the configured expiry but still inside the grace period. Events depending on this
    /// project state can still be processed, while the state should be refreshed in the
    /// background.
    Stale,
    /// Past the grace period as well. Events need to be buffered until a new state has been
    /// fetched.
    Expired,
}

impl Expiry {
    /// Returns `true` only for [`Expiry::Fresh`], i.e. when no fetch needs to be triggered.
    fn is_fresh(&self) -> bool {
        *self == Expiry::Fresh
    }
}
923
/// Instant when a project is scheduled for refresh.
///
/// Marked `#[must_use]` so that a computed refresh time cannot be silently dropped instead of
/// being used to schedule the refresh.
#[derive(Debug)]
#[must_use = "a refresh time must be used to schedule a refresh"]
struct RefreshTime(Instant);
928
/// Point in time at which a project should be evicted from the cache.
///
/// The `#[must_use]` attribute makes dropping a computed expiry without scheduling an
/// eviction a compiler warning.
#[must_use = "an expiry time must be used to schedule an eviction"]
#[derive(Debug)]
struct ExpiryTime(Instant);
933
// Tests for `ProjectStore` and the shared project state. All async tests run on a paused tokio
// clock (`start_paused = true`): time only moves via `tokio::time::advance`, or when the runtime
// auto-advances to the next pending timer while every task is idle (e.g. awaiting
// `store.poll()` without a timeout).
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Polls the store and evicts every project whose eviction is due right now.
    ///
    /// Stops at the first poll that does not yield an `Action::Eviction` within the tiny
    /// timeout: either nothing is due, or a different action was yielded, which is dropped.
    /// Returns the evicted project keys in the order they were yielded.
    async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
        let mut evicted = Vec::new();
        // Small timeout to really only get what is ready to be evicted right now.
        while let Ok(Some(Action::Eviction(eviction))) =
            tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
        {
            evicted.push(eviction.0);
            store.evict(eviction);
        }
        evicted
    }

    // Asserts that the shared state of `$project_key` in `$store` matches the pattern `$state`.
    // Goes through `get_or_create`, so a missing shared entry is created as a side effect.
    macro_rules! assert_state {
        ($store:ident, $project_key:ident, $state:pat) => {
            assert!(matches!(
                $store.shared().get_or_create($project_key).project_state(),
                $state
            ));
        };
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(&Default::default());

        let fetch = store.try_begin_fetch(project_key).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Fetch already in progress, nothing to do.
        assert!(store.try_begin_fetch(project_key).is_none());

        // A pending fetch should trigger a new fetch immediately.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // The first backoff is still immediate.
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Pending again.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // This time it needs to be in the future (backoff).
        assert!(fetch.when() > Some(Instant::now()));
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Now complete with disabled.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // A new fetch is not yet necessary.
        assert!(store.try_begin_fetch(project_key).is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_fetch_pending_does_not_replace_state() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Past the 5 second expiry, so a new fetch can be started.
        tokio::time::advance(Duration::from_secs(6)).await;

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Pending.into());
        // We're returned a new fetch, because the current one completed pending.
        let fetch = store.complete_fetch(fetch).unwrap();
        // The old cached state is still available and not replaced.
        assert_state!(store, project_key, ProjectState::Disabled);

        let fetch = fetch.complete(ProjectState::new_allowed().into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Enabled(_));
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // 3 seconds is not enough to expire any project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        let fetch = store.try_begin_fetch(project_key2).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // A total of 6 seconds should expire the first project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Disabled);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_pending_not_expired() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        // Create a new project in a pending state, but never fetch it; this should also never
        // expire.
        store.shared().get_or_create(project_key2);

        tokio::time::advance(Duration::from_secs(6)).await;

        // No evictions, project is pending.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Still should not be evicted: the 5 second expiry is counted from the completion of
        // the fetch.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_secs(4)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // Just enough to expire the project.
        tokio::time::advance(Duration::from_millis(1001)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_stale() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // This is in the grace period, but not yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Now it's expired (expiry + grace period = 10 seconds).
        tokio::time::advance(Duration::from_secs(1)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_no_eviction_during_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Project is expired, but there is an ongoing fetch.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        // Start a new fetch once the state is no longer fresh (past the 5 second expiry, but
        // still within the grace period).
        tokio::time::advance(Duration::from_millis(5001)).await;
        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Again, expire the project.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Not quite yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        // Now it's expired.
        tokio::time::advance(Duration::from_millis(501)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Wait for a refresh.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(refresh.project_key(), project_key);

        let fetch = store.refresh(refresh).unwrap();
        // Upgrade the pending refresh fetch to a non-refresh fetch.
        assert!(store.try_begin_fetch(project_key).is_none());
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Since the previous refresh has been upgraded to a proper fetch, expiration has been
        // rescheduled and a new refresh is planned to happen 7 seconds from now.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        let fetch = store.refresh(refresh).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // At this point the refresh is through, but expiration is around the corner, because a
        // refresh does not bump the expiration deadline.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_overtaken_by_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // The eviction should be prioritized, there is no reason to refresh an already evicted
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        store.evict(eviction);

        // Make sure there is not another refresh queued.
        // This would not technically be necessary because refresh code must be able to handle
        // refreshes for non-fetched projects, but the current implementation should enforce this.
        assert!(
            tokio::time::timeout(Duration::from_secs(60), store.poll())
                .await
                .is_err()
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_during_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // Poll both the eviction and the refresh token. While a proper implementation should
        // prevent this, it's a good way to test that a refresh for an evicted project does not
        // fetch the project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        assert_eq!(refresh.project_key(), project_key);
        store.evict(eviction);

        assert!(store.refresh(refresh).is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn test_ready_state() {
        let shared = SharedProjectState::default();

        let shared_project = shared.to_shared_project();
        assert!(shared_project.project_state().is_pending());
        let mut listener = std::pin::pin!(shared_project.outdated());

        // After five seconds, project state is still pending:
        let result = tokio::time::timeout(Duration::from_secs(5), listener.as_mut()).await;
        assert!(result.is_err()); // timed out before notify
        assert!(shared.to_shared_project().project_state().is_pending());

        // Change the state:
        shared.set_project_state(ProjectState::Disabled);

        // The listener gets notified immediately:
        let result = tokio::time::timeout(Duration::from_secs(1), listener).await;
        assert!(result.is_ok()); // notified before timeout

        // The old snapshot is still pending:
        assert!(shared_project.project_state().is_pending());

        // The up-to-date snapshot is Disabled:
        assert!(matches!(
            shared.to_shared_project().project_state(),
            &ProjectState::Disabled
        ));
    }
}