relay_server/services/projects/cache/
state.rs

1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Notify;
6use tokio::sync::futures::Notified;
7use tokio::time::Instant;
8
9use arc_swap::ArcSwap;
10use relay_base_schema::project::ProjectKey;
11use relay_quotas::CachedRateLimits;
12use relay_sampling::evaluation::ReservoirCounters;
13use relay_statsd::metric;
14
15use crate::services::projects::project::{ProjectState, Revision};
16use crate::services::projects::source::SourceProjectState;
17use crate::statsd::{RelayDistributions, RelayTimers};
18use crate::utils::{RetryBackoff, UniqueScheduledQueue};
19
/// The backing storage for a project cache.
///
/// Exposes the only interface to delete from [`Shared`], guaranteed by
/// requiring exclusive/mutable access to [`ProjectStore`].
///
/// [`Shared`] can be extended through [`Shared::get_or_create`], in which case
/// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger
/// a fetch to create the private state and keep it updated.
/// This guarantees that eventually the project state is populated, but for an
/// undetermined time it is possible that shared state exists without the
/// respective private state.
pub struct ProjectStore {
    /// Cache configuration, derived from the global Relay configuration.
    config: Config,
    /// The shared state, which can be accessed concurrently.
    shared: Arc<Shared>,
    /// The private, mutably exclusive state, used to maintain the project state.
    private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
    /// Scheduled queue tracking all evictions.
    evictions: UniqueScheduledQueue<ProjectKey>,
    /// Scheduled queue tracking all refreshes.
    refreshes: UniqueScheduledQueue<ProjectKey>,
}
41
impl ProjectStore {
    /// Creates a new, empty [`ProjectStore`] from the global Relay configuration.
    pub fn new(config: &relay_config::Config) -> Self {
        Self {
            config: Config::new(config),
            shared: Default::default(),
            private: Default::default(),
            evictions: Default::default(),
            refreshes: Default::default(),
        }
    }

    /// Retrieves a [`Shared`] handle which can be freely shared with multiple consumers.
    pub fn shared(&self) -> Arc<Shared> {
        Arc::clone(&self.shared)
    }

    /// Tries to begin a new fetch for the passed `project_key`.
    ///
    /// Returns `None` if no fetch is necessary or there is already a fetch ongoing.
    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
    /// [`Self::complete_fetch`].
    pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
        self.do_try_begin_fetch(project_key, false)
    }

    /// Completes a [`CompletedFetch`] started with [`Self::try_begin_fetch`].
    ///
    /// Returns a new [`Fetch`] if another fetch must be scheduled. This happens when the fetched
    /// [`ProjectState`] is still pending or already deemed expired.
    #[must_use = "an incomplete fetch must be retried"]
    pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
        let project_key = fetch.project_key();

        // Eviction is not possible for projects which are currently being fetched.
        // Hence if there was a started fetch, the project state must always exist at this stage.
        debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
        debug_assert!(self.private.get(&project_key).is_some());

        let mut project = self.get_or_create(project_key);
        // Schedule another fetch if necessary, usually should only happen if
        // the completed fetch is pending.
        let new_fetch = match project.complete_fetch(fetch) {
            FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
            FetchResult::Done { expiry, refresh } => {
                // The project is now up to date; schedule its eviction and,
                // if configured, the next background refresh.
                self.evictions.schedule(expiry.0, project_key);
                if let Some(RefreshTime(refresh)) = refresh {
                    self.refreshes.schedule(refresh, project_key);
                }
                None
            }
        };

        metric!(
            distribution(RelayDistributions::ProjectStateCacheSize) =
                self.shared.projects.len() as u64,
            storage = "shared"
        );
        metric!(
            distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
            storage = "private"
        );

        new_fetch
    }

    /// Waits for the next scheduled action.
    ///
    /// The returned [`Action`] must be immediately turned in using the corresponding handlers,
    /// [`Self::evict`] or [`Self::refresh`].
    ///
    /// The returned future is cancellation safe.
    pub async fn poll(&mut self) -> Option<Action> {
        let eviction = self.evictions.next();
        let refresh = self.refreshes.next();

        tokio::select! {
            // `biased` makes the selection deterministic: evictions are
            // preferred over refreshes when both are ready.
            biased;

            Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
            Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
            else => None,
        }
    }

    /// Refreshes a project using an [`Refresh`] token returned from [`Self::poll`].
    ///
    /// Like [`Self::try_begin_fetch`], this returns a [`Fetch`], if there was no fetch
    /// already started in the meantime.
    ///
    /// A returned [`Fetch`] must be scheduled and completed with [`Fetch::complete`] and
    /// [`Self::complete_fetch`].
    pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
        self.do_try_begin_fetch(project_key, true)
    }

    /// Evicts a project using an [`Eviction`] token returned from [`Self::poll`].
    pub fn evict(&mut self, Eviction(project_key): Eviction) {
        // Remove the private part.
        let Some(private) = self.private.remove(&project_key) else {
            // Not possible if all invariants are upheld.
            debug_assert!(false, "no private state for eviction");
            return;
        };

        debug_assert!(
            matches!(private.state, FetchState::Complete { .. }),
            "private state must be completed"
        );

        // Remove the shared part.
        let shared = self.shared.projects.pin();
        let _removed = shared.remove(&project_key);
        debug_assert!(
            _removed.is_some(),
            "an expired project must exist in the shared state"
        );

        // Cancel the next refresh. Not strictly necessary (refreshing a project
        // which does not exist does nothing), but it spares us the extra work.
        self.refreshes.remove(&project_key);
    }

    /// Internal handler to begin a new fetch for the passed `project_key`, which can also handle
    /// refreshes.
    fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
        let fetch = match is_refresh {
            // A rogue refresh does not need to trigger an actual fetch.
            // In practice this should never happen, as the refresh time is validated against
            // the eviction time.
            // But it may happen due to a race of the eviction and refresh (e.g. when setting them
            // to close to the same value), in which case we don't want to re-populate the cache.
            true => self.get(project_key)?,
            false => self.get_or_create(project_key),
        }
        .try_begin_fetch(is_refresh);

        // If there is a new fetch, remove the pending eviction, it will be re-scheduled once the
        // fetch is completed.
        if fetch.is_some() {
            self.evictions.remove(&project_key);
            // There is no need to clear the refresh here, if it triggers while a fetch is ongoing,
            // it is simply discarded.
        }

        fetch
    }

    /// Get a reference to an existing project.
    ///
    /// Unlike [`Self::get_or_create`], this does *not* create missing private state,
    /// it returns `None` when the project is not tracked privately.
    ///
    /// For internal use only.
    fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
        let private = self.private.get_mut(&project_key)?;

        // Same invariant as in `get_or_create`, we have exclusive access to the private
        // project here, there must be a shared project if there is a private project.
        debug_assert!(self.shared.projects.pin().contains_key(&project_key));

        let shared = self
            .shared
            .projects
            .pin()
            .get_or_insert_with(project_key, Default::default)
            .clone();

        Some(ProjectRef {
            private,
            shared,
            config: &self.config,
        })
    }

    /// Get a reference to the current project or create a new project.
    ///
    /// For internal use only, a created project must always be fetched immediately.
    fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
        #[cfg(debug_assertions)]
        if self.private.contains_key(&project_key) {
            // We have exclusive access to the private part, there are no concurrent deletions
            // hence if we have a private state there must always be a shared state as well.
            //
            // The opposite is not true, the shared state may have been created concurrently
            // through the shared access.
            debug_assert!(self.shared.projects.pin().contains_key(&project_key));
        }

        let private = self
            .private
            .entry(project_key)
            .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));

        let shared = self
            .shared
            .projects
            .pin()
            .get_or_insert_with(project_key, Default::default)
            .clone();

        ProjectRef {
            private,
            shared,
            config: &self.config,
        }
    }
}
246
/// Configuration for a [`ProjectStore`].
struct Config {
    /// Expiry timeout for individual project configs.
    ///
    /// Note: the total expiry is the sum of the expiry and grace period.
    expiry: Duration,
    /// Grace period for a project config.
    ///
    /// A project config is considered stale and will be updated asynchronously,
    /// after reaching the grace period.
    grace_period: Duration,
    /// Refresh interval for a single project.
    ///
    /// A project will be asynchronously refreshed repeatedly using this interval.
    ///
    /// The refresh interval is validated to lie strictly between the expiry and the
    /// total expiry (expiry plus grace period). An invalid refresh interval is ignored
    /// (stored as `None`).
    refresh_interval: Option<Duration>,
    /// Maximum backoff for continuously failing project updates.
    max_retry_backoff: Duration,
}
268
269impl Config {
270    fn new(config: &relay_config::Config) -> Self {
271        let expiry = config.project_cache_expiry();
272        let grace_period = config.project_grace_period();
273
274        // Make sure the refresh time is:
275        // - at least the expiration, refreshing a non-stale project makes no sense.
276        // - at most the end of the grace period, refreshing an expired project also makes no sense.
277        let refresh_interval = config
278            .project_refresh_interval()
279            .filter(|rt| *rt < (expiry + grace_period))
280            .filter(|rt| *rt > expiry);
281
282        Self {
283            expiry: config.project_cache_expiry(),
284            grace_period: config.project_grace_period(),
285            refresh_interval,
286            max_retry_backoff: config.http_max_retry_interval(),
287        }
288    }
289}
290
/// The shared and concurrently accessible handle to the project cache.
#[derive(Default)]
pub struct Shared {
    /// All cached projects, keyed by project key, in a concurrent hash map.
    projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
}
296
297impl Shared {
298    /// Returns the existing project state or creates a new one.
299    ///
300    /// The caller must ensure that the project cache is instructed to
301    /// [`super::ProjectCache::Fetch`] the retrieved project.
302    pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
303        self.get_or_create_inner(project_key).to_shared_project()
304    }
305
306    fn get_or_create_inner(&self, project_key: ProjectKey) -> SharedProjectState {
307        // The fast path, we expect the project to exist.
308        let projects = self.projects.pin();
309        if let Some(project) = projects.get(&project_key) {
310            return project.clone();
311        }
312
313        // The slow path, try to attempt to insert, somebody else may have been faster, but that's okay.
314        match projects.try_insert(project_key, Default::default()) {
315            Ok(inserted) => inserted.clone(),
316            Err(occupied) => occupied.current.clone(),
317        }
318    }
319}
320
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl Shared {
    /// Updates the project state for a project.
    ///
    /// TEST ONLY!
    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
        let projects = self.projects.pin();
        let project = projects.get_or_insert_with(project_key, Default::default);
        project.set_project_state(state);
    }

    /// Returns `true` if there exists a shared state for the passed `project_key`.
    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
        let projects = self.projects.pin();
        projects.contains_key(&project_key)
    }
}
339
340impl fmt::Debug for Shared {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        f.debug_struct("Shared")
343            .field("num_projects", &self.projects.len())
344            .finish()
345    }
346}
347
/// A single project from the [`Shared`] project cache.
///
/// Holds a reference-counted snapshot of the project's inner state; the
/// contents do not change after the snapshot was taken.
pub struct SharedProject(Arc<SharedProjectStateInner>);
350
351impl SharedProject {
352    /// Returns a reference to the contained [`ProjectState`].
353    pub fn project_state(&self) -> &ProjectState {
354        &self.0.state
355    }
356
357    /// Returns a reference to the contained [`CachedRateLimits`].
358    pub fn cached_rate_limits(&self) -> &CachedRateLimits {
359        // Exposing cached rate limits may be a bad idea, this allows mutation
360        // and caching of rate limits for pending projects, which may or may not be fine.
361        // Although, for now this is fine.
362        //
363        // Read only access is easily achievable if we return only the current rate limits.
364        &self.0.rate_limits
365    }
366
367    /// Returns a reference to the contained [`ReservoirCounters`].
368    pub fn reservoir_counters(&self) -> &ReservoirCounters {
369        &self.0.reservoir_counters
370    }
371
372    /// Waits for the event of a changed project state, triggered by [`SharedProjectState::set_project_state`].
373    ///
374    /// Note that the content of this instance does not change when the event is triggered.
375    pub fn outdated(&self) -> Notified<'_> {
376        self.0.notify.notified()
377    }
378}
379
/// TEST ONLY bypass to make the project cache mockable.
#[cfg(test)]
impl SharedProject {
    /// Creates a new [`SharedProject`] for testing only.
    pub fn for_test(state: ProjectState) -> Self {
        let inner = SharedProjectStateInner {
            state,
            ..Default::default()
        };
        Self(Arc::new(inner))
    }
}
391
/// Reference to a full project wrapping shared and private state.
struct ProjectRef<'a> {
    /// Cheaply cloned handle to the shared, concurrently accessible state.
    shared: SharedProjectState,
    /// Exclusive reference to the private bookkeeping state.
    private: &'a mut PrivateProjectState,
    /// The cache configuration.
    config: &'a Config,
}
398
399impl ProjectRef<'_> {
400    fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
401        let now = Instant::now();
402        self.private
403            .try_begin_fetch(now, is_refresh, self.config)
404            .map(|fetch| fetch.with_revision(self.shared.revision()))
405    }
406
407    fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
408        let now = Instant::now();
409
410        if let Some(latency) = fetch.latency() {
411            let delay = match fetch.delay() {
412                Some(delay) if delay.as_secs() <= 15 => "lte15s",
413                Some(delay) if delay.as_secs() <= 30 => "lte30s",
414                Some(delay) if delay.as_secs() <= 60 => "lte60s",
415                Some(delay) if delay.as_secs() <= 120 => "lte120",
416                Some(delay) if delay.as_secs() <= 300 => "lte300s",
417                Some(delay) if delay.as_secs() <= 600 => "lte600s",
418                Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
419                Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
420                Some(_) => "gt3600s",
421                None => "none",
422            };
423            metric!(
424                timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
425                delay = delay
426            );
427        }
428
429        if !fetch.is_pending() {
430            let state = match fetch.state {
431                SourceProjectState::New(_) => "new",
432                SourceProjectState::NotModified => "not_modified",
433            };
434
435            metric!(
436                timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
437                state = state
438            );
439        }
440
441        // Update private and shared state with the new data.
442        let result = self.private.complete_fetch(&fetch, now, self.config);
443        match fetch.state {
444            // Keep the old state around if the current fetch is pending.
445            // It may still be useful to callers.
446            SourceProjectState::New(state) if !state.is_pending() => {
447                self.shared.set_project_state(state);
448            }
449            _ => {}
450        }
451
452        result
453    }
454}
455
/// A scheduled action, returned from [`ProjectStore::poll`].
pub enum Action {
    /// A project is due for eviction, to be handled with [`ProjectStore::evict`].
    Eviction(Eviction),
    /// A project is due for a refresh, to be handled with [`ProjectStore::refresh`].
    Refresh(Refresh),
}
460
/// A [`Refresh`] token, produced by [`ProjectStore::poll`].
///
/// The token must be turned in using [`ProjectStore::refresh`].
#[derive(Debug)]
#[must_use = "a refresh must be used"]
pub struct Refresh(ProjectKey);
467
468impl Refresh {
469    /// Returns the [`ProjectKey`] of the project that needs to be refreshed.
470    pub fn project_key(&self) -> ProjectKey {
471        self.0
472    }
473}
474
/// An [`Eviction`] token, produced by [`ProjectStore::poll`].
///
/// The token must be turned in using [`ProjectStore::evict`].
#[derive(Debug)]
#[must_use = "an eviction must be used"]
pub struct Eviction(ProjectKey);
481
482impl Eviction {
483    /// Returns the [`ProjectKey`] of the project that needs to be evicted.
484    pub fn project_key(&self) -> ProjectKey {
485        self.0
486    }
487}
488
/// A [`Fetch`] token.
///
/// When returned it must be executed and completed using [`Self::complete`].
#[must_use = "a fetch must be executed"]
#[derive(Debug)]
pub struct Fetch {
    /// The project which is being fetched.
    project_key: ProjectKey,
    /// Time of the last successful fetch, `None` if this is the first fetch.
    previous_fetch: Option<Instant>,
    /// Time when the fetch was (first) initiated.
    initiated: Instant,
    /// Earliest time when the fetch should be executed, `None` means as soon as possible.
    when: Option<Instant>,
    /// Revision of the currently cached project state, forwarded to the upstream
    /// so it can answer with "not modified".
    revision: Revision,
}
501
502impl Fetch {
503    /// Returns the [`ProjectKey`] of the project to fetch.
504    pub fn project_key(&self) -> ProjectKey {
505        self.project_key
506    }
507
508    /// Returns when the fetch for the project should be scheduled.
509    ///
510    /// This can be now (as soon as possible, indicated by `None`) or a later point in time,
511    /// if the project is currently in a backoff.
512    pub fn when(&self) -> Option<Instant> {
513        self.when
514    }
515
516    /// Returns the revisions of the currently cached project.
517    ///
518    /// If the upstream indicates it does not have a different version of this project
519    /// we do not need to update the local state.
520    pub fn revision(&self) -> Revision {
521        self.revision.clone()
522    }
523
524    /// Completes the fetch with a result and returns a [`CompletedFetch`].
525    pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
526        CompletedFetch { fetch: self, state }
527    }
528
529    fn with_revision(mut self, revision: Revision) -> Self {
530        self.revision = revision;
531        self
532    }
533}
534
/// The result of an executed [`Fetch`].
#[must_use = "a completed fetch must be acted upon"]
#[derive(Debug)]
pub struct CompletedFetch {
    /// The original fetch this result belongs to.
    fetch: Fetch,
    /// The state returned from the project source.
    state: SourceProjectState,
}
542
impl CompletedFetch {
    /// Returns the [`ProjectKey`] of the project which was fetched.
    pub fn project_key(&self) -> ProjectKey {
        self.fetch.project_key()
    }

    /// Returns the amount of time passed between the last successful fetch for this project and the start of this fetch.
    ///
    /// `None` if this is the first fetch.
    fn delay(&self) -> Option<Duration> {
        self.fetch
            .previous_fetch
            .map(|pf| self.fetch.initiated.duration_since(pf))
    }

    /// Returns the duration between first initiating the fetch and `now`.
    fn duration(&self, now: Instant) -> Duration {
        now.duration_since(self.fetch.initiated)
    }

    /// Returns the update latency of the fetched project config from the upstream.
    ///
    /// Is `None`, when no project config could be fetched, or if this was the first
    /// fetch of a project config.
    ///
    /// Note: this latency is computed on access, it does not use the time when the [`Fetch`]
    /// was marked as [completed](`Fetch::complete`).
    fn latency(&self) -> Option<Duration> {
        // We're not interested in initial fetches. The latency on the first fetch
        // has no meaning about how long it takes for an updated project config to be
        // propagated to a Relay.
        let is_first_fetch = self.fetch.revision().as_str().is_none();
        if is_first_fetch {
            return None;
        }

        let project_info = match &self.state {
            SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
            // Not modified or deleted/disabled -> no latency to track.
            //
            // Currently we discard the last changed timestamp for disabled projects,
            // it would be possible to keep it and then also expose a latency for
            // disabled projects.
            _ => return None,
        };

        // A matching revision is not an update.
        if project_info.rev == self.fetch.revision {
            return None;
        }

        // The latency is the wall-clock time since the config was last changed
        // upstream. Without a `last_change` timestamp no latency can be computed.
        let elapsed = chrono::Utc::now() - project_info.last_change?;
        elapsed.to_std().ok()
    }

    /// Returns `true` if the fetch completed with a pending status.
    fn is_pending(&self) -> bool {
        match &self.state {
            SourceProjectState::New(state) => state.is_pending(),
            SourceProjectState::NotModified => false,
        }
    }
}
605
/// The state of a project contained in the [`Shared`] project cache.
///
/// This state is interior mutable and allows updates to the project.
/// Clones are cheap: the handle is a single `Arc`.
#[derive(Debug, Default, Clone)]
struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
611
impl SharedProjectState {
    /// Updates the project state.
    ///
    /// Rate limits, reservoir counters and the notify handle are carried over
    /// from the previous state; only the [`ProjectState`] itself is replaced.
    fn set_project_state(&self, state: ProjectState) {
        let prev = self.0.rcu(|stored| SharedProjectStateInner {
            state: state.clone(),
            rate_limits: Arc::clone(&stored.rate_limits),
            reservoir_counters: Arc::clone(&stored.reservoir_counters),
            notify: Arc::clone(&stored.notify),
        });

        // Try clean expired reservoir counters.
        //
        // We do it after the `rcu`, to not re-run this more often than necessary.
        if let Some(state) = state.enabled() {
            let config = state.config.sampling.as_ref();
            if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
                // We can safely use previous here, the `rcu` just replaced the state, the
                // reservoir counters did not change.
                //
                // `try_lock` to not potentially block, it's a best effort cleanup.
                if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
                    counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
                }
            }
        }

        // Finally, notify listeners that the state has been replaced.
        prev.notify.notify_waiters();
    }

    /// Extracts and clones the revision from the contained project state.
    fn revision(&self) -> Revision {
        self.0.as_ref().load().state.revision().clone()
    }

    /// Transforms this interior mutable handle to an immutable [`SharedProject`].
    fn to_shared_project(&self) -> SharedProject {
        SharedProject(self.0.as_ref().load_full())
    }
}
652
/// The data contained in a [`SharedProjectState`].
///
/// All fields must be cheap to clone and are ideally just a single `Arc`.
/// Partial updates to [`SharedProjectState`], are performed using `rcu` cloning all fields.
#[derive(Debug, Default)]
struct SharedProjectStateInner {
    /// The currently cached project state.
    state: ProjectState,
    /// Cached rate limits for the project.
    rate_limits: Arc<CachedRateLimits>,
    /// Reservoir sampling counters for the project.
    reservoir_counters: ReservoirCounters,
    /// Notifies listeners when the project state is replaced,
    /// see [`SharedProject::outdated`].
    notify: Arc<Notify>,
}
664
/// Current fetch state for a project.
///
/// ─────► Pending ◄─────┐
///           │          │
///           │          │Backoff
///           ▼          │
/// ┌───► InProgress ────┘
/// │         │
/// │         │
/// │         ▼
/// └───── Complete
#[derive(Debug)]
enum FetchState {
    /// There is a fetch currently in progress.
    InProgress {
        /// Whether the current check in progress was triggered from a refresh.
        ///
        /// Triggering a non-refresh fetch while a refresh fetch is currently in progress,
        /// will overwrite this property.
        is_refresh: bool,
    },
    /// A successful fetch is pending.
    ///
    /// Projects which have not yet been fetched are in the pending state,
    /// as well as projects which have a fetch in progress but were notified
    /// from upstream that the project config is still pending.
    ///
    /// If the upstream notifies this instance about a pending config,
    /// a backoff is applied, before trying again.
    Pending {
        /// Instant when the fetch was first initiated.
        ///
        /// A state may be transitioned multiple times from [`Self::Pending`] to [`Self::InProgress`]
        /// and back to [`Self::Pending`]. This timestamp is the first time when the state
        /// was transitioned from [`Self::Complete`] to [`Self::InProgress`].
        ///
        /// Only `None` on first fetch.
        initiated: Option<Instant>,
        /// Time when the next fetch should be attempted.
        ///
        /// `None` means soon as possible.
        next_fetch_attempt: Option<Instant>,
    },
    /// There was a successful non-pending fetch.
    Complete {
        /// Time when the fetch was completed.
        when: LastFetch,
    },
}
714
/// Contains all mutable state necessary to maintain the project cache.
struct PrivateProjectState {
    /// Project key this state belongs to.
    project_key: ProjectKey,

    /// The current fetch state.
    state: FetchState,
    /// The current backoff used for calculating the next fetch attempt.
    ///
    /// The backoff is reset after a successful, non-pending fetch.
    backoff: RetryBackoff,

    /// The last time the state was successfully fetched.
    ///
    /// May be `None` when the state has never been successfully fetched.
    ///
    /// This is purely informational, all necessary information to make
    /// state transitions is contained in [`FetchState`].
    last_fetch: Option<Instant>,

    /// The expiry time of this project.
    ///
    /// A refresh of the project will not push the expiration time.
    expiry: Option<Instant>,
}
740
741impl PrivateProjectState {
742    fn new(project_key: ProjectKey, config: &Config) -> Self {
743        Self {
744            project_key,
745            state: FetchState::Pending {
746                initiated: None,
747                next_fetch_attempt: None,
748            },
749            backoff: RetryBackoff::new(config.max_retry_backoff),
750            last_fetch: None,
751            expiry: None,
752        }
753    }
754
    /// Tries to transition this state into [`FetchState::InProgress`] and create
    /// a matching [`Fetch`] token.
    ///
    /// Returns `None` if a fetch is already in progress or if the current state
    /// is complete and still fresh.
    fn try_begin_fetch(
        &mut self,
        now: Instant,
        is_refresh: bool,
        config: &Config,
    ) -> Option<Fetch> {
        let (initiated, when) = match &mut self.state {
            FetchState::InProgress {
                is_refresh: refresh_in_progress,
            } => {
                relay_log::trace!(
                    tags.project_key = self.project_key.as_str(),
                    "project fetch skipped, fetch in progress"
                );
                // Upgrade the refresh status if necessary: the in-progress fetch only
                // remains a refresh if both the ongoing and the requested fetch are
                // refreshes, a regular fetch takes precedence.
                *refresh_in_progress = *refresh_in_progress && is_refresh;
                return None;
            }
            FetchState::Pending {
                initiated,
                next_fetch_attempt,
            } => {
                // Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while.
                (initiated.unwrap_or(now), *next_fetch_attempt)
            }
            FetchState::Complete { when } => {
                // Sanity check to make sure timestamps do not drift.
                debug_assert_eq!(Some(when.0), self.last_fetch);

                if when.check_expiry(now, config).is_fresh() {
                    // The current state is up to date, no need to start another fetch.
                    relay_log::trace!(
                        tags.project_key = self.project_key.as_str(),
                        "project fetch skipped, already up to date"
                    );
                    return None;
                }

                (now, None)
            }
        };

        // Mark a current fetch in progress.
        self.state = FetchState::InProgress { is_refresh };

        relay_log::trace!(
            tags.project_key = &self.project_key.as_str(),
            attempts = self.backoff.attempt() + 1,
            "project state {} scheduled in {:?}",
            if is_refresh { "refresh" } else { "fetch" },
            when.unwrap_or(now).saturating_duration_since(now),
        );

        Some(Fetch {
            project_key: self.project_key,
            previous_fetch: self.last_fetch,
            initiated,
            when,
            // The actual revision is attached later via `Fetch::with_revision`,
            // taken from the shared state.
            revision: Revision::default(),
        })
    }
816
817    fn complete_fetch(
818        &mut self,
819        fetch: &CompletedFetch,
820        now: Instant,
821        config: &Config,
822    ) -> FetchResult {
823        let FetchState::InProgress { is_refresh } = self.state else {
824            debug_assert!(
825                false,
826                "fetch completed while there was no current fetch registered"
827            );
828            // Be conservative in production.
829            return FetchResult::ReSchedule { refresh: false };
830        };
831
832        if fetch.is_pending() {
833            let next_backoff = self.backoff.next_backoff();
834            let next_fetch_attempt = match next_backoff.is_zero() {
835                false => now.checked_add(next_backoff),
836                true => None,
837            };
838            self.state = FetchState::Pending {
839                next_fetch_attempt,
840                initiated: Some(fetch.fetch.initiated),
841            };
842            relay_log::trace!(
843                tags.project_key = &self.project_key.as_str(),
844                "project state {} completed but still pending",
845                if is_refresh { "refresh" } else { "fetch" },
846            );
847
848            FetchResult::ReSchedule {
849                refresh: is_refresh,
850            }
851        } else {
852            relay_log::trace!(
853                tags.project_key = &self.project_key.as_str(),
854                "project state {} completed with non-pending config",
855                if is_refresh { "refresh" } else { "fetch" },
856            );
857
858            self.backoff.reset();
859            self.last_fetch = Some(now);
860
861            let when = LastFetch(now);
862
863            let refresh = when.refresh_time(config);
864            let expiry = match self.expiry {
865                Some(expiry) if is_refresh => ExpiryTime(expiry),
866                // Only bump/re-compute the expiry time if the fetch was not a refresh,
867                // to not keep refreshed projects forever in the cache.
868                Some(_) | None => when.expiry_time(config),
869            };
870            self.expiry = Some(expiry.0);
871
872            self.state = FetchState::Complete { when };
873            FetchResult::Done { expiry, refresh }
874        }
875    }
876}
877
/// Result returned when completing a fetch.
///
/// Produced by `complete_fetch` and tells the caller how to proceed.
#[derive(Debug)]
#[must_use = "fetch result must be used"]
enum FetchResult {
    /// Another fetch must be scheduled immediately.
    ReSchedule {
        /// Whether the fetch should be re-scheduled as a refresh.
        refresh: bool,
    },
    /// The fetch is completed and should be registered for refresh and eviction.
    Done {
        /// When the project should be expired.
        expiry: ExpiryTime,
        /// When the project should be refreshed.
        ///
        /// `None` if no refresh interval is configured.
        refresh: Option<RefreshTime>,
    },
}
895
896/// New type containing the last successful fetch time as an [`Instant`].
897#[derive(Debug, Copy, Clone)]
898struct LastFetch(Instant);
899
900impl LastFetch {
901    /// Returns the [`Expiry`] of the last fetch in relation to `now`.
902    fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
903        let elapsed = now.saturating_duration_since(self.0);
904
905        if elapsed >= config.expiry + config.grace_period {
906            Expiry::Expired
907        } else if elapsed >= config.expiry {
908            Expiry::Stale
909        } else {
910            Expiry::Fresh
911        }
912    }
913
914    /// Returns when the project needs to be queued for a refresh.
915    fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
916        config
917            .refresh_interval
918            .map(|duration| self.0 + duration)
919            .map(RefreshTime)
920    }
921
922    /// Returns when the project is based to expire based on the current [`LastFetch`].
923    fn expiry_time(&self, config: &Config) -> ExpiryTime {
924        ExpiryTime(self.0 + config.grace_period + config.expiry)
925    }
926}
927
/// Expiry state of a project.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum Expiry {
    /// The project state is perfectly up to date.
    Fresh,
    /// The project state is outdated but events depending on this project state can still be
    /// processed. The state should be refreshed in the background though.
    Stale,
    /// The project state is completely outdated and events need to be buffered up until the new
    /// state has been fetched.
    Expired,
}

impl Expiry {
    /// Returns `true` if the project is up-to-date and does not need to be fetched.
    fn is_fresh(&self) -> bool {
        // Exhaustive match so that adding a new variant forces a review here.
        match self {
            Self::Fresh => true,
            Self::Stale | Self::Expired => false,
        }
    }
}
947
/// Instant when a project is scheduled for refresh.
///
/// Must be fed back into the refresh queue; the `#[must_use]` lint enforces this.
#[derive(Debug)]
#[must_use = "a refresh time must be used to schedule a refresh"]
struct RefreshTime(Instant);
952
/// Instant when a project is scheduled for expiry.
///
/// Must be fed back into the eviction queue; the `#[must_use]` lint enforces this.
#[derive(Debug)]
#[must_use = "an expiry time must be used to schedule an eviction"]
struct ExpiryTime(Instant);
957
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Polls the store and drains every eviction that is ready *right now*.
    ///
    /// Returns the evicted project keys in the order they were yielded.
    async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
        let mut evicted = Vec::new();
        // Small timeout to really only get what is ready to be evicted right now.
        while let Ok(Some(Action::Eviction(eviction))) =
            tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
        {
            evicted.push(eviction.0);
            store.evict(eviction);
        }
        evicted
    }

    /// Asserts that the shared state of `$project_key` currently matches the `$state` pattern.
    macro_rules! assert_state {
        ($store:ident, $project_key:ident, $state:pat) => {
            assert!(matches!(
                $store.shared().get_or_create($project_key).project_state(),
                $state
            ));
        };
    }

    // Full fetch lifecycle: pending results re-schedule with backoff until a
    // non-pending state lands.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(&Default::default());

        let fetch = store.try_begin_fetch(project_key).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Fetch already in progress, nothing to do.
        assert!(store.try_begin_fetch(project_key).is_none());

        // A pending fetch should trigger a new fetch immediately.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // The first backoff still fires immediately.
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Pending again.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        // This time it needs to be in the future (backoff).
        assert!(fetch.when() > Some(Instant::now()));
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // Now complete with disabled.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // A new fetch is not yet necessary.
        assert!(store.try_begin_fetch(project_key).is_none());
    }

    // A fetch that completes as pending must not overwrite a previously cached state.
    #[tokio::test(start_paused = true)]
    async fn test_store_fetch_pending_does_not_replace_state() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        tokio::time::advance(Duration::from_secs(6)).await;

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Pending.into());
        // We're returned a new fetch, because the current one completed pending.
        let fetch = store.complete_fetch(fetch).unwrap();
        // The old cached state is still available and not replaced.
        assert_state!(store, project_key, ProjectState::Disabled);

        let fetch = fetch.complete(ProjectState::new_allowed().into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Enabled(_));
    }

    // Projects are evicted individually once their own expiry elapses.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // 3 seconds is not enough to expire any project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        let fetch = store.try_begin_fetch(project_key2).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // A total of 6 seconds should expire the first project.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Disabled);
    }

    // A project that was never successfully fetched (still pending) must not be evicted.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_pending_not_expired() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 0,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        // Create a new project in a pending state, but never fetch it, this should also never expire.
        store.shared().get_or_create(project_key2);

        tokio::time::advance(Duration::from_secs(6)).await;

        // No evictions, project is pending.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Still should not be evicted, because we do have 5 seconds to expire since completion.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_secs(4)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // Just enough to expire the project.
        tokio::time::advance(Duration::from_millis(1001)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Pending);
    }

    // A stale project (inside the grace period) is kept; eviction happens only
    // after expiry + grace period.
    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_stale() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // This is in the grace period, but not yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Now it's expired.
        tokio::time::advance(Duration::from_secs(1)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    // Projects with an in-flight fetch are never evicted, no matter how old they are.
    #[tokio::test(start_paused = true)]
    async fn test_store_no_eviction_during_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Project is expired, but there is an ongoing fetch.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        // But start a new fetch asap (after grace period).
        tokio::time::advance(Duration::from_millis(5001)).await;
        let fetch = store.try_begin_fetch(project_key).unwrap();

        // Again, expire the project.
        tokio::time::advance(Duration::from_millis(10500)).await;
        // No evictions, there is a fetch ongoing!
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // Complete the project.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Not quite yet expired.
        tokio::time::advance(Duration::from_millis(9500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        // Now it's expired.
        tokio::time::advance(Duration::from_millis(501)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    // Refreshes fire on the configured interval; a pure refresh does not bump
    // the expiry deadline, so eviction eventually follows.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Wait for a refresh.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(refresh.project_key(), project_key);

        let fetch = store.refresh(refresh).unwrap();
        // Upgrade the pending refresh fetch to a non-refresh fetch.
        assert!(store.try_begin_fetch(project_key).is_none());
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Since the previous refresh has been upgraded to a proper fetch,
        // expiration has been rescheduled and a new refresh is planned to happen
        // in 7 seconds from now.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        let fetch = store.refresh(refresh).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // At this point the refresh is through, but expiration is around the corner.
        // Because the refresh doesn't bump the expiration deadline.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
    }

    // When both are due, eviction wins over refresh, and no stale refresh
    // remains queued after the eviction.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_overtaken_by_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // The eviction should be prioritized, there is no reason to refresh an already evicted
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        store.evict(eviction);

        // Make sure there is not another refresh queued.
        // This would not technically be necessary because refresh code must be able to handle
        // refreshes for non-fetched projects, but the current implementation should enforce this.
        assert!(
            tokio::time::timeout(Duration::from_secs(60), store.poll())
                .await
                .is_err()
        );
    }

    // A refresh token obtained for an already-evicted project must be a no-op.
    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_during_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(
            &relay_config::Config::from_json_value(serde_json::json!({
                "cache": {
                    "project_expiry": 5,
                    "project_grace_period": 5,
                    "project_refresh_interval": 7,
                }
            }))
            .unwrap(),
        );

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move way past the expiration time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // Poll both the eviction and refresh token, while a proper implementation should prevent
        // this, it's a good way to test that a refresh for an evicted project does not fetch the
        // project.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        assert_eq!(refresh.project_key(), project_key);
        store.evict(eviction);

        assert!(store.refresh(refresh).is_none());
    }

    // Shared snapshots are immutable; listeners are notified when the state
    // changes and a fresh snapshot reflects the new state.
    #[tokio::test(start_paused = true)]
    async fn test_ready_state() {
        let shared = SharedProjectState::default();

        let shared_project = shared.to_shared_project();
        assert!(shared_project.project_state().is_pending());
        let mut listener = std::pin::pin!(shared_project.outdated());

        // After five seconds, project state is still pending:
        let result = tokio::time::timeout(Duration::from_secs(5), listener.as_mut()).await;
        assert!(result.is_err()); // timed out before notify
        assert!(shared.to_shared_project().project_state().is_pending());

        // Change the state:
        shared.set_project_state(ProjectState::Disabled);

        // The listener gets notified immediately:
        let result = tokio::time::timeout(Duration::from_secs(1), listener).await;
        assert!(result.is_ok()); // notified before timeout

        // The old snapshot is still pending:
        assert!(shared_project.project_state().is_pending());

        // The up-to-date snapshot is Disabled:
        assert!(matches!(
            shared.to_shared_project().project_state(),
            &ProjectState::Disabled
        ));
    }
}