relay_server/services/projects/cache/
state.rs

1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::time::Instant;
6
7use arc_swap::ArcSwap;
8use relay_base_schema::project::ProjectKey;
9use relay_quotas::CachedRateLimits;
10use relay_sampling::evaluation::ReservoirCounters;
11use relay_statsd::metric;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::projects::source::SourceProjectState;
15use crate::statsd::{RelayDistributions, RelayTimers};
16use crate::utils::{RetryBackoff, UniqueScheduledQueue};
17
18/// The backing storage for a project cache.
19///
20/// Exposes the only interface to delete from [`Shared`], guaranteed by
21/// requiring exclusive/mutable access to [`ProjectStore`].
22///
23/// [`Shared`] can be extended through [`Shared::get_or_create`], in which case
24/// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger
25/// a fetch to create the private state and keep it updated.
26/// This guarantees that eventually the project state is populated, but for a undetermined,
27/// time it is possible that shared state exists without the respective private state.
28pub struct ProjectStore {
29    config: Config,
30    /// The shared state, which can be accessed concurrently.
31    shared: Arc<Shared>,
32    /// The private, mutably exclusive state, used to maintain the project state.
33    private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
34    /// Scheduled queue tracking all evictions.
35    evictions: UniqueScheduledQueue<ProjectKey>,
36    /// Scheduled queue tracking all refreshes.
37    refreshes: UniqueScheduledQueue<ProjectKey>,
38}
39
40impl ProjectStore {
41    pub fn new(config: &relay_config::Config) -> Self {
42        Self {
43            config: Config::new(config),
44            shared: Default::default(),
45            private: Default::default(),
46            evictions: Default::default(),
47            refreshes: Default::default(),
48        }
49    }
50
51    /// Retrieves a [`Shared`] handle which can be freely shared with multiple consumers.
52    pub fn shared(&self) -> Arc<Shared> {
53        Arc::clone(&self.shared)
54    }
55
56    /// Tries to begin a new fetch for the passed `project_key`.
57    ///
58    /// Returns `None` if no fetch is necessary or there is already a fetch ongoing.
59    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
60    /// [`Self::complete_fetch`].
61    pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
62        self.do_try_begin_fetch(project_key, false)
63    }
64
65    /// Completes a [`CompletedFetch`] started with [`Self::try_begin_fetch`].
66    ///
67    /// Returns a new [`Fetch`] if another fetch must be scheduled. This happens when the fetched
68    /// [`ProjectState`] is still pending or already deemed expired.
69    #[must_use = "an incomplete fetch must be retried"]
70    pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
71        let project_key = fetch.project_key();
72
73        // Eviction is not possible for projects which are currently being fetched.
74        // Hence if there was a started fetch, the project state must always exist at this stage.
75        debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
76        debug_assert!(self.private.get(&project_key).is_some());
77
78        let mut project = self.get_or_create(project_key);
79        // Schedule another fetch if necessary, usually should only happen if
80        // the completed fetch is pending.
81        let new_fetch = match project.complete_fetch(fetch) {
82            FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
83            FetchResult::Done { expiry, refresh } => {
84                self.evictions.schedule(expiry.0, project_key);
85                if let Some(RefreshTime(refresh)) = refresh {
86                    self.refreshes.schedule(refresh, project_key);
87                }
88                None
89            }
90        };
91
92        metric!(
93            distribution(RelayDistributions::ProjectStateCacheSize) =
94                self.shared.projects.len() as u64,
95            storage = "shared"
96        );
97        metric!(
98            distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
99            storage = "private"
100        );
101
102        new_fetch
103    }
104
105    /// Waits for the next scheduled action.
106    ///
107    /// The returned [`Action`] must be immediately turned in using the corresponding handlers,
108    /// [`Self::evict`] or [`Self::refresh`].
109    ///
110    /// The returned future is cancellation safe.
111    pub async fn poll(&mut self) -> Option<Action> {
112        let eviction = self.evictions.next();
113        let refresh = self.refreshes.next();
114
115        tokio::select! {
116            biased;
117
118            Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
119            Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
120            else => None,
121        }
122    }
123
124    /// Refreshes a project using an [`Refresh`] token returned from [`Self::poll`].
125    ///
126    /// Like [`Self::try_begin_fetch`], this returns a [`Fetch`], if there was no fetch
127    /// already started in the meantime.
128    ///
129    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
130    /// [`Self::complete_fetch`].
131    pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
132        self.do_try_begin_fetch(project_key, true)
133    }
134
135    /// Evicts a project using an [`Eviction`] token returned from [`Self::poll`].
136    pub fn evict(&mut self, Eviction(project_key): Eviction) {
137        // Remove the private part.
138        let Some(private) = self.private.remove(&project_key) else {
139            // Not possible if all invariants are upheld.
140            debug_assert!(false, "no private state for eviction");
141            return;
142        };
143
144        debug_assert!(
145            matches!(private.state, FetchState::Complete { .. }),
146            "private state must be completed"
147        );
148
149        // Remove the shared part.
150        let shared = self.shared.projects.pin();
151        let _removed = shared.remove(&project_key);
152        debug_assert!(
153            _removed.is_some(),
154            "an expired project must exist in the shared state"
155        );
156
157        // Cancel next refresh, while not necessary (trying to refresh a project which does not
158        // exist, will do nothing), but we can also spare us the extra work.
159        self.refreshes.remove(&project_key);
160    }
161
162    /// Internal handler to begin a new fetch for the passed `project_key`, which can also handle
163    /// refreshes.
164    fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
165        let fetch = match is_refresh {
166            // A rogue refresh does not need to trigger an actual fetch.
167            // In practice this should never happen, as the refresh time is validated against
168            // the eviction time.
169            // But it may happen due to a race of the eviction and refresh (e.g. when setting them
170            // to close to the same value), in which case we don't want to re-populate the cache.
171            true => self.get(project_key)?,
172            false => self.get_or_create(project_key),
173        }
174        .try_begin_fetch(is_refresh);
175
176        // If there is a new fetch, remove the pending eviction, it will be re-scheduled once the
177        // fetch is completed.
178        if fetch.is_some() {
179            self.evictions.remove(&project_key);
180            // There is no need to clear the refresh here, if it triggers while a fetch is ongoing,
181            // it is simply discarded.
182        }
183
184        fetch
185    }
186
187    /// Get a reference to the current project or create a new project.
188    ///
189    /// For internal use only, a created project must always be fetched immediately.
190    fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
191        let private = self.private.get_mut(&project_key)?;
192
193        // Same invariant as in `get_or_create`, we have exclusive access to the private
194        // project here, there must be a shared project if there is a private project.
195        debug_assert!(self.shared.projects.pin().contains_key(&project_key));
196
197        let shared = self
198            .shared
199            .projects
200            .pin()
201            .get_or_insert_with(project_key, Default::default)
202            .clone();
203
204        Some(ProjectRef {
205            private,
206            shared,
207            config: &self.config,
208        })
209    }
210
211    /// Get a reference to the current project or create a new project.
212    ///
213    /// For internal use only, a created project must always be fetched immediately.
214    fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
215        #[cfg(debug_assertions)]
216        if self.private.contains_key(&project_key) {
217            // We have exclusive access to the private part, there are no concurrent deletions
218            // hence if we have a private state there must always be a shared state as well.
219            //
220            // The opposite is not true, the shared state may have been created concurrently
221            // through the shared access.
222            debug_assert!(self.shared.projects.pin().contains_key(&project_key));
223        }
224
225        let private = self
226            .private
227            .entry(project_key)
228            .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
229
230        let shared = self
231            .shared
232            .projects
233            .pin()
234            .get_or_insert_with(project_key, Default::default)
235            .clone();
236
237        ProjectRef {
238            private,
239            shared,
240            config: &self.config,
241        }
242    }
243}
244
/// Configuration for a [`ProjectStore`].
struct Config {
    /// Expiry timeout for individual project configs.
    ///
    /// Note: the total expiry is the sum of the expiry and grace period.
    expiry: Duration,
    /// Grace period for a project config.
    ///
    /// A project config is considered stale and will be updated asynchronously
    /// after reaching the grace period.
    grace_period: Duration,
    /// Refresh interval for a single project.
    ///
    /// A project will be asynchronously refreshed repeatedly using this interval.
    ///
    /// The refresh interval is validated to lie strictly between the expiry and the end of the
    /// grace period (`expiry + grace_period`). An invalid refresh interval is ignored (`None`).
    refresh_interval: Option<Duration>,
    /// Maximum backoff for continuously failing project updates.
    max_retry_backoff: Duration,
}
266
267impl Config {
268    fn new(config: &relay_config::Config) -> Self {
269        let expiry = config.project_cache_expiry();
270        let grace_period = config.project_grace_period();
271
272        // Make sure the refresh time is:
273        // - at least the expiration, refreshing a non-stale project makes no sense.
274        // - at most the end of the grace period, refreshing an expired project also makes no sense.
275        let refresh_interval = config
276            .project_refresh_interval()
277            .filter(|rt| *rt < (expiry + grace_period))
278            .filter(|rt| *rt > expiry);
279
280        Self {
281            expiry: config.project_cache_expiry(),
282            grace_period: config.project_grace_period(),
283            refresh_interval,
284            max_retry_backoff: config.http_max_retry_interval(),
285        }
286    }
287}
288
289/// The shared and concurrently accessible handle to the project cache.
290#[derive(Default)]
291pub struct Shared {
292    projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
293}
294
295impl Shared {
296    /// Returns the existing project state or creates a new one.
297    ///
298    /// The caller must ensure that the project cache is instructed to
299    /// [`super::ProjectCache::Fetch`] the retrieved project.
300    pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
301        // The fast path, we expect the project to exist.
302        let projects = self.projects.pin();
303        if let Some(project) = projects.get(&project_key) {
304            return project.to_shared_project();
305        }
306
307        // The slow path, try to attempt to insert, somebody else may have been faster, but that's okay.
308        match projects.try_insert(project_key, Default::default()) {
309            Ok(inserted) => inserted.to_shared_project(),
310            Err(occupied) => occupied.current.to_shared_project(),
311        }
312    }
313}
314
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl Shared {
    /// Sets the project state for `project_key`, creating the shared entry if needed.
    ///
    /// TEST ONLY!
    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
        let projects = self.projects.pin();
        let project = projects.get_or_insert_with(project_key, Default::default);
        project.set_project_state(state);
    }

    /// Checks whether a shared entry exists for `project_key`.
    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
        let projects = self.projects.pin();
        projects.contains_key(&project_key)
    }
}
333
334impl fmt::Debug for Shared {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        f.debug_struct("Shared")
337            .field("num_projects", &self.projects.len())
338            .finish()
339    }
340}
341
342/// A single project from the [`Shared`] project cache.
343pub struct SharedProject(Arc<SharedProjectStateInner>);
344
345impl SharedProject {
346    /// Returns a reference to the contained [`ProjectState`].
347    pub fn project_state(&self) -> &ProjectState {
348        &self.0.state
349    }
350
351    /// Returns a reference to the contained [`CachedRateLimits`].
352    pub fn cached_rate_limits(&self) -> &CachedRateLimits {
353        // Exposing cached rate limits may be a bad idea, this allows mutation
354        // and caching of rate limits for pending projects, which may or may not be fine.
355        // Although, for now this is fine.
356        //
357        // Read only access is easily achievable if we return only the current rate limits.
358        &self.0.rate_limits
359    }
360
361    /// Returns a reference to the contained [`ReservoirCounters`].
362    pub fn reservoir_counters(&self) -> &ReservoirCounters {
363        &self.0.reservoir_counters
364    }
365}
366
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl SharedProject {
    /// Builds a [`SharedProject`] around `state`, with default rate limits and counters.
    ///
    /// For tests only.
    pub fn for_test(state: ProjectState) -> Self {
        let inner = SharedProjectStateInner {
            state,
            ..Default::default()
        };
        Self(Arc::new(inner))
    }
}
378
379/// Reference to a full project wrapping shared and private state.
380struct ProjectRef<'a> {
381    shared: SharedProjectState,
382    private: &'a mut PrivateProjectState,
383    config: &'a Config,
384}
385
386impl ProjectRef<'_> {
387    fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
388        let now = Instant::now();
389        self.private
390            .try_begin_fetch(now, is_refresh, self.config)
391            .map(|fetch| fetch.with_revision(self.shared.revision()))
392    }
393
394    fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
395        let now = Instant::now();
396
397        if let Some(latency) = fetch.latency() {
398            let delay = match fetch.delay() {
399                Some(delay) if delay.as_secs() <= 15 => "lte15s",
400                Some(delay) if delay.as_secs() <= 30 => "lte30s",
401                Some(delay) if delay.as_secs() <= 60 => "lte60s",
402                Some(delay) if delay.as_secs() <= 120 => "lte120",
403                Some(delay) if delay.as_secs() <= 300 => "lte300s",
404                Some(delay) if delay.as_secs() <= 600 => "lte600s",
405                Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
406                Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
407                Some(_) => "gt3600s",
408                None => "none",
409            };
410            metric!(
411                timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
412                delay = delay
413            );
414        }
415
416        if !fetch.is_pending() {
417            let state = match fetch.state {
418                SourceProjectState::New(_) => "new",
419                SourceProjectState::NotModified => "not_modified",
420            };
421
422            metric!(
423                timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
424                state = state
425            );
426        }
427
428        // Update private and shared state with the new data.
429        let result = self.private.complete_fetch(&fetch, now, self.config);
430        match fetch.state {
431            // Keep the old state around if the current fetch is pending.
432            // It may still be useful to callers.
433            SourceProjectState::New(state) if !state.is_pending() => {
434                self.shared.set_project_state(state);
435            }
436            _ => {}
437        }
438
439        result
440    }
441}
442
443pub enum Action {
444    Eviction(Eviction),
445    Refresh(Refresh),
446}
447
448/// A [`Refresh`] token.
449///
450/// The token must be turned in using [`ProjectStore::refresh`].
451#[derive(Debug)]
452#[must_use = "a refresh must be used"]
453pub struct Refresh(ProjectKey);
454
455impl Refresh {
456    /// Returns the [`ProjectKey`] of the project that needs to be refreshed.
457    pub fn project_key(&self) -> ProjectKey {
458        self.0
459    }
460}
461
462/// A [`Eviction`] token.
463///
464/// The token must be turned in using [`ProjectStore::evict`].
465#[derive(Debug)]
466#[must_use = "an eviction must be used"]
467pub struct Eviction(ProjectKey);
468
469impl Eviction {
470    /// Returns the [`ProjectKey`] of the project that needs to be evicted.
471    pub fn project_key(&self) -> ProjectKey {
472        self.0
473    }
474}
475
476/// A [`Fetch`] token.
477///
478/// When returned it must be executed and completed using [`Self::complete`].
479#[must_use = "a fetch must be executed"]
480#[derive(Debug)]
481pub struct Fetch {
482    project_key: ProjectKey,
483    previous_fetch: Option<Instant>,
484    initiated: Instant,
485    when: Option<Instant>,
486    revision: Revision,
487}
488
489impl Fetch {
490    /// Returns the [`ProjectKey`] of the project to fetch.
491    pub fn project_key(&self) -> ProjectKey {
492        self.project_key
493    }
494
495    /// Returns when the fetch for the project should be scheduled.
496    ///
497    /// This can be now (as soon as possible, indicated by `None`) or a later point in time,
498    /// if the project is currently in a backoff.
499    pub fn when(&self) -> Option<Instant> {
500        self.when
501    }
502
503    /// Returns the revisions of the currently cached project.
504    ///
505    /// If the upstream indicates it does not have a different version of this project
506    /// we do not need to update the local state.
507    pub fn revision(&self) -> Revision {
508        self.revision.clone()
509    }
510
511    /// Completes the fetch with a result and returns a [`CompletedFetch`].
512    pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
513        CompletedFetch { fetch: self, state }
514    }
515
516    fn with_revision(mut self, revision: Revision) -> Self {
517        self.revision = revision;
518        self
519    }
520}
521
522/// The result of an executed [`Fetch`].
523#[must_use = "a completed fetch must be acted upon"]
524#[derive(Debug)]
525pub struct CompletedFetch {
526    fetch: Fetch,
527    state: SourceProjectState,
528}
529
530impl CompletedFetch {
531    /// Returns the [`ProjectKey`] of the project which was fetched.
532    pub fn project_key(&self) -> ProjectKey {
533        self.fetch.project_key()
534    }
535
536    /// Returns the amount of time passed between the last successful fetch for this project and the start of this fetch.
537    ///
538    /// `None` if this is the first fetch.
539    fn delay(&self) -> Option<Duration> {
540        self.fetch
541            .previous_fetch
542            .map(|pf| self.fetch.initiated.duration_since(pf))
543    }
544
545    /// Returns the duration between first initiating the fetch and `now`.
546    fn duration(&self, now: Instant) -> Duration {
547        now.duration_since(self.fetch.initiated)
548    }
549
550    /// Returns the update latency of the fetched project config from the upstream.
551    ///
552    /// Is `None`, when no project config could be fetched, or if this was the first
553    /// fetch of a project config.
554    ///
555    /// Note: this latency is computed on access, it does not use the time when the [`Fetch`]
556    /// was marked as (completed)[`Fetch::complete`].
557    fn latency(&self) -> Option<Duration> {
558        // We're not interested in initial fetches. The latency on the first fetch
559        // has no meaning about how long it takes for an updated project config to be
560        // propagated to a Relay.
561        let is_first_fetch = self.fetch.revision().as_str().is_none();
562        if is_first_fetch {
563            return None;
564        }
565
566        let project_info = match &self.state {
567            SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
568            // Not modified or deleted/disabled -> no latency to track.
569            //
570            // Currently we discard the last changed timestamp for disabled projects,
571            // it would be possible to do so and then also expose a latency for disabled projects.
572            _ => return None,
573        };
574
575        // A matching revision is not an update.
576        if project_info.rev == self.fetch.revision {
577            return None;
578        }
579
580        let elapsed = chrono::Utc::now() - project_info.last_change?;
581        elapsed.to_std().ok()
582    }
583
584    /// Returns `true` if the fetch completed with a pending status.
585    fn is_pending(&self) -> bool {
586        match &self.state {
587            SourceProjectState::New(state) => state.is_pending(),
588            SourceProjectState::NotModified => false,
589        }
590    }
591}
592
593/// The state of a project contained in the [`Shared`] project cache.
594///
595/// This state is interior mutable and allows updates to the project.
596#[derive(Debug, Default, Clone)]
597struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
598
599impl SharedProjectState {
600    /// Updates the project state.
601    fn set_project_state(&self, state: ProjectState) {
602        let prev = self.0.rcu(|stored| SharedProjectStateInner {
603            state: state.clone(),
604            rate_limits: Arc::clone(&stored.rate_limits),
605            reservoir_counters: Arc::clone(&stored.reservoir_counters),
606        });
607
608        // Try clean expired reservoir counters.
609        //
610        // We do it after the `rcu`, to not re-run this more often than necessary.
611        if let Some(state) = state.enabled() {
612            let config = state.config.sampling.as_ref();
613            if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
614                // We can safely use previous here, the `rcu` just replaced the state, the
615                // reservoir counters did not change.
616                //
617                // `try_lock` to not potentially block, it's a best effort cleanup.
618                if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
619                    counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
620                }
621            }
622        }
623    }
624
625    /// Extracts and clones the revision from the contained project state.
626    fn revision(&self) -> Revision {
627        self.0.as_ref().load().state.revision().clone()
628    }
629
630    /// Transforms this interior mutable handle to an immutable [`SharedProject`].
631    fn to_shared_project(&self) -> SharedProject {
632        SharedProject(self.0.as_ref().load_full())
633    }
634}
635
636/// The data contained in a [`SharedProjectState`].
637///
638/// All fields must be cheap to clone and are ideally just a single `Arc`.
639/// Partial updates to [`SharedProjectState`], are performed using `rcu` cloning all fields.
640#[derive(Debug, Default)]
641struct SharedProjectStateInner {
642    state: ProjectState,
643    rate_limits: Arc<CachedRateLimits>,
644    reservoir_counters: ReservoirCounters,
645}
646
647/// Current fetch state for a project.
648///
649/// ─────► Pending ◄─────┐
650///           │          │
651///           │          │Backoff
652///           ▼          │
653/// ┌───► InProgress ────┘
654/// │         │
655/// │         │
656/// │         ▼
657/// └───── Complete
658#[derive(Debug)]
659enum FetchState {
660    /// There is a fetch currently in progress.
661    InProgress {
662        /// Whether the current check in progress was triggered from a refresh.
663        ///
664        /// Triggering a non-refresh fetch while a refresh fetch is currently in progress,
665        /// will overwrite this property.
666        is_refresh: bool,
667    },
668    /// A successful fetch is pending.
669    ///
670    /// Projects which have not yet been fetched are in the pending state,
671    /// as well as projects which have a fetch in progress but were notified
672    /// from upstream that the project config is still pending.
673    ///
674    /// If the upstream notifies this instance about a pending config,
675    /// a backoff is applied, before trying again.
676    Pending {
677        /// Instant when the fetch was first initiated.
678        ///
679        /// A state may be transitioned multiple times from [`Self::Pending`] to [`Self::InProgress`]
680        /// and back to [`Self::Pending`]. This timestamp is the first time when the state
681        /// was transitioned from [`Self::Complete`] to [`Self::InProgress`].
682        ///
683        /// Only `None` on first fetch.
684        initiated: Option<Instant>,
685        /// Time when the next fetch should be attempted.
686        ///
687        /// `None` means soon as possible.
688        next_fetch_attempt: Option<Instant>,
689    },
690    /// There was a successful non-pending fetch.
691    Complete {
692        /// Time when the fetch was completed.
693        when: LastFetch,
694    },
695}
696
697/// Contains all mutable state necessary to maintain the project cache.
698struct PrivateProjectState {
699    /// Project key this state belongs to.
700    project_key: ProjectKey,
701
702    /// The current fetch state.
703    state: FetchState,
704    /// The current backoff used for calculating the next fetch attempt.
705    ///
706    /// The backoff is reset after a successful, non-pending fetch.
707    backoff: RetryBackoff,
708
709    /// The last time the state was successfully fetched.
710    ///
711    /// May be `None` when the state has never been successfully fetched.
712    ///
713    /// This is purely informational, all necessary information to make
714    /// state transitions is contained in [`FetchState`].
715    last_fetch: Option<Instant>,
716
717    /// The expiry time of this project.
718    ///
719    /// A refresh of the project, will not push the expiration time.
720    expiry: Option<Instant>,
721}
722
723impl PrivateProjectState {
724    fn new(project_key: ProjectKey, config: &Config) -> Self {
725        Self {
726            project_key,
727            state: FetchState::Pending {
728                initiated: None,
729                next_fetch_attempt: None,
730            },
731            backoff: RetryBackoff::new(config.max_retry_backoff),
732            last_fetch: None,
733            expiry: None,
734        }
735    }
736
737    fn try_begin_fetch(
738        &mut self,
739        now: Instant,
740        is_refresh: bool,
741        config: &Config,
742    ) -> Option<Fetch> {
743        let (initiated, when) = match &mut self.state {
744            FetchState::InProgress {
745                is_refresh: refresh_in_progress,
746            } => {
747                relay_log::trace!(
748                    tags.project_key = self.project_key.as_str(),
749                    "project fetch skipped, fetch in progress"
750                );
751                // Upgrade the refresh status if necessary.
752                *refresh_in_progress = *refresh_in_progress && is_refresh;
753                return None;
754            }
755            FetchState::Pending {
756                initiated,
757                next_fetch_attempt,
758            } => {
759                // Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while.
760                (initiated.unwrap_or(now), *next_fetch_attempt)
761            }
762            FetchState::Complete { when } => {
763                // Sanity check to make sure timestamps do not drift.
764                debug_assert_eq!(Some(when.0), self.last_fetch);
765
766                if when.check_expiry(now, config).is_fresh() {
767                    // The current state is up to date, no need to start another fetch.
768                    relay_log::trace!(
769                        tags.project_key = self.project_key.as_str(),
770                        "project fetch skipped, already up to date"
771                    );
772                    return None;
773                }
774
775                (now, None)
776            }
777        };
778
779        // Mark a current fetch in progress.
780        self.state = FetchState::InProgress { is_refresh };
781
782        relay_log::trace!(
783            tags.project_key = &self.project_key.as_str(),
784            attempts = self.backoff.attempt() + 1,
785            "project state {} scheduled in {:?}",
786            if is_refresh { "refresh" } else { "fetch" },
787            when.unwrap_or(now).saturating_duration_since(now),
788        );
789
790        Some(Fetch {
791            project_key: self.project_key,
792            previous_fetch: self.last_fetch,
793            initiated,
794            when,
795            revision: Revision::default(),
796        })
797    }
798
799    fn complete_fetch(
800        &mut self,
801        fetch: &CompletedFetch,
802        now: Instant,
803        config: &Config,
804    ) -> FetchResult {
805        let FetchState::InProgress { is_refresh } = self.state else {
806            debug_assert!(
807                false,
808                "fetch completed while there was no current fetch registered"
809            );
810            // Be conservative in production.
811            return FetchResult::ReSchedule { refresh: false };
812        };
813
814        if fetch.is_pending() {
815            let next_backoff = self.backoff.next_backoff();
816            let next_fetch_attempt = match next_backoff.is_zero() {
817                false => now.checked_add(next_backoff),
818                true => None,
819            };
820            self.state = FetchState::Pending {
821                next_fetch_attempt,
822                initiated: Some(fetch.fetch.initiated),
823            };
824            relay_log::trace!(
825                tags.project_key = &self.project_key.as_str(),
826                "project state {} completed but still pending",
827                if is_refresh { "refresh" } else { "fetch" },
828            );
829
830            FetchResult::ReSchedule {
831                refresh: is_refresh,
832            }
833        } else {
834            relay_log::trace!(
835                tags.project_key = &self.project_key.as_str(),
836                "project state {} completed with non-pending config",
837                if is_refresh { "refresh" } else { "fetch" },
838            );
839
840            self.backoff.reset();
841            self.last_fetch = Some(now);
842
843            let when = LastFetch(now);
844
845            let refresh = when.refresh_time(config);
846            let expiry = match self.expiry {
847                Some(expiry) if is_refresh => ExpiryTime(expiry),
848                // Only bump/re-compute the expiry time if the fetch was not a refresh,
849                // to not keep refreshed projects forever in the cache.
850                Some(_) | None => when.expiry_time(config),
851            };
852            self.expiry = Some(expiry.0);
853
854            self.state = FetchState::Complete { when };
855            FetchResult::Done { expiry, refresh }
856        }
857    }
858}
859
860/// Result returned when completing a fetch.
861#[derive(Debug)]
862#[must_use = "fetch result must be used"]
863enum FetchResult {
864    /// Another fetch must be scheduled immediately.
865    ReSchedule {
866        /// Whether the fetch should be re-scheduled as a refresh.
867        refresh: bool,
868    },
869    /// The fetch is completed and should be registered for refresh and eviction.
870    Done {
871        /// When the project should be expired.
872        expiry: ExpiryTime,
873        /// When the project should be refreshed.
874        refresh: Option<RefreshTime>,
875    },
876}
877
878/// New type containing the last successful fetch time as an [`Instant`].
879#[derive(Debug, Copy, Clone)]
880struct LastFetch(Instant);
881
882impl LastFetch {
883    /// Returns the [`Expiry`] of the last fetch in relation to `now`.
884    fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
885        let elapsed = now.saturating_duration_since(self.0);
886
887        if elapsed >= config.expiry + config.grace_period {
888            Expiry::Expired
889        } else if elapsed >= config.expiry {
890            Expiry::Stale
891        } else {
892            Expiry::Fresh
893        }
894    }
895
896    /// Returns when the project needs to be queued for a refresh.
897    fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
898        config
899            .refresh_interval
900            .map(|duration| self.0 + duration)
901            .map(RefreshTime)
902    }
903
904    /// Returns when the project is based to expire based on the current [`LastFetch`].
905    fn expiry_time(&self, config: &Config) -> ExpiryTime {
906        ExpiryTime(self.0 + config.grace_period + config.expiry)
907    }
908}
909
/// How outdated a cached project state is, as classified by [`LastFetch::check_expiry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Expiry {
    /// Within the expiry window: the project state is up to date.
    Fresh,
    /// Past the expiry but still within the grace period: events that depend on this project
    /// state can still be processed, but the state should be refreshed in the background.
    Stale,
    /// Past both expiry and grace period: events have to be buffered until a new state has
    /// been fetched.
    Expired,
}
922
923impl Expiry {
924    /// Returns `true` if the project is up-to-date and does not need to be fetched.
925    fn is_fresh(&self) -> bool {
926        matches!(self, Self::Fresh)
927    }
928}
929
930/// Instant when a project is scheduled for refresh.
931#[derive(Debug)]
932#[must_use = "an refresh time must be used to schedule a refresh"]
933struct RefreshTime(Instant);
934
935/// Instant when a project is scheduled for expiry.
936#[derive(Debug)]
937#[must_use = "an expiry time must be used to schedule an eviction"]
938struct ExpiryTime(Instant);
939
#[cfg(test)]
mod tests {
    //! Tests for [`ProjectStore`] fetch, refresh and eviction scheduling.
    //!
    //! Every test runs on a paused tokio clock (`start_paused = true`). Time is moved forward
    //! with `tokio::time::advance`; where a test simply awaits `store.poll()`, it relies on the
    //! paused runtime auto-advancing to the next pending timer.

    use std::time::Duration;

    use super::*;

    /// Pops and applies every eviction that is due at the current instant.
    ///
    /// Returns the evicted project keys in the order `store.poll()` yielded them. Polling stops
    /// as soon as the tiny timeout fires or `poll` yields anything other than
    /// `Some(Action::Eviction(_))` (such a non-eviction action is dropped).
    async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
        let mut evicted = Vec::new();
        // Small timeout to really only get what is ready to be evicted right now.
        while let Ok(Some(Action::Eviction(eviction))) =
            tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
        {
            evicted.push(eviction.0);
            store.evict(eviction);
        }
        evicted
    }

    // Asserts that the shared project state of `$project_key` matches the pattern `$state`.
    //
    // Goes through `get_or_create`, so it creates a shared entry if the key is not known yet.
    macro_rules! assert_state {
        ($store:ident, $project_key:ident, $state:pat) => {
            assert!(matches!(
                $store.shared().get_or_create($project_key).project_state(),
                $state
            ));
        };
    }

    /// Walks one project through a full fetch cycle: an initial fetch, two pending completions
    /// (the second one delayed by backoff) and a final non-pending completion, after which no
    /// new fetch is needed.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(&Default::default());

        let fetch = store.try_begin_fetch(project_key).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Fetch already in progress, nothing to do.
        assert!(store.try_begin_fetch(project_key).is_none());

        // A pending fetch should trigger a new fetch immediately.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // First backoff is still immediately.
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Pending again.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // This time it needs to be in the future (backoff).
        assert!(fetch.when() > Some(Instant::now()));
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Now complete with disabled.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // A new fetch is not yet necessary.
        assert!(store.try_begin_fetch(project_key).is_none());
    }

    /// A fetch that completes as pending must keep the previously cached state visible until a
    /// non-pending result arrives.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch_pending_does_not_replace_state() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Past the 5s expiry: the state is stale, so a new fetch may be started.
        tokio::time::advance(Duration::from_secs(6)).await;

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Pending.into());
        // We're returned a new fetch, because the current one completed pending.
        let fetch = store.complete_fetch(fetch).unwrap();
        // The old cached state is still available and not replaced.
        assert_state!(store, project_key, ProjectState::Disabled);

        let fetch = fetch.complete(ProjectState::new_allowed().into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Enabled(_));
    }

    /// Projects are evicted once expiry plus grace period (here 5s + 0s) has passed since their
    /// last completed fetch, independently of other projects.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // 3 seconds is not enough to expire any project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        let fetch = store.try_begin_fetch(project_key2).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // A total of 6 seconds should expire the first project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Disabled);
    }

    /// Projects without a completed fetch (in flight or never started) are not evicted; the
    /// eviction deadline only starts counting once a fetch completes.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_pending_not_expired() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        // Create a new project in a pending state, but never fetch it, this should also never expire.
        store.shared().get_or_create(project_key2);

        tokio::time::advance(Duration::from_secs(6)).await;

        // No evictions, project is pending.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Still should not be evicted: the 5 second expiry is counted from the completion of
        // the fetch, not from its start.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_secs(4)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // Just enough to expire the project.
        tokio::time::advance(Duration::from_millis(1001)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Pending);
    }

    /// A stale project (past the expiry but within the grace period) is kept; it is only
    /// evicted once the grace period has elapsed as well.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_stale() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // This is in the grace period, but not yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Now it's expired.
        tokio::time::advance(Duration::from_secs(1)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    /// A project with a fetch in progress is not evicted, even long past its expiry; the
    /// eviction deadline is re-armed from the completion of each fetch.
    #[tokio::test(start_paused = true)]
    async fn test_store_no_eviction_during_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Project is expired, but there is an ongoing fetch.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        // Start a new fetch as soon as possible: once the 5s expiry has passed the state is
        // stale (inside the grace period), so a new fetch is allowed.
        tokio::time::advance(Duration::from_millis(5001)).await;
        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Again, expire the project.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Not quite yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        // Now it's expired.
        tokio::time::advance(Duration::from_millis(501)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    /// With a refresh interval configured, completed fetches schedule refreshes. A refresh that
    /// is upgraded to a regular fetch reschedules the expiry; a plain refresh does not.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Wait for a refresh.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(refresh.project_key(), project_key);

        let fetch = store.refresh(refresh).unwrap();
        // Upgrade the pending refresh fetch to a non-refresh fetch.
        assert!(store.try_begin_fetch(project_key).is_none());
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Since the previous refresh has been upgraded to a proper fetch, expiration has been
        // rescheduled and a new refresh is planned to happen in 7 seconds from now.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        let fetch = store.refresh(refresh).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // At this point the refresh is through, but expiration is around the corner, because
        // a plain refresh does not bump the expiration deadline.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
    }

    /// When both the eviction and the refresh deadline have passed, the eviction is yielded
    /// first and no refresh stays queued for the evicted project.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_overtaken_by_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // The eviction should be prioritized, there is no reason to refresh an already evicted
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        store.evict(eviction);

        // Make sure there is not another refresh queued.
        // This would not technically be necessary because refresh code must be able to handle
        // refreshes for non-fetched projects, but the current implementation should enforce this.
        assert!(
            tokio::time::timeout(Duration::from_secs(60), store.poll())
                .await
                .is_err()
        );
    }

    /// A refresh action that is handled after its project has been evicted must not start a
    /// fetch.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_during_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // Poll both the eviction and refresh token, while a proper implementation should prevent
        // this, it's a good way to test that a refresh for an evicted project does not fetch the
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        assert_eq!(refresh.project_key(), project_key);
        store.evict(eviction);

        assert!(store.refresh(refresh).is_none());
    }
}