1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Notify;
6use tokio::sync::futures::Notified;
7use tokio::time::Instant;
8
9use arc_swap::ArcSwap;
10use relay_base_schema::project::ProjectKey;
11use relay_quotas::CachedRateLimits;
12use relay_sampling::evaluation::ReservoirCounters;
13use relay_statsd::metric;
14
15use crate::services::projects::project::{ProjectState, Revision};
16use crate::services::projects::source::SourceProjectState;
17use crate::statsd::{RelayDistributions, RelayTimers};
18use crate::utils::{RetryBackoff, UniqueScheduledQueue};
19
20pub struct ProjectStore {
31 config: Config,
32 shared: Arc<Shared>,
34 private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
36 evictions: UniqueScheduledQueue<ProjectKey>,
38 refreshes: UniqueScheduledQueue<ProjectKey>,
40}
41
42impl ProjectStore {
43 pub fn new(config: &relay_config::Config) -> Self {
44 Self {
45 config: Config::new(config),
46 shared: Default::default(),
47 private: Default::default(),
48 evictions: Default::default(),
49 refreshes: Default::default(),
50 }
51 }
52
53 pub fn shared(&self) -> Arc<Shared> {
55 Arc::clone(&self.shared)
56 }
57
58 pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
64 self.do_try_begin_fetch(project_key, false)
65 }
66
67 #[must_use = "an incomplete fetch must be retried"]
72 pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
73 let project_key = fetch.project_key();
74
75 debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
78 debug_assert!(self.private.get(&project_key).is_some());
79
80 let mut project = self.get_or_create(project_key);
81 let new_fetch = match project.complete_fetch(fetch) {
84 FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
85 FetchResult::Done { expiry, refresh } => {
86 self.evictions.schedule(expiry.0, project_key);
87 if let Some(RefreshTime(refresh)) = refresh {
88 self.refreshes.schedule(refresh, project_key);
89 }
90 None
91 }
92 };
93
94 metric!(
95 distribution(RelayDistributions::ProjectStateCacheSize) =
96 self.shared.projects.len() as u64,
97 storage = "shared"
98 );
99 metric!(
100 distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
101 storage = "private"
102 );
103
104 new_fetch
105 }
106
107 pub async fn poll(&mut self) -> Option<Action> {
114 let eviction = self.evictions.next();
115 let refresh = self.refreshes.next();
116
117 tokio::select! {
118 biased;
119
120 Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
121 Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
122 else => None,
123 }
124 }
125
126 pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
134 self.do_try_begin_fetch(project_key, true)
135 }
136
137 pub fn evict(&mut self, Eviction(project_key): Eviction) {
139 let Some(private) = self.private.remove(&project_key) else {
141 debug_assert!(false, "no private state for eviction");
143 return;
144 };
145
146 debug_assert!(
147 matches!(private.state, FetchState::Complete { .. }),
148 "private state must be completed"
149 );
150
151 let shared = self.shared.projects.pin();
153 let _removed = shared.remove(&project_key);
154 debug_assert!(
155 _removed.is_some(),
156 "an expired project must exist in the shared state"
157 );
158
159 self.refreshes.remove(&project_key);
162 }
163
164 fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
167 let fetch = match is_refresh {
168 true => self.get(project_key)?,
174 false => self.get_or_create(project_key),
175 }
176 .try_begin_fetch(is_refresh);
177
178 if fetch.is_some() {
181 self.evictions.remove(&project_key);
182 }
185
186 fetch
187 }
188
189 fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
193 let private = self.private.get_mut(&project_key)?;
194
195 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
198
199 let shared = self
200 .shared
201 .projects
202 .pin()
203 .get_or_insert_with(project_key, Default::default)
204 .clone();
205
206 Some(ProjectRef {
207 private,
208 shared,
209 config: &self.config,
210 })
211 }
212
213 fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
217 #[cfg(debug_assertions)]
218 if self.private.contains_key(&project_key) {
219 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
225 }
226
227 let private = self
228 .private
229 .entry(project_key)
230 .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
231
232 let shared = self
233 .shared
234 .projects
235 .pin()
236 .get_or_insert_with(project_key, Default::default)
237 .clone();
238
239 ProjectRef {
240 private,
241 shared,
242 config: &self.config,
243 }
244 }
245}
246
247struct Config {
249 expiry: Duration,
253 grace_period: Duration,
258 refresh_interval: Option<Duration>,
265 max_retry_backoff: Duration,
267}
268
269impl Config {
270 fn new(config: &relay_config::Config) -> Self {
271 let expiry = config.project_cache_expiry();
272 let grace_period = config.project_grace_period();
273
274 let refresh_interval = config
278 .project_refresh_interval()
279 .filter(|rt| *rt < (expiry + grace_period))
280 .filter(|rt| *rt > expiry);
281
282 Self {
283 expiry: config.project_cache_expiry(),
284 grace_period: config.project_grace_period(),
285 refresh_interval,
286 max_retry_backoff: config.http_max_retry_interval(),
287 }
288 }
289}
290
291#[derive(Default)]
293pub struct Shared {
294 projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
295}
296
297impl Shared {
298 pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
303 self.get_or_create_inner(project_key).to_shared_project()
304 }
305
306 fn get_or_create_inner(&self, project_key: ProjectKey) -> SharedProjectState {
307 let projects = self.projects.pin();
309 if let Some(project) = projects.get(&project_key) {
310 return project.clone();
311 }
312
313 match projects.try_insert(project_key, Default::default()) {
315 Ok(inserted) => inserted.clone(),
316 Err(occupied) => occupied.current.clone(),
317 }
318 }
319}
320
321#[cfg(test)]
323impl Shared {
324 pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
328 self.projects
329 .pin()
330 .get_or_insert_with(project_key, Default::default)
331 .set_project_state(state);
332 }
333
334 pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
336 self.projects.pin().contains_key(&project_key)
337 }
338}
339
340impl fmt::Debug for Shared {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 f.debug_struct("Shared")
343 .field("num_projects", &self.projects.len())
344 .finish()
345 }
346}
347
348pub struct SharedProject(Arc<SharedProjectStateInner>);
350
351impl SharedProject {
352 pub fn project_state(&self) -> &ProjectState {
354 &self.0.state
355 }
356
357 pub fn cached_rate_limits(&self) -> &CachedRateLimits {
359 &self.0.rate_limits
365 }
366
367 pub fn reservoir_counters(&self) -> &ReservoirCounters {
369 &self.0.reservoir_counters
370 }
371
372 pub fn outdated(&self) -> Notified<'_> {
376 self.0.notify.notified()
377 }
378}
379
380#[cfg(test)]
382impl SharedProject {
383 pub fn for_test(state: ProjectState) -> Self {
385 Self(Arc::new(SharedProjectStateInner {
386 state,
387 ..Default::default()
388 }))
389 }
390}
391
392struct ProjectRef<'a> {
394 shared: SharedProjectState,
395 private: &'a mut PrivateProjectState,
396 config: &'a Config,
397}
398
399impl ProjectRef<'_> {
400 fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
401 let now = Instant::now();
402 self.private
403 .try_begin_fetch(now, is_refresh, self.config)
404 .map(|fetch| fetch.with_revision(self.shared.revision()))
405 }
406
407 fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
408 let now = Instant::now();
409
410 if let Some(latency) = fetch.latency() {
411 let delay = match fetch.delay() {
412 Some(delay) if delay.as_secs() <= 15 => "lte15s",
413 Some(delay) if delay.as_secs() <= 30 => "lte30s",
414 Some(delay) if delay.as_secs() <= 60 => "lte60s",
415 Some(delay) if delay.as_secs() <= 120 => "lte120",
416 Some(delay) if delay.as_secs() <= 300 => "lte300s",
417 Some(delay) if delay.as_secs() <= 600 => "lte600s",
418 Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
419 Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
420 Some(_) => "gt3600s",
421 None => "none",
422 };
423 metric!(
424 timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
425 delay = delay
426 );
427 }
428
429 if !fetch.is_pending() {
430 let state = match fetch.state {
431 SourceProjectState::New(_) => "new",
432 SourceProjectState::NotModified => "not_modified",
433 };
434
435 metric!(
436 timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
437 state = state
438 );
439 }
440
441 let result = self.private.complete_fetch(&fetch, now, self.config);
443 match fetch.state {
444 SourceProjectState::New(state) if !state.is_pending() => {
447 self.shared.set_project_state(state);
448 }
449 _ => {}
450 }
451
452 result
453 }
454}
455
456pub enum Action {
457 Eviction(Eviction),
458 Refresh(Refresh),
459}
460
461#[derive(Debug)]
465#[must_use = "a refresh must be used"]
466pub struct Refresh(ProjectKey);
467
468impl Refresh {
469 pub fn project_key(&self) -> ProjectKey {
471 self.0
472 }
473}
474
475#[derive(Debug)]
479#[must_use = "an eviction must be used"]
480pub struct Eviction(ProjectKey);
481
482impl Eviction {
483 pub fn project_key(&self) -> ProjectKey {
485 self.0
486 }
487}
488
489#[must_use = "a fetch must be executed"]
493#[derive(Debug)]
494pub struct Fetch {
495 project_key: ProjectKey,
496 previous_fetch: Option<Instant>,
497 initiated: Instant,
498 when: Option<Instant>,
499 revision: Revision,
500}
501
502impl Fetch {
503 pub fn project_key(&self) -> ProjectKey {
505 self.project_key
506 }
507
508 pub fn when(&self) -> Option<Instant> {
513 self.when
514 }
515
516 pub fn revision(&self) -> Revision {
521 self.revision.clone()
522 }
523
524 pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
526 CompletedFetch { fetch: self, state }
527 }
528
529 fn with_revision(mut self, revision: Revision) -> Self {
530 self.revision = revision;
531 self
532 }
533}
534
535#[must_use = "a completed fetch must be acted upon"]
537#[derive(Debug)]
538pub struct CompletedFetch {
539 fetch: Fetch,
540 state: SourceProjectState,
541}
542
543impl CompletedFetch {
544 pub fn project_key(&self) -> ProjectKey {
546 self.fetch.project_key()
547 }
548
549 fn delay(&self) -> Option<Duration> {
553 self.fetch
554 .previous_fetch
555 .map(|pf| self.fetch.initiated.duration_since(pf))
556 }
557
558 fn duration(&self, now: Instant) -> Duration {
560 now.duration_since(self.fetch.initiated)
561 }
562
563 fn latency(&self) -> Option<Duration> {
571 let is_first_fetch = self.fetch.revision().as_str().is_none();
575 if is_first_fetch {
576 return None;
577 }
578
579 let project_info = match &self.state {
580 SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
581 _ => return None,
586 };
587
588 if project_info.rev == self.fetch.revision {
590 return None;
591 }
592
593 let elapsed = chrono::Utc::now() - project_info.last_change?;
594 elapsed.to_std().ok()
595 }
596
597 fn is_pending(&self) -> bool {
599 match &self.state {
600 SourceProjectState::New(state) => state.is_pending(),
601 SourceProjectState::NotModified => false,
602 }
603 }
604}
605
606#[derive(Debug, Default, Clone)]
610struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
611
612impl SharedProjectState {
613 fn set_project_state(&self, state: ProjectState) {
615 let prev = self.0.rcu(|stored| SharedProjectStateInner {
616 state: state.clone(),
617 rate_limits: Arc::clone(&stored.rate_limits),
618 reservoir_counters: Arc::clone(&stored.reservoir_counters),
619 notify: Arc::clone(&stored.notify),
620 });
621
622 if let Some(state) = state.enabled() {
626 let config = state.config.sampling.as_ref();
627 if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
628 if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
633 counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
634 }
635 }
636 }
637
638 prev.notify.notify_waiters();
640 }
641
642 fn revision(&self) -> Revision {
644 self.0.as_ref().load().state.revision().clone()
645 }
646
647 fn to_shared_project(&self) -> SharedProject {
649 SharedProject(self.0.as_ref().load_full())
650 }
651}
652
653#[derive(Debug, Default)]
658struct SharedProjectStateInner {
659 state: ProjectState,
660 rate_limits: Arc<CachedRateLimits>,
661 reservoir_counters: ReservoirCounters,
662 notify: Arc<Notify>,
663}
664
665#[derive(Debug)]
677enum FetchState {
678 InProgress {
680 is_refresh: bool,
685 },
686 Pending {
695 initiated: Option<Instant>,
703 next_fetch_attempt: Option<Instant>,
707 },
708 Complete {
710 when: LastFetch,
712 },
713}
714
715struct PrivateProjectState {
717 project_key: ProjectKey,
719
720 state: FetchState,
722 backoff: RetryBackoff,
726
727 last_fetch: Option<Instant>,
734
735 expiry: Option<Instant>,
739}
740
741impl PrivateProjectState {
742 fn new(project_key: ProjectKey, config: &Config) -> Self {
743 Self {
744 project_key,
745 state: FetchState::Pending {
746 initiated: None,
747 next_fetch_attempt: None,
748 },
749 backoff: RetryBackoff::new(config.max_retry_backoff),
750 last_fetch: None,
751 expiry: None,
752 }
753 }
754
755 fn try_begin_fetch(
756 &mut self,
757 now: Instant,
758 is_refresh: bool,
759 config: &Config,
760 ) -> Option<Fetch> {
761 let (initiated, when) = match &mut self.state {
762 FetchState::InProgress {
763 is_refresh: refresh_in_progress,
764 } => {
765 relay_log::trace!(
766 tags.project_key = self.project_key.as_str(),
767 "project fetch skipped, fetch in progress"
768 );
769 *refresh_in_progress = *refresh_in_progress && is_refresh;
771 return None;
772 }
773 FetchState::Pending {
774 initiated,
775 next_fetch_attempt,
776 } => {
777 (initiated.unwrap_or(now), *next_fetch_attempt)
779 }
780 FetchState::Complete { when } => {
781 debug_assert_eq!(Some(when.0), self.last_fetch);
783
784 if when.check_expiry(now, config).is_fresh() {
785 relay_log::trace!(
787 tags.project_key = self.project_key.as_str(),
788 "project fetch skipped, already up to date"
789 );
790 return None;
791 }
792
793 (now, None)
794 }
795 };
796
797 self.state = FetchState::InProgress { is_refresh };
799
800 relay_log::trace!(
801 tags.project_key = &self.project_key.as_str(),
802 attempts = self.backoff.attempt() + 1,
803 "project state {} scheduled in {:?}",
804 if is_refresh { "refresh" } else { "fetch" },
805 when.unwrap_or(now).saturating_duration_since(now),
806 );
807
808 Some(Fetch {
809 project_key: self.project_key,
810 previous_fetch: self.last_fetch,
811 initiated,
812 when,
813 revision: Revision::default(),
814 })
815 }
816
817 fn complete_fetch(
818 &mut self,
819 fetch: &CompletedFetch,
820 now: Instant,
821 config: &Config,
822 ) -> FetchResult {
823 let FetchState::InProgress { is_refresh } = self.state else {
824 debug_assert!(
825 false,
826 "fetch completed while there was no current fetch registered"
827 );
828 return FetchResult::ReSchedule { refresh: false };
830 };
831
832 if fetch.is_pending() {
833 let next_backoff = self.backoff.next_backoff();
834 let next_fetch_attempt = match next_backoff.is_zero() {
835 false => now.checked_add(next_backoff),
836 true => None,
837 };
838 self.state = FetchState::Pending {
839 next_fetch_attempt,
840 initiated: Some(fetch.fetch.initiated),
841 };
842 relay_log::trace!(
843 tags.project_key = &self.project_key.as_str(),
844 "project state {} completed but still pending",
845 if is_refresh { "refresh" } else { "fetch" },
846 );
847
848 FetchResult::ReSchedule {
849 refresh: is_refresh,
850 }
851 } else {
852 relay_log::trace!(
853 tags.project_key = &self.project_key.as_str(),
854 "project state {} completed with non-pending config",
855 if is_refresh { "refresh" } else { "fetch" },
856 );
857
858 self.backoff.reset();
859 self.last_fetch = Some(now);
860
861 let when = LastFetch(now);
862
863 let refresh = when.refresh_time(config);
864 let expiry = match self.expiry {
865 Some(expiry) if is_refresh => ExpiryTime(expiry),
866 Some(_) | None => when.expiry_time(config),
869 };
870 self.expiry = Some(expiry.0);
871
872 self.state = FetchState::Complete { when };
873 FetchResult::Done { expiry, refresh }
874 }
875 }
876}
877
878#[derive(Debug)]
880#[must_use = "fetch result must be used"]
881enum FetchResult {
882 ReSchedule {
884 refresh: bool,
886 },
887 Done {
889 expiry: ExpiryTime,
891 refresh: Option<RefreshTime>,
893 },
894}
895
896#[derive(Debug, Copy, Clone)]
898struct LastFetch(Instant);
899
900impl LastFetch {
901 fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
903 let elapsed = now.saturating_duration_since(self.0);
904
905 if elapsed >= config.expiry + config.grace_period {
906 Expiry::Expired
907 } else if elapsed >= config.expiry {
908 Expiry::Stale
909 } else {
910 Expiry::Fresh
911 }
912 }
913
914 fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
916 config
917 .refresh_interval
918 .map(|duration| self.0 + duration)
919 .map(RefreshTime)
920 }
921
922 fn expiry_time(&self, config: &Config) -> ExpiryTime {
924 ExpiryTime(self.0 + config.grace_period + config.expiry)
925 }
926}
927
928#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
930enum Expiry {
931 Fresh,
933 Stale,
936 Expired,
939}
940
941impl Expiry {
942 fn is_fresh(&self) -> bool {
944 matches!(self, Self::Fresh)
945 }
946}
947
948#[derive(Debug)]
950#[must_use = "an refresh time must be used to schedule a refresh"]
951struct RefreshTime(Instant);
952
953#[derive(Debug)]
955#[must_use = "an expiry time must be used to schedule an eviction"]
956struct ExpiryTime(Instant);
957
958#[cfg(test)]
959mod tests {
960 use std::time::Duration;
961
962 use super::*;
963
964 async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
965 let mut evicted = Vec::new();
966 while let Ok(Some(Action::Eviction(eviction))) =
968 tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
969 {
970 evicted.push(eviction.0);
971 store.evict(eviction);
972 }
973 evicted
974 }
975
976 macro_rules! assert_state {
977 ($store:ident, $project_key:ident, $state:pat) => {
978 assert!(matches!(
979 $store.shared().get_or_create($project_key).project_state(),
980 $state
981 ));
982 };
983 }
984
985 #[tokio::test(start_paused = true)]
986 async fn test_store_fetch() {
987 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
988 let mut store = ProjectStore::new(&Default::default());
989
990 let fetch = store.try_begin_fetch(project_key).unwrap();
991 assert_eq!(fetch.project_key(), project_key);
992 assert_eq!(fetch.when(), None);
993 assert_eq!(fetch.revision().as_str(), None);
994 assert_state!(store, project_key, ProjectState::Pending);
995
996 assert!(store.try_begin_fetch(project_key).is_none());
998
999 let fetch = fetch.complete(ProjectState::Pending.into());
1001 let fetch = store.complete_fetch(fetch).unwrap();
1002 assert_eq!(fetch.project_key(), project_key);
1003 assert_eq!(fetch.when(), None);
1005 assert_eq!(fetch.revision().as_str(), None);
1006 assert_state!(store, project_key, ProjectState::Pending);
1007
1008 let fetch = fetch.complete(ProjectState::Pending.into());
1010 let fetch = store.complete_fetch(fetch).unwrap();
1011 assert_eq!(fetch.project_key(), project_key);
1012 assert!(fetch.when() > Some(Instant::now()));
1014 assert_eq!(fetch.revision().as_str(), None);
1015 assert_state!(store, project_key, ProjectState::Pending);
1016
1017 let fetch = fetch.complete(ProjectState::Disabled.into());
1019 assert!(store.complete_fetch(fetch).is_none());
1020 assert_state!(store, project_key, ProjectState::Disabled);
1021
1022 assert!(store.try_begin_fetch(project_key).is_none());
1024 }
1025
1026 #[tokio::test(start_paused = true)]
1027 async fn test_store_fetch_pending_does_not_replace_state() {
1028 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1029 let mut store = ProjectStore::new(
1030 &relay_config::Config::from_json_value(serde_json::json!({
1031 "cache": {
1032 "project_expiry": 5,
1033 "project_grace_period": 5,
1034 }
1035 }))
1036 .unwrap(),
1037 );
1038
1039 let fetch = store.try_begin_fetch(project_key).unwrap();
1040 let fetch = fetch.complete(ProjectState::Disabled.into());
1041 assert!(store.complete_fetch(fetch).is_none());
1042 assert_state!(store, project_key, ProjectState::Disabled);
1043
1044 tokio::time::advance(Duration::from_secs(6)).await;
1045
1046 let fetch = store.try_begin_fetch(project_key).unwrap();
1047 let fetch = fetch.complete(ProjectState::Pending.into());
1048 let fetch = store.complete_fetch(fetch).unwrap();
1050 assert_state!(store, project_key, ProjectState::Disabled);
1052
1053 let fetch = fetch.complete(ProjectState::new_allowed().into());
1054 assert!(store.complete_fetch(fetch).is_none());
1055 assert_state!(store, project_key, ProjectState::Enabled(_));
1056 }
1057
1058 #[tokio::test(start_paused = true)]
1059 async fn test_store_evict_projects() {
1060 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1061 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1062 let mut store = ProjectStore::new(
1063 &relay_config::Config::from_json_value(serde_json::json!({
1064 "cache": {
1065 "project_expiry": 5,
1066 "project_grace_period": 0,
1067 }
1068 }))
1069 .unwrap(),
1070 );
1071
1072 let fetch = store.try_begin_fetch(project_key1).unwrap();
1073 let fetch = fetch.complete(ProjectState::Disabled.into());
1074 assert!(store.complete_fetch(fetch).is_none());
1075
1076 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1077 assert_state!(store, project_key1, ProjectState::Disabled);
1078
1079 tokio::time::advance(Duration::from_secs(3)).await;
1081
1082 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1083 assert_state!(store, project_key1, ProjectState::Disabled);
1084
1085 let fetch = store.try_begin_fetch(project_key2).unwrap();
1086 let fetch = fetch.complete(ProjectState::Disabled.into());
1087 assert!(store.complete_fetch(fetch).is_none());
1088
1089 tokio::time::advance(Duration::from_secs(3)).await;
1091
1092 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1093 assert_state!(store, project_key1, ProjectState::Pending);
1094 assert_state!(store, project_key2, ProjectState::Disabled);
1095 }
1096
1097 #[tokio::test(start_paused = true)]
1098 async fn test_store_evict_projects_pending_not_expired() {
1099 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1100 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1101 let mut store = ProjectStore::new(
1102 &relay_config::Config::from_json_value(serde_json::json!({
1103 "cache": {
1104 "project_expiry": 5,
1105 "project_grace_period": 0,
1106 }
1107 }))
1108 .unwrap(),
1109 );
1110
1111 let fetch = store.try_begin_fetch(project_key1).unwrap();
1112 store.shared().get_or_create(project_key2);
1114
1115 tokio::time::advance(Duration::from_secs(6)).await;
1116
1117 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1119
1120 let fetch = fetch.complete(ProjectState::Disabled.into());
1122 assert!(store.complete_fetch(fetch).is_none());
1123
1124 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1126 tokio::time::advance(Duration::from_secs(4)).await;
1127 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1128 assert_state!(store, project_key1, ProjectState::Disabled);
1129
1130 tokio::time::advance(Duration::from_millis(1001)).await;
1132 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1133 assert_state!(store, project_key1, ProjectState::Pending);
1134 assert_state!(store, project_key2, ProjectState::Pending);
1135 }
1136
1137 #[tokio::test(start_paused = true)]
1138 async fn test_store_evict_projects_stale() {
1139 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1140 let mut store = ProjectStore::new(
1141 &relay_config::Config::from_json_value(serde_json::json!({
1142 "cache": {
1143 "project_expiry": 5,
1144 "project_grace_period": 5,
1145 }
1146 }))
1147 .unwrap(),
1148 );
1149
1150 let fetch = store.try_begin_fetch(project_key).unwrap();
1151 let fetch = fetch.complete(ProjectState::Disabled.into());
1152 assert!(store.complete_fetch(fetch).is_none());
1153
1154 tokio::time::advance(Duration::from_millis(9500)).await;
1156
1157 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1158 assert_state!(store, project_key, ProjectState::Disabled);
1159
1160 tokio::time::advance(Duration::from_secs(1)).await;
1162
1163 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1164 assert_state!(store, project_key, ProjectState::Pending);
1165 }
1166
1167 #[tokio::test(start_paused = true)]
1168 async fn test_store_no_eviction_during_fetch() {
1169 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1170 let mut store = ProjectStore::new(
1171 &relay_config::Config::from_json_value(serde_json::json!({
1172 "cache": {
1173 "project_expiry": 5,
1174 "project_grace_period": 5,
1175 }
1176 }))
1177 .unwrap(),
1178 );
1179
1180 let fetch = store.try_begin_fetch(project_key).unwrap();
1181
1182 tokio::time::advance(Duration::from_millis(10500)).await;
1184 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1186
1187 let fetch = fetch.complete(ProjectState::Disabled.into());
1189 assert!(store.complete_fetch(fetch).is_none());
1190 tokio::time::advance(Duration::from_millis(5001)).await;
1192 let fetch = store.try_begin_fetch(project_key).unwrap();
1193
1194 tokio::time::advance(Duration::from_millis(10500)).await;
1196 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1198
1199 let fetch = fetch.complete(ProjectState::Disabled.into());
1201 assert!(store.complete_fetch(fetch).is_none());
1202
1203 tokio::time::advance(Duration::from_millis(9500)).await;
1205 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1206 tokio::time::advance(Duration::from_millis(501)).await;
1208 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1209 assert_state!(store, project_key, ProjectState::Pending);
1210 }
1211
1212 #[tokio::test(start_paused = true)]
1213 async fn test_store_refresh() {
1214 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1215 let mut store = ProjectStore::new(
1216 &relay_config::Config::from_json_value(serde_json::json!({
1217 "cache": {
1218 "project_expiry": 5,
1219 "project_grace_period": 5,
1220 "project_refresh_interval": 7,
1221 }
1222 }))
1223 .unwrap(),
1224 );
1225
1226 let fetch = store.try_begin_fetch(project_key).unwrap();
1227 let fetch = fetch.complete(ProjectState::Disabled.into());
1228 assert!(store.complete_fetch(fetch).is_none());
1229 assert_state!(store, project_key, ProjectState::Disabled);
1230
1231 let Some(Action::Refresh(refresh)) = store.poll().await else {
1233 panic!();
1234 };
1235 assert_eq!(refresh.project_key(), project_key);
1236
1237 let fetch = store.refresh(refresh).unwrap();
1238 assert!(store.try_begin_fetch(project_key).is_none());
1240 let fetch = fetch.complete(ProjectState::Disabled.into());
1241 assert!(store.complete_fetch(fetch).is_none());
1242
1243 let Some(Action::Refresh(refresh)) = store.poll().await else {
1247 panic!();
1248 };
1249 let fetch = store.refresh(refresh).unwrap();
1250 let fetch = fetch.complete(ProjectState::Disabled.into());
1251 assert!(store.complete_fetch(fetch).is_none());
1252
1253 let Some(Action::Eviction(eviction)) = store.poll().await else {
1256 panic!();
1257 };
1258 assert_eq!(eviction.project_key(), project_key);
1259 }
1260
1261 #[tokio::test(start_paused = true)]
1262 async fn test_store_refresh_overtaken_by_eviction() {
1263 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1264 let mut store = ProjectStore::new(
1265 &relay_config::Config::from_json_value(serde_json::json!({
1266 "cache": {
1267 "project_expiry": 5,
1268 "project_grace_period": 5,
1269 "project_refresh_interval": 7,
1270 }
1271 }))
1272 .unwrap(),
1273 );
1274
1275 let fetch = store.try_begin_fetch(project_key).unwrap();
1276 let fetch = fetch.complete(ProjectState::Disabled.into());
1277 assert!(store.complete_fetch(fetch).is_none());
1278 assert_state!(store, project_key, ProjectState::Disabled);
1279
1280 tokio::time::advance(Duration::from_secs(20)).await;
1282
1283 let Some(Action::Eviction(eviction)) = store.poll().await else {
1286 panic!();
1287 };
1288 assert_eq!(eviction.project_key(), project_key);
1289 store.evict(eviction);
1290
1291 assert!(
1295 tokio::time::timeout(Duration::from_secs(60), store.poll())
1296 .await
1297 .is_err()
1298 );
1299 }
1300
1301 #[tokio::test(start_paused = true)]
1302 async fn test_store_refresh_during_eviction() {
1303 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1304 let mut store = ProjectStore::new(
1305 &relay_config::Config::from_json_value(serde_json::json!({
1306 "cache": {
1307 "project_expiry": 5,
1308 "project_grace_period": 5,
1309 "project_refresh_interval": 7,
1310 }
1311 }))
1312 .unwrap(),
1313 );
1314
1315 let fetch = store.try_begin_fetch(project_key).unwrap();
1316 let fetch = fetch.complete(ProjectState::Disabled.into());
1317 assert!(store.complete_fetch(fetch).is_none());
1318 assert_state!(store, project_key, ProjectState::Disabled);
1319
1320 tokio::time::advance(Duration::from_secs(20)).await;
1322
1323 let Some(Action::Eviction(eviction)) = store.poll().await else {
1327 panic!();
1328 };
1329 let Some(Action::Refresh(refresh)) = store.poll().await else {
1330 panic!();
1331 };
1332 assert_eq!(eviction.project_key(), project_key);
1333 assert_eq!(refresh.project_key(), project_key);
1334 store.evict(eviction);
1335
1336 assert!(store.refresh(refresh).is_none());
1337 }
1338
1339 #[tokio::test(start_paused = true)]
1340 async fn test_ready_state() {
1341 let shared = SharedProjectState::default();
1342
1343 let shared_project = shared.to_shared_project();
1344 assert!(shared_project.project_state().is_pending());
1345 let mut listener = std::pin::pin!(shared_project.outdated());
1346
1347 let result = tokio::time::timeout(Duration::from_secs(5), listener.as_mut()).await;
1349 assert!(result.is_err()); assert!(shared.to_shared_project().project_state().is_pending());
1351
1352 shared.set_project_state(ProjectState::Disabled);
1354
1355 let result = tokio::time::timeout(Duration::from_secs(1), listener).await;
1357 assert!(result.is_ok()); assert!(shared_project.project_state().is_pending());
1361
1362 assert!(matches!(
1364 shared.to_shared_project().project_state(),
1365 &ProjectState::Disabled
1366 ));
1367 }
1368}