relay_server/services/projects/cache/
state.rs

1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::time::Instant;
6
7use arc_swap::ArcSwap;
8use relay_base_schema::project::ProjectKey;
9use relay_quotas::CachedRateLimits;
10use relay_sampling::evaluation::ReservoirCounters;
11use relay_statsd::metric;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::projects::source::SourceProjectState;
15use crate::statsd::{RelayHistograms, RelayTimers};
16use crate::utils::{RetryBackoff, UniqueScheduledQueue};
17
/// The backing storage for a project cache.
///
/// Exposes the only interface to delete from [`Shared`], guaranteed by
/// requiring exclusive/mutable access to [`ProjectStore`].
///
/// [`Shared`] can be extended through [`Shared::get_or_create`], in which case
/// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger
/// a fetch to create the private state and keep it updated.
/// This guarantees that eventually the project state is populated, but for an undetermined
/// time it is possible that shared state exists without the respective private state.
pub struct ProjectStore {
    /// Project store configuration derived from the Relay config.
    config: Config,
    /// The shared state, which can be accessed concurrently.
    shared: Arc<Shared>,
    /// The private, mutably exclusive state, used to maintain the project state.
    ///
    /// Invariant: every project with a private state also has a shared state. The opposite
    /// does not hold, shared states may be created concurrently through [`Shared`].
    private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
    /// Scheduled queue tracking all evictions.
    evictions: UniqueScheduledQueue<ProjectKey>,
    /// Scheduled queue tracking all refreshes.
    refreshes: UniqueScheduledQueue<ProjectKey>,
}
39
40impl ProjectStore {
41    pub fn new(config: &relay_config::Config) -> Self {
42        Self {
43            config: Config::new(config),
44            shared: Default::default(),
45            private: Default::default(),
46            evictions: Default::default(),
47            refreshes: Default::default(),
48        }
49    }
50
51    /// Retrieves a [`Shared`] handle which can be freely shared with multiple consumers.
52    pub fn shared(&self) -> Arc<Shared> {
53        Arc::clone(&self.shared)
54    }
55
56    /// Tries to begin a new fetch for the passed `project_key`.
57    ///
58    /// Returns `None` if no fetch is necessary or there is already a fetch ongoing.
59    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
60    /// [`Self::complete_fetch`].
61    pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
62        self.do_try_begin_fetch(project_key, false)
63    }
64
65    /// Completes a [`CompletedFetch`] started with [`Self::try_begin_fetch`].
66    ///
67    /// Returns a new [`Fetch`] if another fetch must be scheduled. This happens when the fetched
68    /// [`ProjectState`] is still pending or already deemed expired.
69    #[must_use = "an incomplete fetch must be retried"]
70    pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
71        let project_key = fetch.project_key();
72
73        // Eviction is not possible for projects which are currently being fetched.
74        // Hence if there was a started fetch, the project state must always exist at this stage.
75        debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
76        debug_assert!(self.private.get(&project_key).is_some());
77
78        let mut project = self.get_or_create(project_key);
79        // Schedule another fetch if necessary, usually should only happen if
80        // the completed fetch is pending.
81        let new_fetch = match project.complete_fetch(fetch) {
82            FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
83            FetchResult::Done { expiry, refresh } => {
84                self.evictions.schedule(expiry.0, project_key);
85                if let Some(RefreshTime(refresh)) = refresh {
86                    self.refreshes.schedule(refresh, project_key);
87                }
88                None
89            }
90        };
91
92        metric!(
93            histogram(RelayHistograms::ProjectStateCacheSize) = self.shared.projects.len() as u64,
94            storage = "shared"
95        );
96        metric!(
97            histogram(RelayHistograms::ProjectStateCacheSize) = self.private.len() as u64,
98            storage = "private"
99        );
100
101        new_fetch
102    }
103
104    /// Waits for the next scheduled action.
105    ///
106    /// The returned [`Action`] must be immediately turned in using the corresponding handlers,
107    /// [`Self::evict`] or [`Self::refresh`].
108    ///
109    /// The returned future is cancellation safe.
110    pub async fn poll(&mut self) -> Option<Action> {
111        let eviction = self.evictions.next();
112        let refresh = self.refreshes.next();
113
114        tokio::select! {
115            biased;
116
117            Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
118            Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
119            else => None,
120        }
121    }
122
123    /// Refreshes a project using an [`Refresh`] token returned from [`Self::poll`].
124    ///
125    /// Like [`Self::try_begin_fetch`], this returns a [`Fetch`], if there was no fetch
126    /// already started in the meantime.
127    ///
128    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
129    /// [`Self::complete_fetch`].
130    pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
131        self.do_try_begin_fetch(project_key, true)
132    }
133
134    /// Evicts a project using an [`Eviction`] token returned from [`Self::poll`].
135    pub fn evict(&mut self, Eviction(project_key): Eviction) {
136        // Remove the private part.
137        let Some(private) = self.private.remove(&project_key) else {
138            // Not possible if all invariants are upheld.
139            debug_assert!(false, "no private state for eviction");
140            return;
141        };
142
143        debug_assert!(
144            matches!(private.state, FetchState::Complete { .. }),
145            "private state must be completed"
146        );
147
148        // Remove the shared part.
149        let shared = self.shared.projects.pin();
150        let _removed = shared.remove(&project_key);
151        debug_assert!(
152            _removed.is_some(),
153            "an expired project must exist in the shared state"
154        );
155
156        // Cancel next refresh, while not necessary (trying to refresh a project which does not
157        // exist, will do nothing), but we can also spare us the extra work.
158        self.refreshes.remove(&project_key);
159    }
160
161    /// Internal handler to begin a new fetch for the passed `project_key`, which can also handle
162    /// refreshes.
163    fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
164        let fetch = match is_refresh {
165            // A rogue refresh does not need to trigger an actual fetch.
166            // In practice this should never happen, as the refresh time is validated against
167            // the eviction time.
168            // But it may happen due to a race of the eviction and refresh (e.g. when setting them
169            // to close to the same value), in which case we don't want to re-populate the cache.
170            true => self.get(project_key)?,
171            false => self.get_or_create(project_key),
172        }
173        .try_begin_fetch(is_refresh);
174
175        // If there is a new fetch, remove the pending eviction, it will be re-scheduled once the
176        // fetch is completed.
177        if fetch.is_some() {
178            self.evictions.remove(&project_key);
179            // There is no need to clear the refresh here, if it triggers while a fetch is ongoing,
180            // it is simply discarded.
181        }
182
183        fetch
184    }
185
186    /// Get a reference to the current project or create a new project.
187    ///
188    /// For internal use only, a created project must always be fetched immediately.
189    fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
190        let private = self.private.get_mut(&project_key)?;
191
192        // Same invariant as in `get_or_create`, we have exclusive access to the private
193        // project here, there must be a shared project if there is a private project.
194        debug_assert!(self.shared.projects.pin().contains_key(&project_key));
195
196        let shared = self
197            .shared
198            .projects
199            .pin()
200            .get_or_insert_with(project_key, Default::default)
201            .clone();
202
203        Some(ProjectRef {
204            private,
205            shared,
206            config: &self.config,
207        })
208    }
209
210    /// Get a reference to the current project or create a new project.
211    ///
212    /// For internal use only, a created project must always be fetched immediately.
213    fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
214        #[cfg(debug_assertions)]
215        if self.private.contains_key(&project_key) {
216            // We have exclusive access to the private part, there are no concurrent deletions
217            // hence if we have a private state there must always be a shared state as well.
218            //
219            // The opposite is not true, the shared state may have been created concurrently
220            // through the shared access.
221            debug_assert!(self.shared.projects.pin().contains_key(&project_key));
222        }
223
224        let private = self
225            .private
226            .entry(project_key)
227            .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
228
229        let shared = self
230            .shared
231            .projects
232            .pin()
233            .get_or_insert_with(project_key, Default::default)
234            .clone();
235
236        ProjectRef {
237            private,
238            shared,
239            config: &self.config,
240        }
241    }
242}
243
/// Configuration for a [`ProjectStore`].
struct Config {
    /// Expiry timeout for individual project configs.
    ///
    /// Note: the total expiry is the sum of the expiry and grace period.
    expiry: Duration,
    /// Grace period for a project config.
    ///
    /// A project config is considered stale and will be updated asynchronously,
    /// after reaching the grace period.
    grace_period: Duration,
    /// Refresh interval for a single project.
    ///
    /// A project will be asynchronously refreshed repeatedly using this interval.
    ///
    /// The refresh interval is validated to lie strictly between the expiry and the end of the
    /// grace period (`expiry + grace_period`). An invalid refresh interval is ignored and
    /// stored as `None`.
    refresh_interval: Option<Duration>,
    /// Maximum backoff for continuously failing project updates.
    ///
    /// Taken from the maximum HTTP retry interval of the Relay config.
    max_retry_backoff: Duration,
}
265
266impl Config {
267    fn new(config: &relay_config::Config) -> Self {
268        let expiry = config.project_cache_expiry();
269        let grace_period = config.project_grace_period();
270
271        // Make sure the refresh time is:
272        // - at least the expiration, refreshing a non-stale project makes no sense.
273        // - at most the end of the grace period, refreshing an expired project also makes no sense.
274        let refresh_interval = config
275            .project_refresh_interval()
276            .filter(|rt| *rt < (expiry + grace_period))
277            .filter(|rt| *rt > expiry);
278
279        Self {
280            expiry: config.project_cache_expiry(),
281            grace_period: config.project_grace_period(),
282            refresh_interval,
283            max_retry_backoff: config.http_max_retry_interval(),
284        }
285    }
286}
287
/// The shared and concurrently accessible handle to the project cache.
///
/// Entries can be created concurrently through [`Shared::get_or_create`], removal is only
/// exposed through the owning [`ProjectStore`].
#[derive(Default)]
pub struct Shared {
    /// All cached projects, keyed by their project key.
    projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
}
293
294impl Shared {
295    /// Returns the existing project state or creates a new one.
296    ///
297    /// The caller must ensure that the project cache is instructed to
298    /// [`super::ProjectCache::Fetch`] the retrieved project.
299    pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
300        // The fast path, we expect the project to exist.
301        let projects = self.projects.pin();
302        if let Some(project) = projects.get(&project_key) {
303            return project.to_shared_project();
304        }
305
306        // The slow path, try to attempt to insert, somebody else may have been faster, but that's okay.
307        match projects.try_insert(project_key, Default::default()) {
308            Ok(inserted) => inserted.to_shared_project(),
309            Err(occupied) => occupied.current.to_shared_project(),
310        }
311    }
312}
313
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl Shared {
    /// Updates the project state for a project, creating the shared entry if necessary.
    ///
    /// TEST ONLY!
    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
        let projects = self.projects.pin();
        let project = projects.get_or_insert_with(project_key, Default::default);
        project.set_project_state(state);
    }

    /// Returns `true` if there exists a shared state for the passed `project_key`.
    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
        let projects = self.projects.pin();
        projects.contains_key(&project_key)
    }
}
332
333impl fmt::Debug for Shared {
334    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335        f.debug_struct("Shared")
336            .field("num_projects", &self.projects.len())
337            .finish()
338    }
339}
340
/// A single project from the [`Shared`] project cache.
///
/// Holds the project data as it was loaded when this handle was created, later replacements
/// of the project state in the cache are not observed. Rate limits and reservoir counters
/// are shared (`Arc`) with the cache.
pub struct SharedProject(Arc<SharedProjectStateInner>);
343
344impl SharedProject {
345    /// Returns a reference to the contained [`ProjectState`].
346    pub fn project_state(&self) -> &ProjectState {
347        &self.0.state
348    }
349
350    /// Returns a reference to the contained [`CachedRateLimits`].
351    pub fn cached_rate_limits(&self) -> &CachedRateLimits {
352        // Exposing cached rate limits may be a bad idea, this allows mutation
353        // and caching of rate limits for pending projects, which may or may not be fine.
354        // Although, for now this is fine.
355        //
356        // Read only access is easily achievable if we return only the current rate limits.
357        &self.0.rate_limits
358    }
359
360    /// Returns a reference to the contained [`ReservoirCounters`].
361    pub fn reservoir_counters(&self) -> &ReservoirCounters {
362        &self.0.reservoir_counters
363    }
364}
365
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl SharedProject {
    /// Creates a new [`SharedProject`] for testing only.
    ///
    /// Wraps `state`, all other fields use their defaults.
    pub fn for_test(state: ProjectState) -> Self {
        let inner = SharedProjectStateInner {
            state,
            ..Default::default()
        };
        Self(Arc::new(inner))
    }
}
377
/// Reference to a full project wrapping shared and private state.
struct ProjectRef<'a> {
    /// Handle to the shared part of the project, cloned out of [`Shared`].
    shared: SharedProjectState,
    /// Exclusive reference to the private part of the project.
    private: &'a mut PrivateProjectState,
    /// The project store configuration.
    config: &'a Config,
}
384
385impl ProjectRef<'_> {
386    fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
387        let now = Instant::now();
388        self.private
389            .try_begin_fetch(now, is_refresh, self.config)
390            .map(|fetch| fetch.with_revision(self.shared.revision()))
391    }
392
393    fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
394        let now = Instant::now();
395
396        if let Some(latency) = fetch.latency() {
397            let delay = match fetch.delay() {
398                Some(delay) if delay.as_secs() <= 15 => "lte15s",
399                Some(delay) if delay.as_secs() <= 30 => "lte30s",
400                Some(delay) if delay.as_secs() <= 60 => "lte60s",
401                Some(delay) if delay.as_secs() <= 120 => "lte120",
402                Some(delay) if delay.as_secs() <= 300 => "lte300s",
403                Some(delay) if delay.as_secs() <= 600 => "lte600s",
404                Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
405                Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
406                Some(_) => "gt3600s",
407                None => "none",
408            };
409            metric!(
410                timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
411                delay = delay
412            );
413        }
414
415        if !fetch.is_pending() {
416            let state = match fetch.state {
417                SourceProjectState::New(_) => "new",
418                SourceProjectState::NotModified => "not_modified",
419            };
420
421            metric!(
422                timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
423                state = state
424            );
425        }
426
427        // Update private and shared state with the new data.
428        let result = self.private.complete_fetch(&fetch, now, self.config);
429        match fetch.state {
430            // Keep the old state around if the current fetch is pending.
431            // It may still be useful to callers.
432            SourceProjectState::New(state) if !state.is_pending() => {
433                self.shared.set_project_state(state);
434            }
435            _ => {}
436        }
437
438        result
439    }
440}
441
442pub enum Action {
443    Eviction(Eviction),
444    Refresh(Refresh),
445}
446
447/// A [`Refresh`] token.
448///
449/// The token must be turned in using [`ProjectStore::refresh`].
450#[derive(Debug)]
451#[must_use = "a refresh must be used"]
452pub struct Refresh(ProjectKey);
453
454impl Refresh {
455    /// Returns the [`ProjectKey`] of the project that needs to be refreshed.
456    pub fn project_key(&self) -> ProjectKey {
457        self.0
458    }
459}
460
461/// A [`Eviction`] token.
462///
463/// The token must be turned in using [`ProjectStore::evict`].
464#[derive(Debug)]
465#[must_use = "an eviction must be used"]
466pub struct Eviction(ProjectKey);
467
468impl Eviction {
469    /// Returns the [`ProjectKey`] of the project that needs to be evicted.
470    pub fn project_key(&self) -> ProjectKey {
471        self.0
472    }
473}
474
/// A [`Fetch`] token.
///
/// When returned it must be executed and completed using [`Self::complete`].
#[must_use = "a fetch must be executed"]
#[derive(Debug)]
pub struct Fetch {
    /// The project to fetch.
    project_key: ProjectKey,
    /// Time of the last successful fetch of this project, `None` if there was none yet.
    previous_fetch: Option<Instant>,
    /// When this fetch was first initiated.
    ///
    /// For a fetch which is retried after a pending result, this is carried over from the
    /// first attempt.
    initiated: Instant,
    /// When the fetch should be scheduled, `None` means as soon as possible.
    when: Option<Instant>,
    /// Revision of the currently cached project state.
    revision: Revision,
}
487
488impl Fetch {
489    /// Returns the [`ProjectKey`] of the project to fetch.
490    pub fn project_key(&self) -> ProjectKey {
491        self.project_key
492    }
493
494    /// Returns when the fetch for the project should be scheduled.
495    ///
496    /// This can be now (as soon as possible, indicated by `None`) or a later point in time,
497    /// if the project is currently in a backoff.
498    pub fn when(&self) -> Option<Instant> {
499        self.when
500    }
501
502    /// Returns the revisions of the currently cached project.
503    ///
504    /// If the upstream indicates it does not have a different version of this project
505    /// we do not need to update the local state.
506    pub fn revision(&self) -> Revision {
507        self.revision.clone()
508    }
509
510    /// Completes the fetch with a result and returns a [`CompletedFetch`].
511    pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
512        CompletedFetch { fetch: self, state }
513    }
514
515    fn with_revision(mut self, revision: Revision) -> Self {
516        self.revision = revision;
517        self
518    }
519}
520
/// The result of an executed [`Fetch`].
///
/// Created with [`Fetch::complete`] and turned in with [`ProjectStore::complete_fetch`].
#[must_use = "a completed fetch must be acted upon"]
#[derive(Debug)]
pub struct CompletedFetch {
    /// The fetch token this result belongs to.
    fetch: Fetch,
    /// The project state returned by the source.
    state: SourceProjectState,
}
528
529impl CompletedFetch {
530    /// Returns the [`ProjectKey`] of the project which was fetched.
531    pub fn project_key(&self) -> ProjectKey {
532        self.fetch.project_key()
533    }
534
535    /// Returns the amount of time passed between the last successful fetch for this project and the start of this fetch.
536    ///
537    /// `None` if this is the first fetch.
538    fn delay(&self) -> Option<Duration> {
539        self.fetch
540            .previous_fetch
541            .map(|pf| self.fetch.initiated.duration_since(pf))
542    }
543
544    /// Returns the duration between first initiating the fetch and `now`.
545    fn duration(&self, now: Instant) -> Duration {
546        now.duration_since(self.fetch.initiated)
547    }
548
549    /// Returns the update latency of the fetched project config from the upstream.
550    ///
551    /// Is `None`, when no project config could be fetched, or if this was the first
552    /// fetch of a project config.
553    ///
554    /// Note: this latency is computed on access, it does not use the time when the [`Fetch`]
555    /// was marked as (completed)[`Fetch::complete`].
556    fn latency(&self) -> Option<Duration> {
557        // We're not interested in initial fetches. The latency on the first fetch
558        // has no meaning about how long it takes for an updated project config to be
559        // propagated to a Relay.
560        let is_first_fetch = self.fetch.revision().as_str().is_none();
561        if is_first_fetch {
562            return None;
563        }
564
565        let project_info = match &self.state {
566            SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
567            // Not modified or deleted/disabled -> no latency to track.
568            //
569            // Currently we discard the last changed timestamp for disabled projects,
570            // it would be possible to do so and then also expose a latency for disabled projects.
571            _ => return None,
572        };
573
574        // A matching revision is not an update.
575        if project_info.rev == self.fetch.revision {
576            return None;
577        }
578
579        let elapsed = chrono::Utc::now() - project_info.last_change?;
580        elapsed.to_std().ok()
581    }
582
583    /// Returns `true` if the fetch completed with a pending status.
584    fn is_pending(&self) -> bool {
585        match &self.state {
586            SourceProjectState::New(state) => state.is_pending(),
587            SourceProjectState::NotModified => false,
588        }
589    }
590}
591
/// The state of a project contained in the [`Shared`] project cache.
///
/// This state is interior mutable and allows updates to the project.
/// Clones are handles to the same underlying state.
#[derive(Debug, Default, Clone)]
struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
597
598impl SharedProjectState {
599    /// Updates the project state.
600    fn set_project_state(&self, state: ProjectState) {
601        let prev = self.0.rcu(|stored| SharedProjectStateInner {
602            state: state.clone(),
603            rate_limits: Arc::clone(&stored.rate_limits),
604            reservoir_counters: Arc::clone(&stored.reservoir_counters),
605        });
606
607        // Try clean expired reservoir counters.
608        //
609        // We do it after the `rcu`, to not re-run this more often than necessary.
610        if let Some(state) = state.enabled() {
611            let config = state.config.sampling.as_ref();
612            if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
613                // We can safely use previous here, the `rcu` just replaced the state, the
614                // reservoir counters did not change.
615                //
616                // `try_lock` to not potentially block, it's a best effort cleanup.
617                if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
618                    counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
619                }
620            }
621        }
622    }
623
624    /// Extracts and clones the revision from the contained project state.
625    fn revision(&self) -> Revision {
626        self.0.as_ref().load().state.revision().clone()
627    }
628
629    /// Transforms this interior mutable handle to an immutable [`SharedProject`].
630    fn to_shared_project(&self) -> SharedProject {
631        SharedProject(self.0.as_ref().load_full())
632    }
633}
634
/// The data contained in a [`SharedProjectState`].
///
/// All fields must be cheap to clone and are ideally just a single `Arc`.
/// Partial updates to [`SharedProjectState`] are performed using `rcu`, cloning all fields.
#[derive(Debug, Default)]
struct SharedProjectStateInner {
    /// The current project state.
    state: ProjectState,
    /// Cached rate limits, carried over unchanged when the project state is replaced.
    rate_limits: Arc<CachedRateLimits>,
    /// Reservoir counters, carried over when the project state is replaced; counters of
    /// sampling rules missing from the new config are cleaned up on a best-effort basis.
    reservoir_counters: ReservoirCounters,
}
645
/// Current fetch state for a project.
///
/// ─────► Pending ◄─────┐
///           │          │
///           │          │Backoff
///           ▼          │
/// ┌───► InProgress ────┘
/// │         │
/// │         │
/// │         ▼
/// └───── Complete
///
/// A new project starts as [`Self::Pending`]. Beginning a fetch moves it to
/// [`Self::InProgress`]. A pending result from the upstream moves it back to [`Self::Pending`]
/// with a backoff, any other result moves it to [`Self::Complete`]. A complete project which
/// is no longer fresh can begin another fetch.
#[derive(Debug)]
enum FetchState {
    /// There is a fetch currently in progress.
    InProgress {
        /// Whether the current check in progress was triggered from a refresh.
        ///
        /// Triggering a non-refresh fetch while a refresh fetch is currently in progress,
        /// will overwrite this property.
        is_refresh: bool,
    },
    /// A successful fetch is pending.
    ///
    /// Projects which have not yet been fetched are in the pending state,
    /// as well as projects which have a fetch in progress but were notified
    /// from upstream that the project config is still pending.
    ///
    /// If the upstream notifies this instance about a pending config,
    /// a backoff is applied, before trying again.
    Pending {
        /// Instant when the fetch was first initiated.
        ///
        /// A state may be transitioned multiple times from [`Self::Pending`] to [`Self::InProgress`]
        /// and back to [`Self::Pending`]. This timestamp is the first time when the state
        /// was transitioned from [`Self::Complete`] to [`Self::InProgress`].
        ///
        /// Only `None` before the first fetch attempt of a new project.
        initiated: Option<Instant>,
        /// Time when the next fetch should be attempted.
        ///
        /// `None` means as soon as possible.
        next_fetch_attempt: Option<Instant>,
    },
    /// There was a successful non-pending fetch.
    Complete {
        /// Time when the fetch was completed.
        when: LastFetch,
    },
}
695
/// Contains all mutable state necessary to maintain the project cache.
struct PrivateProjectState {
    /// Project key this state belongs to.
    project_key: ProjectKey,

    /// The current fetch state.
    state: FetchState,
    /// The current backoff used for calculating the next fetch attempt.
    ///
    /// The backoff is reset after a successful, non-pending fetch.
    backoff: RetryBackoff,

    /// The last time the state was successfully fetched.
    ///
    /// May be `None` when the state has never been successfully fetched.
    ///
    /// This is purely informational, all necessary information to make
    /// state transitions is contained in [`FetchState`].
    last_fetch: Option<Instant>,

    /// The expiry time of this project.
    ///
    /// A refresh of the project will not push the expiration time.
    /// `None` until the first successful, non-pending fetch.
    expiry: Option<Instant>,
}
721
impl PrivateProjectState {
    /// Creates the private state for a project which has never been fetched.
    ///
    /// The project starts in [`FetchState::Pending`] without a scheduled next attempt.
    fn new(project_key: ProjectKey, config: &Config) -> Self {
        Self {
            project_key,
            state: FetchState::Pending {
                initiated: None,
                next_fetch_attempt: None,
            },
            backoff: RetryBackoff::new(config.max_retry_backoff),
            last_fetch: None,
            expiry: None,
        }
    }

    /// Tries to transition into [`FetchState::InProgress`] and returns the [`Fetch`] to execute.
    ///
    /// Returns `None` if a fetch is already in progress or the completed state is still fresh.
    /// The returned [`Fetch`] carries a default [`Revision`], the caller sets the revision of
    /// the currently cached state.
    fn try_begin_fetch(
        &mut self,
        now: Instant,
        is_refresh: bool,
        config: &Config,
    ) -> Option<Fetch> {
        let (initiated, when) = match &mut self.state {
            FetchState::InProgress {
                is_refresh: refresh_in_progress,
            } => {
                relay_log::trace!(
                    tags.project_key = self.project_key.as_str(),
                    "project fetch skipped, fetch in progress"
                );
                // A non-refresh request turns an in-progress refresh into a regular fetch, so
                // that its completion re-computes the expiry (see `complete_fetch`).
                *refresh_in_progress = *refresh_in_progress && is_refresh;
                return None;
            }
            FetchState::Pending {
                initiated,
                next_fetch_attempt,
            } => {
                // Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while.
                (initiated.unwrap_or(now), *next_fetch_attempt)
            }
            FetchState::Complete { when } => {
                // Sanity check to make sure timestamps do not drift.
                debug_assert_eq!(Some(when.0), self.last_fetch);

                if when.check_expiry(now, config).is_fresh() {
                    // The current state is up to date, no need to start another fetch.
                    relay_log::trace!(
                        tags.project_key = self.project_key.as_str(),
                        "project fetch skipped, already up to date"
                    );
                    return None;
                }

                (now, None)
            }
        };

        // Mark a current fetch in progress.
        self.state = FetchState::InProgress { is_refresh };

        relay_log::trace!(
            tags.project_key = &self.project_key.as_str(),
            attempts = self.backoff.attempt() + 1,
            "project state {} scheduled in {:?}",
            if is_refresh { "refresh" } else { "fetch" },
            when.unwrap_or(now).saturating_duration_since(now),
        );

        Some(Fetch {
            project_key: self.project_key,
            previous_fetch: self.last_fetch,
            initiated,
            when,
            revision: Revision::default(),
        })
    }

    /// Transitions out of [`FetchState::InProgress`] after a fetch finished.
    ///
    /// A pending result moves back to [`FetchState::Pending`] with the next backoff and asks
    /// for a re-schedule. Any other result resets the backoff, records the fetch time, moves
    /// to [`FetchState::Complete`] and returns when the project expires and when it should be
    /// refreshed. The expiry is only re-computed for fetches which are not refreshes.
    fn complete_fetch(
        &mut self,
        fetch: &CompletedFetch,
        now: Instant,
        config: &Config,
    ) -> FetchResult {
        let FetchState::InProgress { is_refresh } = self.state else {
            debug_assert!(
                false,
                "fetch completed while there was no current fetch registered"
            );
            // Be conservative in production.
            return FetchResult::ReSchedule { refresh: false };
        };

        if fetch.is_pending() {
            let next_backoff = self.backoff.next_backoff();
            // A zero backoff means the next attempt can happen as soon as possible.
            let next_fetch_attempt = match next_backoff.is_zero() {
                false => now.checked_add(next_backoff),
                true => None,
            };
            self.state = FetchState::Pending {
                next_fetch_attempt,
                initiated: Some(fetch.fetch.initiated),
            };
            relay_log::trace!(
                tags.project_key = &self.project_key.as_str(),
                "project state {} completed but still pending",
                if is_refresh { "refresh" } else { "fetch" },
            );

            FetchResult::ReSchedule {
                refresh: is_refresh,
            }
        } else {
            relay_log::trace!(
                tags.project_key = &self.project_key.as_str(),
                "project state {} completed with non-pending config",
                if is_refresh { "refresh" } else { "fetch" },
            );

            self.backoff.reset();
            self.last_fetch = Some(now);

            let when = LastFetch(now);

            let refresh = when.refresh_time(config);
            let expiry = match self.expiry {
                Some(expiry) if is_refresh => ExpiryTime(expiry),
                // Only bump/re-compute the expiry time if the fetch was not a refresh,
                // to not keep refreshed projects forever in the cache.
                Some(_) | None => when.expiry_time(config),
            };
            self.expiry = Some(expiry.0);

            self.state = FetchState::Complete { when };
            FetchResult::Done { expiry, refresh }
        }
    }
}
858
/// Result returned when completing a fetch.
///
/// Tells the caller what to schedule next for the project whose fetch just finished.
#[derive(Debug)]
#[must_use = "fetch result must be used"]
enum FetchResult {
    /// Another fetch must be scheduled right away.
    ///
    /// The fetch itself is created immediately. Its execution may still be delayed by the
    /// retry backoff recorded for the project.
    ReSchedule {
        /// Whether the fetch should be re-scheduled as a refresh.
        refresh: bool,
    },
    /// The fetch is completed and should be registered for refresh and eviction.
    Done {
        /// When the project should be expired, i.e. evicted from the cache.
        expiry: ExpiryTime,
        /// When the project should be refreshed, or `None` if no refresh interval is configured.
        refresh: Option<RefreshTime>,
    },
}
876
877/// New type containing the last successful fetch time as an [`Instant`].
878#[derive(Debug, Copy, Clone)]
879struct LastFetch(Instant);
880
881impl LastFetch {
882    /// Returns the [`Expiry`] of the last fetch in relation to `now`.
883    fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
884        let elapsed = now.saturating_duration_since(self.0);
885
886        if elapsed >= config.expiry + config.grace_period {
887            Expiry::Expired
888        } else if elapsed >= config.expiry {
889            Expiry::Stale
890        } else {
891            Expiry::Fresh
892        }
893    }
894
895    /// Returns when the project needs to be queued for a refresh.
896    fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
897        config
898            .refresh_interval
899            .map(|duration| self.0 + duration)
900            .map(RefreshTime)
901    }
902
903    /// Returns when the project is based to expire based on the current [`LastFetch`].
904    fn expiry_time(&self, config: &Config) -> ExpiryTime {
905        ExpiryTime(self.0 + config.grace_period + config.expiry)
906    }
907}
908
/// How outdated the cached state of a project is.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum Expiry {
    /// The project state is perfectly up to date.
    Fresh,
    /// The project state is outdated, but events depending on it can still be processed.
    /// A background refresh of the state is due.
    Stale,
    /// The project state is completely outdated.
    /// Events have to be buffered until a new state has been fetched.
    Expired,
}

impl Expiry {
    /// Returns `true` when the project is up to date and no fetch is required.
    fn is_fresh(&self) -> bool {
        *self == Self::Fresh
    }
}
928
/// Instant at which a project is due for a refresh.
///
/// Marked `#[must_use]` so that a computed refresh time cannot be silently dropped
/// instead of being scheduled.
#[derive(Debug)]
#[must_use = "a refresh time must be used to schedule a refresh"]
struct RefreshTime(Instant);

/// Instant at which a project is due for eviction.
///
/// Marked `#[must_use]` so that a computed expiry cannot be silently dropped
/// instead of being scheduled.
#[derive(Debug)]
#[must_use = "an expiry time must be used to schedule an eviction"]
struct ExpiryTime(Instant);
938
// Unit tests for the fetch, refresh and eviction scheduling of `ProjectStore`.
//
// Every test runs on a paused tokio clock (`start_paused = true`). Time therefore only moves
// when a test calls `tokio::time::advance`, or when the runtime auto-advances while a test
// awaits a pending timer (e.g. `store.poll().await`).
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Drains the evictions that are due right now and applies them to `store`.
    ///
    /// Returns the evicted project keys in the order they were polled. Draining stops at the
    /// first poll that times out, returns `None`, or yields a non-eviction action. A
    /// non-eviction action is discarded.
    async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
        let mut evicted = Vec::new();
        // Small timeout to really only get what is ready to be evicted right now.
        while let Ok(Some(Action::Eviction(eviction))) =
            tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
        {
            evicted.push(eviction.0);
            store.evict(eviction);
        }
        evicted
    }

    // Asserts that the shared project state of `$project_key` matches the pattern `$state`.
    // Note: this goes through `get_or_create`, so it creates a shared entry if none exists.
    macro_rules! assert_state {
        ($store:ident, $project_key:ident, $state:pat) => {
            assert!(matches!(
                $store.shared().get_or_create($project_key).project_state(),
                $state
            ));
        };
    }

    // A fetch that keeps completing as pending is retried with backoff until it resolves.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(&Default::default());

        let fetch = store.try_begin_fetch(project_key).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Fetch already in progress, nothing to do.
        assert!(store.try_begin_fetch(project_key).is_none());

        // A pending fetch should trigger a new fetch immediately.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // The first backoff is zero, so the retry is scheduled immediately (`when` is `None`).
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Pending again.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // This time it needs to be in the future (backoff).
        assert!(fetch.when() > Some(Instant::now()));
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Now complete with disabled.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // A new fetch is not yet necessary, the state is still fresh.
        assert!(store.try_begin_fetch(project_key).is_none());
    }

    // A re-fetch that completes as pending must keep serving the previously cached state.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch_pending_does_not_replace_state() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        tokio::time::advance(Duration::from_secs(6)).await;

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Pending.into());
        // We're returned a new fetch, because the current one completed pending.
        let fetch = store.complete_fetch(fetch).unwrap();
        // The old cached state is still available and not replaced.
        assert_state!(store, project_key, ProjectState::Disabled);

        let fetch = fetch.complete(ProjectState::new_allowed().into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Enabled(_));
    }

    // Projects are evicted independently, each once its own expiry has passed.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // 3 seconds is not enough to expire any project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        let fetch = store.try_begin_fetch(project_key2).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // A total of 6 seconds should expire the first project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Disabled);
    }

    // Projects without a completed fetch are not scheduled for eviction.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_pending_not_expired() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        // Create a new project in a pending state, but never fetch it, this should also never expire.
        store.shared().get_or_create(project_key2);

        tokio::time::advance(Duration::from_secs(6)).await;

        // No evictions, project is pending.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Still not evicted: the 5 second expiry (no grace period) starts at completion,
        // not at the start of the fetch.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_secs(4)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // Just enough to expire the project.
        tokio::time::advance(Duration::from_millis(1001)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Pending);
    }

    // A stale project (inside the grace period) is kept; it is evicted only after expiry + grace.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_stale() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // This is in the grace period, but not yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Now it's expired.
        tokio::time::advance(Duration::from_secs(1)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    // A project is never evicted while a fetch for it is in progress.
    #[tokio::test(start_paused = true)]
    async fn test_store_no_eviction_during_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Project is expired, but there is an ongoing fetch.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        // Start a new fetch as soon as the state is stale again, i.e. just past the 5 second
        // expiry and inside the grace period.
        tokio::time::advance(Duration::from_millis(5001)).await;
        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Again, expire the project.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Not quite yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        // Now it's expired.
        tokio::time::advance(Duration::from_millis(501)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    // Refreshes are scheduled after a fetch, but do not push back the expiry deadline.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Wait for a refresh.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(refresh.project_key(), project_key);

        let fetch = store.refresh(refresh).unwrap();
        // Upgrade the pending refresh fetch to a non-refresh fetch.
        // No new fetch is returned, because one is already in progress.
        assert!(store.try_begin_fetch(project_key).is_none());
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Because the previous refresh was upgraded to a proper fetch, the expiration has been
        // rescheduled and a new refresh is planned to happen 7 seconds from now.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        let fetch = store.refresh(refresh).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // At this point the refresh is through, but expiration is around the corner.
        // Because the refresh doesn't bump the expiration deadline.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
    }

    // When eviction and refresh are both due, the eviction wins and the refresh is dropped.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_overtaken_by_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // The eviction should be prioritized, there is no reason to refresh an already evicted
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        store.evict(eviction);

        // Make sure there is not another refresh queued.
        // This would not technically be necessary because refresh code must be able to handle
        // refreshes for non-fetched projects, but the current implementation should enforce this.
        assert!(
            tokio::time::timeout(Duration::from_secs(60), store.poll())
                .await
                .is_err()
        );
    }

    // A refresh token for a project that has already been evicted must not start a fetch.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_during_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // Poll both the eviction and the refresh token. A proper implementation should prevent
        // this, but it is a good way to test that a refresh for an evicted project does not
        // fetch the project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        assert_eq!(refresh.project_key(), project_key);
        store.evict(eviction);

        assert!(store.refresh(refresh).is_none());
    }
}