1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::time::Instant;
6
7use arc_swap::ArcSwap;
8use relay_base_schema::project::ProjectKey;
9use relay_quotas::CachedRateLimits;
10use relay_sampling::evaluation::ReservoirCounters;
11use relay_statsd::metric;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::projects::source::SourceProjectState;
15use crate::statsd::{RelayDistributions, RelayTimers};
16use crate::utils::{RetryBackoff, UniqueScheduledQueue};
17
18pub struct ProjectStore {
29 config: Config,
30 shared: Arc<Shared>,
32 private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
34 evictions: UniqueScheduledQueue<ProjectKey>,
36 refreshes: UniqueScheduledQueue<ProjectKey>,
38}
39
40impl ProjectStore {
41 pub fn new(config: &relay_config::Config) -> Self {
42 Self {
43 config: Config::new(config),
44 shared: Default::default(),
45 private: Default::default(),
46 evictions: Default::default(),
47 refreshes: Default::default(),
48 }
49 }
50
51 pub fn shared(&self) -> Arc<Shared> {
53 Arc::clone(&self.shared)
54 }
55
56 pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
62 self.do_try_begin_fetch(project_key, false)
63 }
64
65 #[must_use = "an incomplete fetch must be retried"]
70 pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
71 let project_key = fetch.project_key();
72
73 debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
76 debug_assert!(self.private.get(&project_key).is_some());
77
78 let mut project = self.get_or_create(project_key);
79 let new_fetch = match project.complete_fetch(fetch) {
82 FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
83 FetchResult::Done { expiry, refresh } => {
84 self.evictions.schedule(expiry.0, project_key);
85 if let Some(RefreshTime(refresh)) = refresh {
86 self.refreshes.schedule(refresh, project_key);
87 }
88 None
89 }
90 };
91
92 metric!(
93 distribution(RelayDistributions::ProjectStateCacheSize) =
94 self.shared.projects.len() as u64,
95 storage = "shared"
96 );
97 metric!(
98 distribution(RelayDistributions::ProjectStateCacheSize) = self.private.len() as u64,
99 storage = "private"
100 );
101
102 new_fetch
103 }
104
105 pub async fn poll(&mut self) -> Option<Action> {
112 let eviction = self.evictions.next();
113 let refresh = self.refreshes.next();
114
115 tokio::select! {
116 biased;
117
118 Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
119 Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
120 else => None,
121 }
122 }
123
124 pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
132 self.do_try_begin_fetch(project_key, true)
133 }
134
135 pub fn evict(&mut self, Eviction(project_key): Eviction) {
137 let Some(private) = self.private.remove(&project_key) else {
139 debug_assert!(false, "no private state for eviction");
141 return;
142 };
143
144 debug_assert!(
145 matches!(private.state, FetchState::Complete { .. }),
146 "private state must be completed"
147 );
148
149 let shared = self.shared.projects.pin();
151 let _removed = shared.remove(&project_key);
152 debug_assert!(
153 _removed.is_some(),
154 "an expired project must exist in the shared state"
155 );
156
157 self.refreshes.remove(&project_key);
160 }
161
162 fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
165 let fetch = match is_refresh {
166 true => self.get(project_key)?,
172 false => self.get_or_create(project_key),
173 }
174 .try_begin_fetch(is_refresh);
175
176 if fetch.is_some() {
179 self.evictions.remove(&project_key);
180 }
183
184 fetch
185 }
186
187 fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
191 let private = self.private.get_mut(&project_key)?;
192
193 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
196
197 let shared = self
198 .shared
199 .projects
200 .pin()
201 .get_or_insert_with(project_key, Default::default)
202 .clone();
203
204 Some(ProjectRef {
205 private,
206 shared,
207 config: &self.config,
208 })
209 }
210
211 fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
215 #[cfg(debug_assertions)]
216 if self.private.contains_key(&project_key) {
217 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
223 }
224
225 let private = self
226 .private
227 .entry(project_key)
228 .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
229
230 let shared = self
231 .shared
232 .projects
233 .pin()
234 .get_or_insert_with(project_key, Default::default)
235 .clone();
236
237 ProjectRef {
238 private,
239 shared,
240 config: &self.config,
241 }
242 }
243}
244
245struct Config {
247 expiry: Duration,
251 grace_period: Duration,
256 refresh_interval: Option<Duration>,
263 max_retry_backoff: Duration,
265}
266
267impl Config {
268 fn new(config: &relay_config::Config) -> Self {
269 let expiry = config.project_cache_expiry();
270 let grace_period = config.project_grace_period();
271
272 let refresh_interval = config
276 .project_refresh_interval()
277 .filter(|rt| *rt < (expiry + grace_period))
278 .filter(|rt| *rt > expiry);
279
280 Self {
281 expiry: config.project_cache_expiry(),
282 grace_period: config.project_grace_period(),
283 refresh_interval,
284 max_retry_backoff: config.http_max_retry_interval(),
285 }
286 }
287}
288
289#[derive(Default)]
291pub struct Shared {
292 projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
293}
294
295impl Shared {
296 pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
301 let projects = self.projects.pin();
303 if let Some(project) = projects.get(&project_key) {
304 return project.to_shared_project();
305 }
306
307 match projects.try_insert(project_key, Default::default()) {
309 Ok(inserted) => inserted.to_shared_project(),
310 Err(occupied) => occupied.current.to_shared_project(),
311 }
312 }
313}
314
#[cfg(test)]
impl Shared {
    /// Test helper: sets the project state of `project_key`, creating the
    /// shared entry if it does not exist.
    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
        let projects = self.projects.pin();
        let project = projects.get_or_insert_with(project_key, Default::default);
        project.set_project_state(state);
    }

    /// Test helper: returns whether a shared entry exists for `project_key`.
    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
        let projects = self.projects.pin();
        projects.contains_key(&project_key)
    }
}
333
334impl fmt::Debug for Shared {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.debug_struct("Shared")
337 .field("num_projects", &self.projects.len())
338 .finish()
339 }
340}
341
342pub struct SharedProject(Arc<SharedProjectStateInner>);
344
345impl SharedProject {
346 pub fn project_state(&self) -> &ProjectState {
348 &self.0.state
349 }
350
351 pub fn cached_rate_limits(&self) -> &CachedRateLimits {
353 &self.0.rate_limits
359 }
360
361 pub fn reservoir_counters(&self) -> &ReservoirCounters {
363 &self.0.reservoir_counters
364 }
365}
366
#[cfg(test)]
impl SharedProject {
    /// Test helper: creates a project with the passed `state` and defaults for
    /// all other fields.
    pub fn for_test(state: ProjectState) -> Self {
        let inner = SharedProjectStateInner {
            state,
            ..Default::default()
        };

        Self(Arc::new(inner))
    }
}
378
379struct ProjectRef<'a> {
381 shared: SharedProjectState,
382 private: &'a mut PrivateProjectState,
383 config: &'a Config,
384}
385
386impl ProjectRef<'_> {
387 fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
388 let now = Instant::now();
389 self.private
390 .try_begin_fetch(now, is_refresh, self.config)
391 .map(|fetch| fetch.with_revision(self.shared.revision()))
392 }
393
394 fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
395 let now = Instant::now();
396
397 if let Some(latency) = fetch.latency() {
398 let delay = match fetch.delay() {
399 Some(delay) if delay.as_secs() <= 15 => "lte15s",
400 Some(delay) if delay.as_secs() <= 30 => "lte30s",
401 Some(delay) if delay.as_secs() <= 60 => "lte60s",
402 Some(delay) if delay.as_secs() <= 120 => "lte120",
403 Some(delay) if delay.as_secs() <= 300 => "lte300s",
404 Some(delay) if delay.as_secs() <= 600 => "lte600s",
405 Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
406 Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
407 Some(_) => "gt3600s",
408 None => "none",
409 };
410 metric!(
411 timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
412 delay = delay
413 );
414 }
415
416 if !fetch.is_pending() {
417 let state = match fetch.state {
418 SourceProjectState::New(_) => "new",
419 SourceProjectState::NotModified => "not_modified",
420 };
421
422 metric!(
423 timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
424 state = state
425 );
426 }
427
428 let result = self.private.complete_fetch(&fetch, now, self.config);
430 match fetch.state {
431 SourceProjectState::New(state) if !state.is_pending() => {
434 self.shared.set_project_state(state);
435 }
436 _ => {}
437 }
438
439 result
440 }
441}
442
443pub enum Action {
444 Eviction(Eviction),
445 Refresh(Refresh),
446}
447
448#[derive(Debug)]
452#[must_use = "a refresh must be used"]
453pub struct Refresh(ProjectKey);
454
455impl Refresh {
456 pub fn project_key(&self) -> ProjectKey {
458 self.0
459 }
460}
461
462#[derive(Debug)]
466#[must_use = "an eviction must be used"]
467pub struct Eviction(ProjectKey);
468
469impl Eviction {
470 pub fn project_key(&self) -> ProjectKey {
472 self.0
473 }
474}
475
476#[must_use = "a fetch must be executed"]
480#[derive(Debug)]
481pub struct Fetch {
482 project_key: ProjectKey,
483 previous_fetch: Option<Instant>,
484 initiated: Instant,
485 when: Option<Instant>,
486 revision: Revision,
487}
488
489impl Fetch {
490 pub fn project_key(&self) -> ProjectKey {
492 self.project_key
493 }
494
495 pub fn when(&self) -> Option<Instant> {
500 self.when
501 }
502
503 pub fn revision(&self) -> Revision {
508 self.revision.clone()
509 }
510
511 pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
513 CompletedFetch { fetch: self, state }
514 }
515
516 fn with_revision(mut self, revision: Revision) -> Self {
517 self.revision = revision;
518 self
519 }
520}
521
522#[must_use = "a completed fetch must be acted upon"]
524#[derive(Debug)]
525pub struct CompletedFetch {
526 fetch: Fetch,
527 state: SourceProjectState,
528}
529
530impl CompletedFetch {
531 pub fn project_key(&self) -> ProjectKey {
533 self.fetch.project_key()
534 }
535
536 fn delay(&self) -> Option<Duration> {
540 self.fetch
541 .previous_fetch
542 .map(|pf| self.fetch.initiated.duration_since(pf))
543 }
544
545 fn duration(&self, now: Instant) -> Duration {
547 now.duration_since(self.fetch.initiated)
548 }
549
550 fn latency(&self) -> Option<Duration> {
558 let is_first_fetch = self.fetch.revision().as_str().is_none();
562 if is_first_fetch {
563 return None;
564 }
565
566 let project_info = match &self.state {
567 SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
568 _ => return None,
573 };
574
575 if project_info.rev == self.fetch.revision {
577 return None;
578 }
579
580 let elapsed = chrono::Utc::now() - project_info.last_change?;
581 elapsed.to_std().ok()
582 }
583
584 fn is_pending(&self) -> bool {
586 match &self.state {
587 SourceProjectState::New(state) => state.is_pending(),
588 SourceProjectState::NotModified => false,
589 }
590 }
591}
592
593#[derive(Debug, Default, Clone)]
597struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
598
599impl SharedProjectState {
600 fn set_project_state(&self, state: ProjectState) {
602 let prev = self.0.rcu(|stored| SharedProjectStateInner {
603 state: state.clone(),
604 rate_limits: Arc::clone(&stored.rate_limits),
605 reservoir_counters: Arc::clone(&stored.reservoir_counters),
606 });
607
608 if let Some(state) = state.enabled() {
612 let config = state.config.sampling.as_ref();
613 if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
614 if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
619 counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
620 }
621 }
622 }
623 }
624
625 fn revision(&self) -> Revision {
627 self.0.as_ref().load().state.revision().clone()
628 }
629
630 fn to_shared_project(&self) -> SharedProject {
632 SharedProject(self.0.as_ref().load_full())
633 }
634}
635
636#[derive(Debug, Default)]
641struct SharedProjectStateInner {
642 state: ProjectState,
643 rate_limits: Arc<CachedRateLimits>,
644 reservoir_counters: ReservoirCounters,
645}
646
647#[derive(Debug)]
659enum FetchState {
660 InProgress {
662 is_refresh: bool,
667 },
668 Pending {
677 initiated: Option<Instant>,
685 next_fetch_attempt: Option<Instant>,
689 },
690 Complete {
692 when: LastFetch,
694 },
695}
696
697struct PrivateProjectState {
699 project_key: ProjectKey,
701
702 state: FetchState,
704 backoff: RetryBackoff,
708
709 last_fetch: Option<Instant>,
716
717 expiry: Option<Instant>,
721}
722
723impl PrivateProjectState {
724 fn new(project_key: ProjectKey, config: &Config) -> Self {
725 Self {
726 project_key,
727 state: FetchState::Pending {
728 initiated: None,
729 next_fetch_attempt: None,
730 },
731 backoff: RetryBackoff::new(config.max_retry_backoff),
732 last_fetch: None,
733 expiry: None,
734 }
735 }
736
737 fn try_begin_fetch(
738 &mut self,
739 now: Instant,
740 is_refresh: bool,
741 config: &Config,
742 ) -> Option<Fetch> {
743 let (initiated, when) = match &mut self.state {
744 FetchState::InProgress {
745 is_refresh: refresh_in_progress,
746 } => {
747 relay_log::trace!(
748 tags.project_key = self.project_key.as_str(),
749 "project fetch skipped, fetch in progress"
750 );
751 *refresh_in_progress = *refresh_in_progress && is_refresh;
753 return None;
754 }
755 FetchState::Pending {
756 initiated,
757 next_fetch_attempt,
758 } => {
759 (initiated.unwrap_or(now), *next_fetch_attempt)
761 }
762 FetchState::Complete { when } => {
763 debug_assert_eq!(Some(when.0), self.last_fetch);
765
766 if when.check_expiry(now, config).is_fresh() {
767 relay_log::trace!(
769 tags.project_key = self.project_key.as_str(),
770 "project fetch skipped, already up to date"
771 );
772 return None;
773 }
774
775 (now, None)
776 }
777 };
778
779 self.state = FetchState::InProgress { is_refresh };
781
782 relay_log::trace!(
783 tags.project_key = &self.project_key.as_str(),
784 attempts = self.backoff.attempt() + 1,
785 "project state {} scheduled in {:?}",
786 if is_refresh { "refresh" } else { "fetch" },
787 when.unwrap_or(now).saturating_duration_since(now),
788 );
789
790 Some(Fetch {
791 project_key: self.project_key,
792 previous_fetch: self.last_fetch,
793 initiated,
794 when,
795 revision: Revision::default(),
796 })
797 }
798
799 fn complete_fetch(
800 &mut self,
801 fetch: &CompletedFetch,
802 now: Instant,
803 config: &Config,
804 ) -> FetchResult {
805 let FetchState::InProgress { is_refresh } = self.state else {
806 debug_assert!(
807 false,
808 "fetch completed while there was no current fetch registered"
809 );
810 return FetchResult::ReSchedule { refresh: false };
812 };
813
814 if fetch.is_pending() {
815 let next_backoff = self.backoff.next_backoff();
816 let next_fetch_attempt = match next_backoff.is_zero() {
817 false => now.checked_add(next_backoff),
818 true => None,
819 };
820 self.state = FetchState::Pending {
821 next_fetch_attempt,
822 initiated: Some(fetch.fetch.initiated),
823 };
824 relay_log::trace!(
825 tags.project_key = &self.project_key.as_str(),
826 "project state {} completed but still pending",
827 if is_refresh { "refresh" } else { "fetch" },
828 );
829
830 FetchResult::ReSchedule {
831 refresh: is_refresh,
832 }
833 } else {
834 relay_log::trace!(
835 tags.project_key = &self.project_key.as_str(),
836 "project state {} completed with non-pending config",
837 if is_refresh { "refresh" } else { "fetch" },
838 );
839
840 self.backoff.reset();
841 self.last_fetch = Some(now);
842
843 let when = LastFetch(now);
844
845 let refresh = when.refresh_time(config);
846 let expiry = match self.expiry {
847 Some(expiry) if is_refresh => ExpiryTime(expiry),
848 Some(_) | None => when.expiry_time(config),
851 };
852 self.expiry = Some(expiry.0);
853
854 self.state = FetchState::Complete { when };
855 FetchResult::Done { expiry, refresh }
856 }
857 }
858}
859
860#[derive(Debug)]
862#[must_use = "fetch result must be used"]
863enum FetchResult {
864 ReSchedule {
866 refresh: bool,
868 },
869 Done {
871 expiry: ExpiryTime,
873 refresh: Option<RefreshTime>,
875 },
876}
877
878#[derive(Debug, Copy, Clone)]
880struct LastFetch(Instant);
881
882impl LastFetch {
883 fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
885 let elapsed = now.saturating_duration_since(self.0);
886
887 if elapsed >= config.expiry + config.grace_period {
888 Expiry::Expired
889 } else if elapsed >= config.expiry {
890 Expiry::Stale
891 } else {
892 Expiry::Fresh
893 }
894 }
895
896 fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
898 config
899 .refresh_interval
900 .map(|duration| self.0 + duration)
901 .map(RefreshTime)
902 }
903
904 fn expiry_time(&self, config: &Config) -> ExpiryTime {
906 ExpiryTime(self.0 + config.grace_period + config.expiry)
907 }
908}
909
/// Freshness of a fetched project.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum Expiry {
    /// The project is younger than the expiry.
    Fresh,
    /// The expiry has passed, but the grace period has not.
    Stale,
    /// Both the expiry and the grace period have passed.
    Expired,
}

impl Expiry {
    /// Returns `true` only for [`Expiry::Fresh`].
    fn is_fresh(&self) -> bool {
        *self == Self::Fresh
    }
}
929
930#[derive(Debug)]
932#[must_use = "an refresh time must be used to schedule a refresh"]
933struct RefreshTime(Instant);
934
935#[derive(Debug)]
937#[must_use = "an expiry time must be used to schedule an eviction"]
938struct ExpiryTime(Instant);
939
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Creates a store from a config whose `cache` section is the passed JSON value.
    fn new_store(cache: serde_json::Value) -> ProjectStore {
        let config =
            relay_config::Config::from_json_value(serde_json::json!({ "cache": cache })).unwrap();
        ProjectStore::new(&config)
    }

    /// Executes all evictions which are immediately available and returns the
    /// evicted project keys in order.
    async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
        let mut evicted = Vec::new();
        loop {
            // The short timeout ends the loop once `poll` has nothing ready;
            // any action other than an eviction ends it as well.
            let next = tokio::time::timeout(Duration::from_nanos(5), store.poll()).await;
            let Ok(Some(Action::Eviction(eviction))) = next else {
                break evicted;
            };
            evicted.push(eviction.0);
            store.evict(eviction);
        }
    }

    /// Asserts that the shared project state of `$project_key` matches `$state`.
    macro_rules! assert_state {
        ($store:ident, $project_key:ident, $state:pat) => {{
            let project = $store.shared().get_or_create($project_key);
            assert!(matches!(project.project_state(), $state));
        }};
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = ProjectStore::new(&Default::default());

        // The very first fetch has no delay and no revision.
        let fetch = store.try_begin_fetch(project_key).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // No second fetch while the first one is in progress.
        assert!(store.try_begin_fetch(project_key).is_none());

        // A pending result is immediately re-scheduled, the first retry has no delay.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert_eq!(fetch.when(), None);
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // The next pending result is re-scheduled with a delay.
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        assert_eq!(fetch.project_key(), project_key);
        assert!(fetch.when() > Some(Instant::now()));
        assert_eq!(fetch.revision().as_str(), None);
        assert_state!(store, project_key, ProjectState::Pending);

        // A non-pending result ends the cycle and is stored.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // The state is fresh, no new fetch is started.
        assert!(store.try_begin_fetch(project_key).is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_fetch_pending_does_not_replace_state() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Past the expiry, but still within the grace period.
        tokio::time::advance(Duration::from_secs(6)).await;

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Pending.into());
        let fetch = store.complete_fetch(fetch).unwrap();
        // The pending result must not overwrite the existing state.
        assert_state!(store, project_key, ProjectState::Disabled);

        let fetch = fetch.complete(ProjectState::new_allowed().into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Enabled(_));
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 0,
        }));

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // 3 seconds in: nothing has expired yet.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        let fetch = store.try_begin_fetch(project_key2).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // 6 seconds in: only the first project has expired.
        tokio::time::advance(Duration::from_secs(3)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Disabled);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_pending_not_expired() {
        let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 0,
        }));

        let fetch = store.try_begin_fetch(project_key1).unwrap();
        // Create the second project only in the shared state, without a fetch.
        store.shared().get_or_create(project_key2);

        tokio::time::advance(Duration::from_secs(6)).await;

        // Neither the in-progress nor the shared-only project is evicted.
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        // The expiry only starts counting once the fetch completes.
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_secs(4)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key1, ProjectState::Disabled);

        // Just over 5 seconds after completion the project is evicted.
        tokio::time::advance(Duration::from_millis(1001)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
        assert_state!(store, project_key1, ProjectState::Pending);
        assert_state!(store, project_key2, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_evict_projects_stale() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // 9.5 seconds: past the expiry but within the grace period.
        tokio::time::advance(Duration::from_millis(9500)).await;

        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        assert_state!(store, project_key, ProjectState::Disabled);

        // 10.5 seconds: past expiry plus grace period.
        tokio::time::advance(Duration::from_secs(1)).await;

        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_no_eviction_during_fetch() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();

        // The project is never evicted while its first fetch is in progress.
        tokio::time::advance(Duration::from_millis(10500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        // Stale now, so a new fetch can be started.
        tokio::time::advance(Duration::from_millis(5001)).await;
        let fetch = store.try_begin_fetch(project_key).unwrap();

        // The running fetch again prevents the eviction.
        tokio::time::advance(Duration::from_millis(10500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());

        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // The expiry is counted from the completion of the last fetch.
        tokio::time::advance(Duration::from_millis(9500)).await;
        assert_eq!(collect_evicted(&mut store).await, Vec::new());
        tokio::time::advance(Duration::from_millis(501)).await;
        assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
        assert_state!(store, project_key, ProjectState::Pending);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
            "project_refresh_interval": 7,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // The refresh is due before the eviction.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(refresh.project_key(), project_key);

        let fetch = store.refresh(refresh).unwrap();
        // The refresh counts as a fetch in progress.
        assert!(store.try_begin_fetch(project_key).is_none());
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // Completing a refresh schedules the next refresh.
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        let fetch = store.refresh(refresh).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());

        // This time the eviction is due first.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_overtaken_by_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
            "project_refresh_interval": 7,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move past both the refresh and the eviction time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // The eviction is preferred over the refresh.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        store.evict(eviction);

        // The eviction cancelled the refresh, nothing is left to poll.
        assert!(
            tokio::time::timeout(Duration::from_secs(60), store.poll())
                .await
                .is_err()
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_store_refresh_during_eviction() {
        let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        let mut store = new_store(serde_json::json!({
            "project_expiry": 5,
            "project_grace_period": 5,
            "project_refresh_interval": 7,
        }));

        let fetch = store.try_begin_fetch(project_key).unwrap();
        let fetch = fetch.complete(ProjectState::Disabled.into());
        assert!(store.complete_fetch(fetch).is_none());
        assert_state!(store, project_key, ProjectState::Disabled);

        // Move past both the refresh and the eviction time.
        tokio::time::advance(Duration::from_secs(20)).await;

        // Take both tokens before acting on either of them.
        let Some(Action::Eviction(eviction)) = store.poll().await else {
            panic!();
        };
        let Some(Action::Refresh(refresh)) = store.poll().await else {
            panic!();
        };
        assert_eq!(eviction.project_key(), project_key);
        assert_eq!(refresh.project_key(), project_key);
        store.evict(eviction);

        // A refresh of an already evicted project does nothing.
        assert!(store.refresh(refresh).is_none());
    }
}