1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Notify;
6use tokio::sync::futures::Notified;
7use tokio::time::Instant;
8
9use arc_swap::ArcSwap;
10use relay_base_schema::project::ProjectKey;
11use relay_quotas::CachedRateLimits;
12use relay_statsd::metric;
13
14use crate::services::projects::project::{ProjectState, Revision};
15use crate::services::projects::source::SourceProjectState;
16use crate::statsd::{RelayDistributions, RelayTimers};
17use crate::utils::{RetryBackoff, UniqueScheduledQueue};
18
19pub struct ProjectStore {
30 config: Config,
31 shared: Arc<Shared>,
33 private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
35 evictions: UniqueScheduledQueue<ProjectKey>,
37 refreshes: UniqueScheduledQueue<ProjectKey>,
39}
40
41impl ProjectStore {
42 pub fn new(config: &relay_config::Config) -> Self {
43 Self {
44 config: Config::new(config),
45 shared: Default::default(),
46 private: Default::default(),
47 evictions: Default::default(),
48 refreshes: Default::default(),
49 }
50 }
51
52 pub fn shared(&self) -> Arc<Shared> {
54 Arc::clone(&self.shared)
55 }
56
57 pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
63 self.do_try_begin_fetch(project_key, false)
64 }
65
66 #[must_use = "an incomplete fetch must be retried"]
71 pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
72 let project_key = fetch.project_key();
73
74 debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
77 debug_assert!(self.private.get(&project_key).is_some());
78
79 let mut project = self.get_or_create(project_key);
80 let new_fetch = match project.complete_fetch(fetch) {
83 FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
84 FetchResult::Done { expiry, refresh } => {
85 self.evictions.schedule(expiry.0, project_key);
86 if let Some(RefreshTime(refresh)) = refresh {
87 self.refreshes.schedule(refresh, project_key);
88 }
89 None
90 }
91 };
92
93 metric!(
94 distribution(RelayDistributions::ProjectStateCacheSize) =
95 self.shared.projects.len() as u64,
96 storage = "shared"
97 );
98 metric!(
99 distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
100 storage = "private"
101 );
102
103 new_fetch
104 }
105
106 pub async fn poll(&mut self) -> Option<Action> {
113 let eviction = self.evictions.next();
114 let refresh = self.refreshes.next();
115
116 tokio::select! {
117 biased;
118
119 Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
120 Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
121 else => None,
122 }
123 }
124
125 pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
133 self.do_try_begin_fetch(project_key, true)
134 }
135
136 pub fn evict(&mut self, Eviction(project_key): Eviction) {
138 let Some(private) = self.private.remove(&project_key) else {
140 debug_assert!(false, "no private state for eviction");
142 return;
143 };
144
145 debug_assert!(
146 matches!(private.state, FetchState::Complete { .. }),
147 "private state must be completed"
148 );
149
150 let shared = self.shared.projects.pin();
152 let _removed = shared.remove(&project_key);
153 debug_assert!(
154 _removed.is_some(),
155 "an expired project must exist in the shared state"
156 );
157
158 self.refreshes.remove(&project_key);
161 }
162
163 fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
166 let fetch = match is_refresh {
167 true => self.get(project_key)?,
173 false => self.get_or_create(project_key),
174 }
175 .try_begin_fetch(is_refresh);
176
177 if fetch.is_some() {
180 self.evictions.remove(&project_key);
181 }
184
185 fetch
186 }
187
188 fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
192 let private = self.private.get_mut(&project_key)?;
193
194 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
197
198 let shared = self
199 .shared
200 .projects
201 .pin()
202 .get_or_insert_with(project_key, Default::default)
203 .clone();
204
205 Some(ProjectRef {
206 private,
207 shared,
208 config: &self.config,
209 })
210 }
211
212 fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
216 #[cfg(debug_assertions)]
217 if self.private.contains_key(&project_key) {
218 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
224 }
225
226 let private = self
227 .private
228 .entry(project_key)
229 .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
230
231 let shared = self
232 .shared
233 .projects
234 .pin()
235 .get_or_insert_with(project_key, Default::default)
236 .clone();
237
238 ProjectRef {
239 private,
240 shared,
241 config: &self.config,
242 }
243 }
244}
245
246struct Config {
248 expiry: Duration,
252 grace_period: Duration,
257 refresh_interval: Option<Duration>,
264 max_retry_backoff: Duration,
266}
267
268impl Config {
269 fn new(config: &relay_config::Config) -> Self {
270 let expiry = config.project_cache_expiry();
271 let grace_period = config.project_grace_period();
272
273 let refresh_interval = config
277 .project_refresh_interval()
278 .filter(|rt| *rt < (expiry + grace_period))
279 .filter(|rt| *rt > expiry);
280
281 Self {
282 expiry: config.project_cache_expiry(),
283 grace_period: config.project_grace_period(),
284 refresh_interval,
285 max_retry_backoff: config.http_max_retry_interval(),
286 }
287 }
288}
289
290#[derive(Default)]
292pub struct Shared {
293 projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
294}
295
296impl Shared {
297 pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
302 self.get_or_create_inner(project_key).to_shared_project()
303 }
304
305 fn get_or_create_inner(&self, project_key: ProjectKey) -> SharedProjectState {
306 let projects = self.projects.pin();
308 if let Some(project) = projects.get(&project_key) {
309 return project.clone();
310 }
311
312 match projects.try_insert(project_key, Default::default()) {
314 Ok(inserted) => inserted.clone(),
315 Err(occupied) => occupied.current.clone(),
316 }
317 }
318}
319
320#[cfg(test)]
322impl Shared {
323 pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
327 self.projects
328 .pin()
329 .get_or_insert_with(project_key, Default::default)
330 .set_project_state(state);
331 }
332
333 pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
335 self.projects.pin().contains_key(&project_key)
336 }
337}
338
339impl fmt::Debug for Shared {
340 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341 f.debug_struct("Shared")
342 .field("num_projects", &self.projects.len())
343 .finish()
344 }
345}
346
347pub struct SharedProject(Arc<SharedProjectStateInner>);
349
350impl SharedProject {
351 pub fn project_state(&self) -> &ProjectState {
353 &self.0.state
354 }
355
356 pub fn cached_rate_limits(&self) -> &CachedRateLimits {
358 &self.0.rate_limits
364 }
365
366 pub fn outdated(&self) -> Notified<'_> {
370 self.0.notify.notified()
371 }
372}
373
374#[cfg(test)]
376impl SharedProject {
377 pub fn for_test(state: ProjectState) -> Self {
379 Self(Arc::new(SharedProjectStateInner {
380 state,
381 ..Default::default()
382 }))
383 }
384}
385
386struct ProjectRef<'a> {
388 shared: SharedProjectState,
389 private: &'a mut PrivateProjectState,
390 config: &'a Config,
391}
392
393impl ProjectRef<'_> {
394 fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
395 let now = Instant::now();
396 self.private
397 .try_begin_fetch(now, is_refresh, self.config)
398 .map(|fetch| fetch.with_revision(self.shared.revision()))
399 }
400
401 fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
402 let now = Instant::now();
403
404 if let Some(latency) = fetch.latency() {
405 let delay = match fetch.delay() {
406 Some(delay) if delay.as_secs() <= 15 => "lte15s",
407 Some(delay) if delay.as_secs() <= 30 => "lte30s",
408 Some(delay) if delay.as_secs() <= 60 => "lte60s",
409 Some(delay) if delay.as_secs() <= 120 => "lte120",
410 Some(delay) if delay.as_secs() <= 300 => "lte300s",
411 Some(delay) if delay.as_secs() <= 600 => "lte600s",
412 Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
413 Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
414 Some(_) => "gt3600s",
415 None => "none",
416 };
417 metric!(
418 timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
419 delay = delay
420 );
421 }
422
423 if !fetch.is_pending() {
424 let state = match fetch.state {
425 SourceProjectState::New(_) => "new",
426 SourceProjectState::NotModified => "not_modified",
427 };
428
429 metric!(
430 timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
431 state = state
432 );
433 }
434
435 let result = self.private.complete_fetch(&fetch, now, self.config);
437 match fetch.state {
438 SourceProjectState::New(state) if !state.is_pending() => {
441 self.shared.set_project_state(state);
442 }
443 _ => {}
444 }
445
446 result
447 }
448}
449
450pub enum Action {
451 Eviction(Eviction),
452 Refresh(Refresh),
453}
454
455#[derive(Debug)]
459#[must_use = "a refresh must be used"]
460pub struct Refresh(ProjectKey);
461
462impl Refresh {
463 pub fn project_key(&self) -> ProjectKey {
465 self.0
466 }
467}
468
469#[derive(Debug)]
473#[must_use = "an eviction must be used"]
474pub struct Eviction(ProjectKey);
475
476impl Eviction {
477 pub fn project_key(&self) -> ProjectKey {
479 self.0
480 }
481}
482
483#[must_use = "a fetch must be executed"]
487#[derive(Debug)]
488pub struct Fetch {
489 project_key: ProjectKey,
490 previous_fetch: Option<Instant>,
491 initiated: Instant,
492 when: Option<Instant>,
493 revision: Revision,
494}
495
496impl Fetch {
497 pub fn project_key(&self) -> ProjectKey {
499 self.project_key
500 }
501
502 pub fn when(&self) -> Option<Instant> {
507 self.when
508 }
509
510 pub fn revision(&self) -> Revision {
515 self.revision.clone()
516 }
517
518 pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
520 CompletedFetch { fetch: self, state }
521 }
522
523 fn with_revision(mut self, revision: Revision) -> Self {
524 self.revision = revision;
525 self
526 }
527}
528
529#[must_use = "a completed fetch must be acted upon"]
531#[derive(Debug)]
532pub struct CompletedFetch {
533 fetch: Fetch,
534 state: SourceProjectState,
535}
536
537impl CompletedFetch {
538 pub fn project_key(&self) -> ProjectKey {
540 self.fetch.project_key()
541 }
542
543 fn delay(&self) -> Option<Duration> {
547 self.fetch
548 .previous_fetch
549 .map(|pf| self.fetch.initiated.duration_since(pf))
550 }
551
552 fn duration(&self, now: Instant) -> Duration {
554 now.duration_since(self.fetch.initiated)
555 }
556
557 fn latency(&self) -> Option<Duration> {
565 let is_first_fetch = self.fetch.revision().as_str().is_none();
569 if is_first_fetch {
570 return None;
571 }
572
573 let project_info = match &self.state {
574 SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
575 _ => return None,
580 };
581
582 if project_info.rev == self.fetch.revision {
584 return None;
585 }
586
587 let elapsed = chrono::Utc::now() - project_info.last_change?;
588 elapsed.to_std().ok()
589 }
590
591 fn is_pending(&self) -> bool {
593 match &self.state {
594 SourceProjectState::New(state) => state.is_pending(),
595 SourceProjectState::NotModified => false,
596 }
597 }
598}
599
600#[derive(Debug, Default, Clone)]
604struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
605
606impl SharedProjectState {
607 fn set_project_state(&self, state: ProjectState) {
609 let prev = self.0.rcu(|stored| SharedProjectStateInner {
610 state: state.clone(),
611 rate_limits: Arc::clone(&stored.rate_limits),
612 notify: Arc::clone(&stored.notify),
613 });
614
615 prev.notify.notify_waiters();
617 }
618
619 fn revision(&self) -> Revision {
621 self.0.as_ref().load().state.revision().clone()
622 }
623
624 fn to_shared_project(&self) -> SharedProject {
626 SharedProject(self.0.as_ref().load_full())
627 }
628}
629
630#[derive(Debug, Default)]
635struct SharedProjectStateInner {
636 state: ProjectState,
637 rate_limits: Arc<CachedRateLimits>,
638 notify: Arc<Notify>,
639}
640
641#[derive(Debug)]
653enum FetchState {
654 InProgress {
656 is_refresh: bool,
661 },
662 Pending {
671 initiated: Option<Instant>,
679 next_fetch_attempt: Option<Instant>,
683 },
684 Complete {
686 when: LastFetch,
688 },
689}
690
691struct PrivateProjectState {
693 project_key: ProjectKey,
695
696 state: FetchState,
698 backoff: RetryBackoff,
702
703 last_fetch: Option<Instant>,
710
711 expiry: Option<Instant>,
715}
716
717impl PrivateProjectState {
718 fn new(project_key: ProjectKey, config: &Config) -> Self {
719 Self {
720 project_key,
721 state: FetchState::Pending {
722 initiated: None,
723 next_fetch_attempt: None,
724 },
725 backoff: RetryBackoff::new(config.max_retry_backoff),
726 last_fetch: None,
727 expiry: None,
728 }
729 }
730
731 fn try_begin_fetch(
732 &mut self,
733 now: Instant,
734 is_refresh: bool,
735 config: &Config,
736 ) -> Option<Fetch> {
737 let (initiated, when) = match &mut self.state {
738 FetchState::InProgress {
739 is_refresh: refresh_in_progress,
740 } => {
741 relay_log::trace!(
742 tags.project_key = self.project_key.as_str(),
743 "project fetch skipped, fetch in progress"
744 );
745 *refresh_in_progress = *refresh_in_progress && is_refresh;
747 return None;
748 }
749 FetchState::Pending {
750 initiated,
751 next_fetch_attempt,
752 } => {
753 (initiated.unwrap_or(now), *next_fetch_attempt)
755 }
756 FetchState::Complete { when } => {
757 debug_assert_eq!(Some(when.0), self.last_fetch);
759
760 if when.check_expiry(now, config).is_fresh() {
761 relay_log::trace!(
763 tags.project_key = self.project_key.as_str(),
764 "project fetch skipped, already up to date"
765 );
766 return None;
767 }
768
769 (now, None)
770 }
771 };
772
773 self.state = FetchState::InProgress { is_refresh };
775
776 relay_log::trace!(
777 tags.project_key = &self.project_key.as_str(),
778 attempts = self.backoff.attempt() + 1,
779 "project state {} scheduled in {:?}",
780 if is_refresh { "refresh" } else { "fetch" },
781 when.unwrap_or(now).saturating_duration_since(now),
782 );
783
784 Some(Fetch {
785 project_key: self.project_key,
786 previous_fetch: self.last_fetch,
787 initiated,
788 when,
789 revision: Revision::default(),
790 })
791 }
792
793 fn complete_fetch(
794 &mut self,
795 fetch: &CompletedFetch,
796 now: Instant,
797 config: &Config,
798 ) -> FetchResult {
799 let FetchState::InProgress { is_refresh } = self.state else {
800 debug_assert!(
801 false,
802 "fetch completed while there was no current fetch registered"
803 );
804 return FetchResult::ReSchedule { refresh: false };
806 };
807
808 if fetch.is_pending() {
809 let next_backoff = self.backoff.next_backoff();
810 let next_fetch_attempt = match next_backoff.is_zero() {
811 false => now.checked_add(next_backoff),
812 true => None,
813 };
814 self.state = FetchState::Pending {
815 next_fetch_attempt,
816 initiated: Some(fetch.fetch.initiated),
817 };
818 relay_log::trace!(
819 tags.project_key = &self.project_key.as_str(),
820 "project state {} completed but still pending",
821 if is_refresh { "refresh" } else { "fetch" },
822 );
823
824 FetchResult::ReSchedule {
825 refresh: is_refresh,
826 }
827 } else {
828 relay_log::trace!(
829 tags.project_key = &self.project_key.as_str(),
830 "project state {} completed with non-pending config",
831 if is_refresh { "refresh" } else { "fetch" },
832 );
833
834 self.backoff.reset();
835 self.last_fetch = Some(now);
836
837 let when = LastFetch(now);
838
839 let refresh = when.refresh_time(config);
840 let expiry = match self.expiry {
841 Some(expiry) if is_refresh => ExpiryTime(expiry),
842 Some(_) | None => when.expiry_time(config),
845 };
846 self.expiry = Some(expiry.0);
847
848 self.state = FetchState::Complete { when };
849 FetchResult::Done { expiry, refresh }
850 }
851 }
852}
853
854#[derive(Debug)]
856#[must_use = "fetch result must be used"]
857enum FetchResult {
858 ReSchedule {
860 refresh: bool,
862 },
863 Done {
865 expiry: ExpiryTime,
867 refresh: Option<RefreshTime>,
869 },
870}
871
872#[derive(Debug, Copy, Clone)]
874struct LastFetch(Instant);
875
876impl LastFetch {
877 fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
879 let elapsed = now.saturating_duration_since(self.0);
880
881 if elapsed >= config.expiry + config.grace_period {
882 Expiry::Expired
883 } else if elapsed >= config.expiry {
884 Expiry::Stale
885 } else {
886 Expiry::Fresh
887 }
888 }
889
890 fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
892 config
893 .refresh_interval
894 .map(|duration| self.0 + duration)
895 .map(RefreshTime)
896 }
897
898 fn expiry_time(&self, config: &Config) -> ExpiryTime {
900 ExpiryTime(self.0 + config.grace_period + config.expiry)
901 }
902}
903
904#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
906enum Expiry {
907 Fresh,
909 Stale,
912 Expired,
915}
916
917impl Expiry {
918 fn is_fresh(&self) -> bool {
920 matches!(self, Self::Fresh)
921 }
922}
923
924#[derive(Debug)]
926#[must_use = "an refresh time must be used to schedule a refresh"]
927struct RefreshTime(Instant);
928
929#[derive(Debug)]
931#[must_use = "an expiry time must be used to schedule an eviction"]
932struct ExpiryTime(Instant);
933
934#[cfg(test)]
935mod tests {
936 use std::time::Duration;
937
938 use super::*;
939
940 async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
941 let mut evicted = Vec::new();
942 while let Ok(Some(Action::Eviction(eviction))) =
944 tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
945 {
946 evicted.push(eviction.0);
947 store.evict(eviction);
948 }
949 evicted
950 }
951
952 macro_rules! assert_state {
953 ($store:ident, $project_key:ident, $state:pat) => {
954 assert!(matches!(
955 $store.shared().get_or_create($project_key).project_state(),
956 $state
957 ));
958 };
959 }
960
961 #[tokio::test(start_paused = true)]
962 async fn test_store_fetch() {
963 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
964 let mut store = ProjectStore::new(&Default::default());
965
966 let fetch = store.try_begin_fetch(project_key).unwrap();
967 assert_eq!(fetch.project_key(), project_key);
968 assert_eq!(fetch.when(), None);
969 assert_eq!(fetch.revision().as_str(), None);
970 assert_state!(store, project_key, ProjectState::Pending);
971
972 assert!(store.try_begin_fetch(project_key).is_none());
974
975 let fetch = fetch.complete(ProjectState::Pending.into());
977 let fetch = store.complete_fetch(fetch).unwrap();
978 assert_eq!(fetch.project_key(), project_key);
979 assert_eq!(fetch.when(), None);
981 assert_eq!(fetch.revision().as_str(), None);
982 assert_state!(store, project_key, ProjectState::Pending);
983
984 let fetch = fetch.complete(ProjectState::Pending.into());
986 let fetch = store.complete_fetch(fetch).unwrap();
987 assert_eq!(fetch.project_key(), project_key);
988 assert!(fetch.when() > Some(Instant::now()));
990 assert_eq!(fetch.revision().as_str(), None);
991 assert_state!(store, project_key, ProjectState::Pending);
992
993 let fetch = fetch.complete(ProjectState::Disabled.into());
995 assert!(store.complete_fetch(fetch).is_none());
996 assert_state!(store, project_key, ProjectState::Disabled);
997
998 assert!(store.try_begin_fetch(project_key).is_none());
1000 }
1001
1002 #[tokio::test(start_paused = true)]
1003 async fn test_store_fetch_pending_does_not_replace_state() {
1004 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1005 let mut store = ProjectStore::new(
1006 &relay_config::Config::from_json_value(serde_json::json!({
1007 "cache": {
1008 "project_expiry": 5,
1009 "project_grace_period": 5,
1010 }
1011 }))
1012 .unwrap(),
1013 );
1014
1015 let fetch = store.try_begin_fetch(project_key).unwrap();
1016 let fetch = fetch.complete(ProjectState::Disabled.into());
1017 assert!(store.complete_fetch(fetch).is_none());
1018 assert_state!(store, project_key, ProjectState::Disabled);
1019
1020 tokio::time::advance(Duration::from_secs(6)).await;
1021
1022 let fetch = store.try_begin_fetch(project_key).unwrap();
1023 let fetch = fetch.complete(ProjectState::Pending.into());
1024 let fetch = store.complete_fetch(fetch).unwrap();
1026 assert_state!(store, project_key, ProjectState::Disabled);
1028
1029 let fetch = fetch.complete(ProjectState::new_allowed().into());
1030 assert!(store.complete_fetch(fetch).is_none());
1031 assert_state!(store, project_key, ProjectState::Enabled(_));
1032 }
1033
1034 #[tokio::test(start_paused = true)]
1035 async fn test_store_evict_projects() {
1036 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1037 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1038 let mut store = ProjectStore::new(
1039 &relay_config::Config::from_json_value(serde_json::json!({
1040 "cache": {
1041 "project_expiry": 5,
1042 "project_grace_period": 0,
1043 }
1044 }))
1045 .unwrap(),
1046 );
1047
1048 let fetch = store.try_begin_fetch(project_key1).unwrap();
1049 let fetch = fetch.complete(ProjectState::Disabled.into());
1050 assert!(store.complete_fetch(fetch).is_none());
1051
1052 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1053 assert_state!(store, project_key1, ProjectState::Disabled);
1054
1055 tokio::time::advance(Duration::from_secs(3)).await;
1057
1058 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1059 assert_state!(store, project_key1, ProjectState::Disabled);
1060
1061 let fetch = store.try_begin_fetch(project_key2).unwrap();
1062 let fetch = fetch.complete(ProjectState::Disabled.into());
1063 assert!(store.complete_fetch(fetch).is_none());
1064
1065 tokio::time::advance(Duration::from_secs(3)).await;
1067
1068 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1069 assert_state!(store, project_key1, ProjectState::Pending);
1070 assert_state!(store, project_key2, ProjectState::Disabled);
1071 }
1072
1073 #[tokio::test(start_paused = true)]
1074 async fn test_store_evict_projects_pending_not_expired() {
1075 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1076 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1077 let mut store = ProjectStore::new(
1078 &relay_config::Config::from_json_value(serde_json::json!({
1079 "cache": {
1080 "project_expiry": 5,
1081 "project_grace_period": 0,
1082 }
1083 }))
1084 .unwrap(),
1085 );
1086
1087 let fetch = store.try_begin_fetch(project_key1).unwrap();
1088 store.shared().get_or_create(project_key2);
1090
1091 tokio::time::advance(Duration::from_secs(6)).await;
1092
1093 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1095
1096 let fetch = fetch.complete(ProjectState::Disabled.into());
1098 assert!(store.complete_fetch(fetch).is_none());
1099
1100 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1102 tokio::time::advance(Duration::from_secs(4)).await;
1103 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1104 assert_state!(store, project_key1, ProjectState::Disabled);
1105
1106 tokio::time::advance(Duration::from_millis(1001)).await;
1108 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1109 assert_state!(store, project_key1, ProjectState::Pending);
1110 assert_state!(store, project_key2, ProjectState::Pending);
1111 }
1112
1113 #[tokio::test(start_paused = true)]
1114 async fn test_store_evict_projects_stale() {
1115 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1116 let mut store = ProjectStore::new(
1117 &relay_config::Config::from_json_value(serde_json::json!({
1118 "cache": {
1119 "project_expiry": 5,
1120 "project_grace_period": 5,
1121 }
1122 }))
1123 .unwrap(),
1124 );
1125
1126 let fetch = store.try_begin_fetch(project_key).unwrap();
1127 let fetch = fetch.complete(ProjectState::Disabled.into());
1128 assert!(store.complete_fetch(fetch).is_none());
1129
1130 tokio::time::advance(Duration::from_millis(9500)).await;
1132
1133 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1134 assert_state!(store, project_key, ProjectState::Disabled);
1135
1136 tokio::time::advance(Duration::from_secs(1)).await;
1138
1139 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1140 assert_state!(store, project_key, ProjectState::Pending);
1141 }
1142
1143 #[tokio::test(start_paused = true)]
1144 async fn test_store_no_eviction_during_fetch() {
1145 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1146 let mut store = ProjectStore::new(
1147 &relay_config::Config::from_json_value(serde_json::json!({
1148 "cache": {
1149 "project_expiry": 5,
1150 "project_grace_period": 5,
1151 }
1152 }))
1153 .unwrap(),
1154 );
1155
1156 let fetch = store.try_begin_fetch(project_key).unwrap();
1157
1158 tokio::time::advance(Duration::from_millis(10500)).await;
1160 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1162
1163 let fetch = fetch.complete(ProjectState::Disabled.into());
1165 assert!(store.complete_fetch(fetch).is_none());
1166 tokio::time::advance(Duration::from_millis(5001)).await;
1168 let fetch = store.try_begin_fetch(project_key).unwrap();
1169
1170 tokio::time::advance(Duration::from_millis(10500)).await;
1172 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1174
1175 let fetch = fetch.complete(ProjectState::Disabled.into());
1177 assert!(store.complete_fetch(fetch).is_none());
1178
1179 tokio::time::advance(Duration::from_millis(9500)).await;
1181 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1182 tokio::time::advance(Duration::from_millis(501)).await;
1184 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1185 assert_state!(store, project_key, ProjectState::Pending);
1186 }
1187
1188 #[tokio::test(start_paused = true)]
1189 async fn test_store_refresh() {
1190 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1191 let mut store = ProjectStore::new(
1192 &relay_config::Config::from_json_value(serde_json::json!({
1193 "cache": {
1194 "project_expiry": 5,
1195 "project_grace_period": 5,
1196 "project_refresh_interval": 7,
1197 }
1198 }))
1199 .unwrap(),
1200 );
1201
1202 let fetch = store.try_begin_fetch(project_key).unwrap();
1203 let fetch = fetch.complete(ProjectState::Disabled.into());
1204 assert!(store.complete_fetch(fetch).is_none());
1205 assert_state!(store, project_key, ProjectState::Disabled);
1206
1207 let Some(Action::Refresh(refresh)) = store.poll().await else {
1209 panic!();
1210 };
1211 assert_eq!(refresh.project_key(), project_key);
1212
1213 let fetch = store.refresh(refresh).unwrap();
1214 assert!(store.try_begin_fetch(project_key).is_none());
1216 let fetch = fetch.complete(ProjectState::Disabled.into());
1217 assert!(store.complete_fetch(fetch).is_none());
1218
1219 let Some(Action::Refresh(refresh)) = store.poll().await else {
1223 panic!();
1224 };
1225 let fetch = store.refresh(refresh).unwrap();
1226 let fetch = fetch.complete(ProjectState::Disabled.into());
1227 assert!(store.complete_fetch(fetch).is_none());
1228
1229 let Some(Action::Eviction(eviction)) = store.poll().await else {
1232 panic!();
1233 };
1234 assert_eq!(eviction.project_key(), project_key);
1235 }
1236
1237 #[tokio::test(start_paused = true)]
1238 async fn test_store_refresh_overtaken_by_eviction() {
1239 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1240 let mut store = ProjectStore::new(
1241 &relay_config::Config::from_json_value(serde_json::json!({
1242 "cache": {
1243 "project_expiry": 5,
1244 "project_grace_period": 5,
1245 "project_refresh_interval": 7,
1246 }
1247 }))
1248 .unwrap(),
1249 );
1250
1251 let fetch = store.try_begin_fetch(project_key).unwrap();
1252 let fetch = fetch.complete(ProjectState::Disabled.into());
1253 assert!(store.complete_fetch(fetch).is_none());
1254 assert_state!(store, project_key, ProjectState::Disabled);
1255
1256 tokio::time::advance(Duration::from_secs(20)).await;
1258
1259 let Some(Action::Eviction(eviction)) = store.poll().await else {
1262 panic!();
1263 };
1264 assert_eq!(eviction.project_key(), project_key);
1265 store.evict(eviction);
1266
1267 assert!(
1271 tokio::time::timeout(Duration::from_secs(60), store.poll())
1272 .await
1273 .is_err()
1274 );
1275 }
1276
1277 #[tokio::test(start_paused = true)]
1278 async fn test_store_refresh_during_eviction() {
1279 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1280 let mut store = ProjectStore::new(
1281 &relay_config::Config::from_json_value(serde_json::json!({
1282 "cache": {
1283 "project_expiry": 5,
1284 "project_grace_period": 5,
1285 "project_refresh_interval": 7,
1286 }
1287 }))
1288 .unwrap(),
1289 );
1290
1291 let fetch = store.try_begin_fetch(project_key).unwrap();
1292 let fetch = fetch.complete(ProjectState::Disabled.into());
1293 assert!(store.complete_fetch(fetch).is_none());
1294 assert_state!(store, project_key, ProjectState::Disabled);
1295
1296 tokio::time::advance(Duration::from_secs(20)).await;
1298
1299 let Some(Action::Eviction(eviction)) = store.poll().await else {
1303 panic!();
1304 };
1305 let Some(Action::Refresh(refresh)) = store.poll().await else {
1306 panic!();
1307 };
1308 assert_eq!(eviction.project_key(), project_key);
1309 assert_eq!(refresh.project_key(), project_key);
1310 store.evict(eviction);
1311
1312 assert!(store.refresh(refresh).is_none());
1313 }
1314
1315 #[tokio::test(start_paused = true)]
1316 async fn test_ready_state() {
1317 let shared = SharedProjectState::default();
1318
1319 let shared_project = shared.to_shared_project();
1320 assert!(shared_project.project_state().is_pending());
1321 let mut listener = std::pin::pin!(shared_project.outdated());
1322
1323 let result = tokio::time::timeout(Duration::from_secs(5), listener.as_mut()).await;
1325 assert!(result.is_err()); assert!(shared.to_shared_project().project_state().is_pending());
1327
1328 shared.set_project_state(ProjectState::Disabled);
1330
1331 let result = tokio::time::timeout(Duration::from_secs(1), listener).await;
1333 assert!(result.is_ok()); assert!(shared_project.project_state().is_pending());
1337
1338 assert!(matches!(
1340 shared.to_shared_project().project_state(),
1341 &ProjectState::Disabled
1342 ));
1343 }
1344}