1use futures::StreamExt;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::time::Instant;
6
7use arc_swap::ArcSwap;
8use relay_base_schema::project::ProjectKey;
9use relay_quotas::CachedRateLimits;
10use relay_sampling::evaluation::ReservoirCounters;
11use relay_statsd::metric;
12
13use crate::services::projects::project::{ProjectState, Revision};
14use crate::services::projects::source::SourceProjectState;
15use crate::statsd::{RelayHistograms, RelayTimers};
16use crate::utils::{RetryBackoff, UniqueScheduledQueue};
17
18pub struct ProjectStore {
29 config: Config,
30 shared: Arc<Shared>,
32 private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
34 evictions: UniqueScheduledQueue<ProjectKey>,
36 refreshes: UniqueScheduledQueue<ProjectKey>,
38}
39
40impl ProjectStore {
41 pub fn new(config: &relay_config::Config) -> Self {
42 Self {
43 config: Config::new(config),
44 shared: Default::default(),
45 private: Default::default(),
46 evictions: Default::default(),
47 refreshes: Default::default(),
48 }
49 }
50
51 pub fn shared(&self) -> Arc<Shared> {
53 Arc::clone(&self.shared)
54 }
55
56 pub fn try_begin_fetch(&mut self, project_key: ProjectKey) -> Option<Fetch> {
62 self.do_try_begin_fetch(project_key, false)
63 }
64
65 #[must_use = "an incomplete fetch must be retried"]
70 pub fn complete_fetch(&mut self, fetch: CompletedFetch) -> Option<Fetch> {
71 let project_key = fetch.project_key();
72
73 debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
76 debug_assert!(self.private.get(&project_key).is_some());
77
78 let mut project = self.get_or_create(project_key);
79 let new_fetch = match project.complete_fetch(fetch) {
82 FetchResult::ReSchedule { refresh } => project.try_begin_fetch(refresh),
83 FetchResult::Done { expiry, refresh } => {
84 self.evictions.schedule(expiry.0, project_key);
85 if let Some(RefreshTime(refresh)) = refresh {
86 self.refreshes.schedule(refresh, project_key);
87 }
88 None
89 }
90 };
91
92 metric!(
93 histogram(RelayHistograms::ProjectStateCacheSize) = self.shared.projects.len() as u64,
94 storage = "shared"
95 );
96 metric!(
97 histogram(RelayHistograms::ProjectStateCacheSize) = self.private.len() as u64,
98 storage = "private"
99 );
100
101 new_fetch
102 }
103
104 pub async fn poll(&mut self) -> Option<Action> {
111 let eviction = self.evictions.next();
112 let refresh = self.refreshes.next();
113
114 tokio::select! {
115 biased;
116
117 Some(e) = eviction => Some(Action::Eviction(Eviction(e))),
118 Some(r) = refresh => Some(Action::Refresh(Refresh(r))),
119 else => None,
120 }
121 }
122
123 pub fn refresh(&mut self, Refresh(project_key): Refresh) -> Option<Fetch> {
131 self.do_try_begin_fetch(project_key, true)
132 }
133
134 pub fn evict(&mut self, Eviction(project_key): Eviction) {
136 let Some(private) = self.private.remove(&project_key) else {
138 debug_assert!(false, "no private state for eviction");
140 return;
141 };
142
143 debug_assert!(
144 matches!(private.state, FetchState::Complete { .. }),
145 "private state must be completed"
146 );
147
148 let shared = self.shared.projects.pin();
150 let _removed = shared.remove(&project_key);
151 debug_assert!(
152 _removed.is_some(),
153 "an expired project must exist in the shared state"
154 );
155
156 self.refreshes.remove(&project_key);
159 }
160
161 fn do_try_begin_fetch(&mut self, project_key: ProjectKey, is_refresh: bool) -> Option<Fetch> {
164 let fetch = match is_refresh {
165 true => self.get(project_key)?,
171 false => self.get_or_create(project_key),
172 }
173 .try_begin_fetch(is_refresh);
174
175 if fetch.is_some() {
178 self.evictions.remove(&project_key);
179 }
182
183 fetch
184 }
185
186 fn get(&mut self, project_key: ProjectKey) -> Option<ProjectRef<'_>> {
190 let private = self.private.get_mut(&project_key)?;
191
192 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
195
196 let shared = self
197 .shared
198 .projects
199 .pin()
200 .get_or_insert_with(project_key, Default::default)
201 .clone();
202
203 Some(ProjectRef {
204 private,
205 shared,
206 config: &self.config,
207 })
208 }
209
210 fn get_or_create(&mut self, project_key: ProjectKey) -> ProjectRef<'_> {
214 #[cfg(debug_assertions)]
215 if self.private.contains_key(&project_key) {
216 debug_assert!(self.shared.projects.pin().contains_key(&project_key));
222 }
223
224 let private = self
225 .private
226 .entry(project_key)
227 .or_insert_with(|| PrivateProjectState::new(project_key, &self.config));
228
229 let shared = self
230 .shared
231 .projects
232 .pin()
233 .get_or_insert_with(project_key, Default::default)
234 .clone();
235
236 ProjectRef {
237 private,
238 shared,
239 config: &self.config,
240 }
241 }
242}
243
244struct Config {
246 expiry: Duration,
250 grace_period: Duration,
255 refresh_interval: Option<Duration>,
262 max_retry_backoff: Duration,
264}
265
266impl Config {
267 fn new(config: &relay_config::Config) -> Self {
268 let expiry = config.project_cache_expiry();
269 let grace_period = config.project_grace_period();
270
271 let refresh_interval = config
275 .project_refresh_interval()
276 .filter(|rt| *rt < (expiry + grace_period))
277 .filter(|rt| *rt > expiry);
278
279 Self {
280 expiry: config.project_cache_expiry(),
281 grace_period: config.project_grace_period(),
282 refresh_interval,
283 max_retry_backoff: config.http_max_retry_interval(),
284 }
285 }
286}
287
288#[derive(Default)]
290pub struct Shared {
291 projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
292}
293
294impl Shared {
295 pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
300 let projects = self.projects.pin();
302 if let Some(project) = projects.get(&project_key) {
303 return project.to_shared_project();
304 }
305
306 match projects.try_insert(project_key, Default::default()) {
308 Ok(inserted) => inserted.to_shared_project(),
309 Err(occupied) => occupied.current.to_shared_project(),
310 }
311 }
312}
313
314#[cfg(test)]
316impl Shared {
317 pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
321 self.projects
322 .pin()
323 .get_or_insert_with(project_key, Default::default)
324 .set_project_state(state);
325 }
326
327 pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
329 self.projects.pin().contains_key(&project_key)
330 }
331}
332
333impl fmt::Debug for Shared {
334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335 f.debug_struct("Shared")
336 .field("num_projects", &self.projects.len())
337 .finish()
338 }
339}
340
341pub struct SharedProject(Arc<SharedProjectStateInner>);
343
344impl SharedProject {
345 pub fn project_state(&self) -> &ProjectState {
347 &self.0.state
348 }
349
350 pub fn cached_rate_limits(&self) -> &CachedRateLimits {
352 &self.0.rate_limits
358 }
359
360 pub fn reservoir_counters(&self) -> &ReservoirCounters {
362 &self.0.reservoir_counters
363 }
364}
365
366#[cfg(test)]
368impl SharedProject {
369 pub fn for_test(state: ProjectState) -> Self {
371 Self(Arc::new(SharedProjectStateInner {
372 state,
373 ..Default::default()
374 }))
375 }
376}
377
378struct ProjectRef<'a> {
380 shared: SharedProjectState,
381 private: &'a mut PrivateProjectState,
382 config: &'a Config,
383}
384
385impl ProjectRef<'_> {
386 fn try_begin_fetch(&mut self, is_refresh: bool) -> Option<Fetch> {
387 let now = Instant::now();
388 self.private
389 .try_begin_fetch(now, is_refresh, self.config)
390 .map(|fetch| fetch.with_revision(self.shared.revision()))
391 }
392
393 fn complete_fetch(&mut self, fetch: CompletedFetch) -> FetchResult {
394 let now = Instant::now();
395
396 if let Some(latency) = fetch.latency() {
397 let delay = match fetch.delay() {
398 Some(delay) if delay.as_secs() <= 15 => "lte15s",
399 Some(delay) if delay.as_secs() <= 30 => "lte30s",
400 Some(delay) if delay.as_secs() <= 60 => "lte60s",
401 Some(delay) if delay.as_secs() <= 120 => "lte120",
402 Some(delay) if delay.as_secs() <= 300 => "lte300s",
403 Some(delay) if delay.as_secs() <= 600 => "lte600s",
404 Some(delay) if delay.as_secs() <= 1800 => "lte1800s",
405 Some(delay) if delay.as_secs() <= 3600 => "lte3600s",
406 Some(_) => "gt3600s",
407 None => "none",
408 };
409 metric!(
410 timer(RelayTimers::ProjectCacheUpdateLatency) = latency,
411 delay = delay
412 );
413 }
414
415 if !fetch.is_pending() {
416 let state = match fetch.state {
417 SourceProjectState::New(_) => "new",
418 SourceProjectState::NotModified => "not_modified",
419 };
420
421 metric!(
422 timer(RelayTimers::ProjectCacheFetchDuration) = fetch.duration(now),
423 state = state
424 );
425 }
426
427 let result = self.private.complete_fetch(&fetch, now, self.config);
429 match fetch.state {
430 SourceProjectState::New(state) if !state.is_pending() => {
433 self.shared.set_project_state(state);
434 }
435 _ => {}
436 }
437
438 result
439 }
440}
441
442pub enum Action {
443 Eviction(Eviction),
444 Refresh(Refresh),
445}
446
447#[derive(Debug)]
451#[must_use = "a refresh must be used"]
452pub struct Refresh(ProjectKey);
453
454impl Refresh {
455 pub fn project_key(&self) -> ProjectKey {
457 self.0
458 }
459}
460
461#[derive(Debug)]
465#[must_use = "an eviction must be used"]
466pub struct Eviction(ProjectKey);
467
468impl Eviction {
469 pub fn project_key(&self) -> ProjectKey {
471 self.0
472 }
473}
474
475#[must_use = "a fetch must be executed"]
479#[derive(Debug)]
480pub struct Fetch {
481 project_key: ProjectKey,
482 previous_fetch: Option<Instant>,
483 initiated: Instant,
484 when: Option<Instant>,
485 revision: Revision,
486}
487
488impl Fetch {
489 pub fn project_key(&self) -> ProjectKey {
491 self.project_key
492 }
493
494 pub fn when(&self) -> Option<Instant> {
499 self.when
500 }
501
502 pub fn revision(&self) -> Revision {
507 self.revision.clone()
508 }
509
510 pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
512 CompletedFetch { fetch: self, state }
513 }
514
515 fn with_revision(mut self, revision: Revision) -> Self {
516 self.revision = revision;
517 self
518 }
519}
520
521#[must_use = "a completed fetch must be acted upon"]
523#[derive(Debug)]
524pub struct CompletedFetch {
525 fetch: Fetch,
526 state: SourceProjectState,
527}
528
529impl CompletedFetch {
530 pub fn project_key(&self) -> ProjectKey {
532 self.fetch.project_key()
533 }
534
535 fn delay(&self) -> Option<Duration> {
539 self.fetch
540 .previous_fetch
541 .map(|pf| self.fetch.initiated.duration_since(pf))
542 }
543
544 fn duration(&self, now: Instant) -> Duration {
546 now.duration_since(self.fetch.initiated)
547 }
548
549 fn latency(&self) -> Option<Duration> {
557 let is_first_fetch = self.fetch.revision().as_str().is_none();
561 if is_first_fetch {
562 return None;
563 }
564
565 let project_info = match &self.state {
566 SourceProjectState::New(ProjectState::Enabled(project_info)) => project_info,
567 _ => return None,
572 };
573
574 if project_info.rev == self.fetch.revision {
576 return None;
577 }
578
579 let elapsed = chrono::Utc::now() - project_info.last_change?;
580 elapsed.to_std().ok()
581 }
582
583 fn is_pending(&self) -> bool {
585 match &self.state {
586 SourceProjectState::New(state) => state.is_pending(),
587 SourceProjectState::NotModified => false,
588 }
589 }
590}
591
592#[derive(Debug, Default, Clone)]
596struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);
597
598impl SharedProjectState {
599 fn set_project_state(&self, state: ProjectState) {
601 let prev = self.0.rcu(|stored| SharedProjectStateInner {
602 state: state.clone(),
603 rate_limits: Arc::clone(&stored.rate_limits),
604 reservoir_counters: Arc::clone(&stored.reservoir_counters),
605 });
606
607 if let Some(state) = state.enabled() {
611 let config = state.config.sampling.as_ref();
612 if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) {
613 if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
618 counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
619 }
620 }
621 }
622 }
623
624 fn revision(&self) -> Revision {
626 self.0.as_ref().load().state.revision().clone()
627 }
628
629 fn to_shared_project(&self) -> SharedProject {
631 SharedProject(self.0.as_ref().load_full())
632 }
633}
634
635#[derive(Debug, Default)]
640struct SharedProjectStateInner {
641 state: ProjectState,
642 rate_limits: Arc<CachedRateLimits>,
643 reservoir_counters: ReservoirCounters,
644}
645
646#[derive(Debug)]
658enum FetchState {
659 InProgress {
661 is_refresh: bool,
666 },
667 Pending {
676 initiated: Option<Instant>,
684 next_fetch_attempt: Option<Instant>,
688 },
689 Complete {
691 when: LastFetch,
693 },
694}
695
696struct PrivateProjectState {
698 project_key: ProjectKey,
700
701 state: FetchState,
703 backoff: RetryBackoff,
707
708 last_fetch: Option<Instant>,
715
716 expiry: Option<Instant>,
720}
721
722impl PrivateProjectState {
723 fn new(project_key: ProjectKey, config: &Config) -> Self {
724 Self {
725 project_key,
726 state: FetchState::Pending {
727 initiated: None,
728 next_fetch_attempt: None,
729 },
730 backoff: RetryBackoff::new(config.max_retry_backoff),
731 last_fetch: None,
732 expiry: None,
733 }
734 }
735
736 fn try_begin_fetch(
737 &mut self,
738 now: Instant,
739 is_refresh: bool,
740 config: &Config,
741 ) -> Option<Fetch> {
742 let (initiated, when) = match &mut self.state {
743 FetchState::InProgress {
744 is_refresh: refresh_in_progress,
745 } => {
746 relay_log::trace!(
747 tags.project_key = self.project_key.as_str(),
748 "project fetch skipped, fetch in progress"
749 );
750 *refresh_in_progress = *refresh_in_progress && is_refresh;
752 return None;
753 }
754 FetchState::Pending {
755 initiated,
756 next_fetch_attempt,
757 } => {
758 (initiated.unwrap_or(now), *next_fetch_attempt)
760 }
761 FetchState::Complete { when } => {
762 debug_assert_eq!(Some(when.0), self.last_fetch);
764
765 if when.check_expiry(now, config).is_fresh() {
766 relay_log::trace!(
768 tags.project_key = self.project_key.as_str(),
769 "project fetch skipped, already up to date"
770 );
771 return None;
772 }
773
774 (now, None)
775 }
776 };
777
778 self.state = FetchState::InProgress { is_refresh };
780
781 relay_log::trace!(
782 tags.project_key = &self.project_key.as_str(),
783 attempts = self.backoff.attempt() + 1,
784 "project state {} scheduled in {:?}",
785 if is_refresh { "refresh" } else { "fetch" },
786 when.unwrap_or(now).saturating_duration_since(now),
787 );
788
789 Some(Fetch {
790 project_key: self.project_key,
791 previous_fetch: self.last_fetch,
792 initiated,
793 when,
794 revision: Revision::default(),
795 })
796 }
797
798 fn complete_fetch(
799 &mut self,
800 fetch: &CompletedFetch,
801 now: Instant,
802 config: &Config,
803 ) -> FetchResult {
804 let FetchState::InProgress { is_refresh } = self.state else {
805 debug_assert!(
806 false,
807 "fetch completed while there was no current fetch registered"
808 );
809 return FetchResult::ReSchedule { refresh: false };
811 };
812
813 if fetch.is_pending() {
814 let next_backoff = self.backoff.next_backoff();
815 let next_fetch_attempt = match next_backoff.is_zero() {
816 false => now.checked_add(next_backoff),
817 true => None,
818 };
819 self.state = FetchState::Pending {
820 next_fetch_attempt,
821 initiated: Some(fetch.fetch.initiated),
822 };
823 relay_log::trace!(
824 tags.project_key = &self.project_key.as_str(),
825 "project state {} completed but still pending",
826 if is_refresh { "refresh" } else { "fetch" },
827 );
828
829 FetchResult::ReSchedule {
830 refresh: is_refresh,
831 }
832 } else {
833 relay_log::trace!(
834 tags.project_key = &self.project_key.as_str(),
835 "project state {} completed with non-pending config",
836 if is_refresh { "refresh" } else { "fetch" },
837 );
838
839 self.backoff.reset();
840 self.last_fetch = Some(now);
841
842 let when = LastFetch(now);
843
844 let refresh = when.refresh_time(config);
845 let expiry = match self.expiry {
846 Some(expiry) if is_refresh => ExpiryTime(expiry),
847 Some(_) | None => when.expiry_time(config),
850 };
851 self.expiry = Some(expiry.0);
852
853 self.state = FetchState::Complete { when };
854 FetchResult::Done { expiry, refresh }
855 }
856 }
857}
858
859#[derive(Debug)]
861#[must_use = "fetch result must be used"]
862enum FetchResult {
863 ReSchedule {
865 refresh: bool,
867 },
868 Done {
870 expiry: ExpiryTime,
872 refresh: Option<RefreshTime>,
874 },
875}
876
877#[derive(Debug, Copy, Clone)]
879struct LastFetch(Instant);
880
881impl LastFetch {
882 fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
884 let elapsed = now.saturating_duration_since(self.0);
885
886 if elapsed >= config.expiry + config.grace_period {
887 Expiry::Expired
888 } else if elapsed >= config.expiry {
889 Expiry::Stale
890 } else {
891 Expiry::Fresh
892 }
893 }
894
895 fn refresh_time(&self, config: &Config) -> Option<RefreshTime> {
897 config
898 .refresh_interval
899 .map(|duration| self.0 + duration)
900 .map(RefreshTime)
901 }
902
903 fn expiry_time(&self, config: &Config) -> ExpiryTime {
905 ExpiryTime(self.0 + config.grace_period + config.expiry)
906 }
907}
908
909#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
911enum Expiry {
912 Fresh,
914 Stale,
917 Expired,
920}
921
922impl Expiry {
923 fn is_fresh(&self) -> bool {
925 matches!(self, Self::Fresh)
926 }
927}
928
929#[derive(Debug)]
931#[must_use = "an refresh time must be used to schedule a refresh"]
932struct RefreshTime(Instant);
933
934#[derive(Debug)]
936#[must_use = "an expiry time must be used to schedule an eviction"]
937struct ExpiryTime(Instant);
938
939#[cfg(test)]
940mod tests {
941 use std::time::Duration;
942
943 use super::*;
944
945 async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
946 let mut evicted = Vec::new();
947 while let Ok(Some(Action::Eviction(eviction))) =
949 tokio::time::timeout(Duration::from_nanos(5), store.poll()).await
950 {
951 evicted.push(eviction.0);
952 store.evict(eviction);
953 }
954 evicted
955 }
956
957 macro_rules! assert_state {
958 ($store:ident, $project_key:ident, $state:pat) => {
959 assert!(matches!(
960 $store.shared().get_or_create($project_key).project_state(),
961 $state
962 ));
963 };
964 }
965
966 #[tokio::test(start_paused = true)]
967 async fn test_store_fetch() {
968 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
969 let mut store = ProjectStore::new(&Default::default());
970
971 let fetch = store.try_begin_fetch(project_key).unwrap();
972 assert_eq!(fetch.project_key(), project_key);
973 assert_eq!(fetch.when(), None);
974 assert_eq!(fetch.revision().as_str(), None);
975 assert_state!(store, project_key, ProjectState::Pending);
976
977 assert!(store.try_begin_fetch(project_key).is_none());
979
980 let fetch = fetch.complete(ProjectState::Pending.into());
982 let fetch = store.complete_fetch(fetch).unwrap();
983 assert_eq!(fetch.project_key(), project_key);
984 assert_eq!(fetch.when(), None);
986 assert_eq!(fetch.revision().as_str(), None);
987 assert_state!(store, project_key, ProjectState::Pending);
988
989 let fetch = fetch.complete(ProjectState::Pending.into());
991 let fetch = store.complete_fetch(fetch).unwrap();
992 assert_eq!(fetch.project_key(), project_key);
993 assert!(fetch.when() > Some(Instant::now()));
995 assert_eq!(fetch.revision().as_str(), None);
996 assert_state!(store, project_key, ProjectState::Pending);
997
998 let fetch = fetch.complete(ProjectState::Disabled.into());
1000 assert!(store.complete_fetch(fetch).is_none());
1001 assert_state!(store, project_key, ProjectState::Disabled);
1002
1003 assert!(store.try_begin_fetch(project_key).is_none());
1005 }
1006
1007 #[tokio::test(start_paused = true)]
1008 async fn test_store_fetch_pending_does_not_replace_state() {
1009 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1010 let mut store = ProjectStore::new(
1011 &relay_config::Config::from_json_value(serde_json::json!({
1012 "cache": {
1013 "project_expiry": 5,
1014 "project_grace_period": 5,
1015 }
1016 }))
1017 .unwrap(),
1018 );
1019
1020 let fetch = store.try_begin_fetch(project_key).unwrap();
1021 let fetch = fetch.complete(ProjectState::Disabled.into());
1022 assert!(store.complete_fetch(fetch).is_none());
1023 assert_state!(store, project_key, ProjectState::Disabled);
1024
1025 tokio::time::advance(Duration::from_secs(6)).await;
1026
1027 let fetch = store.try_begin_fetch(project_key).unwrap();
1028 let fetch = fetch.complete(ProjectState::Pending.into());
1029 let fetch = store.complete_fetch(fetch).unwrap();
1031 assert_state!(store, project_key, ProjectState::Disabled);
1033
1034 let fetch = fetch.complete(ProjectState::new_allowed().into());
1035 assert!(store.complete_fetch(fetch).is_none());
1036 assert_state!(store, project_key, ProjectState::Enabled(_));
1037 }
1038
1039 #[tokio::test(start_paused = true)]
1040 async fn test_store_evict_projects() {
1041 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1042 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1043 let mut store = ProjectStore::new(
1044 &relay_config::Config::from_json_value(serde_json::json!({
1045 "cache": {
1046 "project_expiry": 5,
1047 "project_grace_period": 0,
1048 }
1049 }))
1050 .unwrap(),
1051 );
1052
1053 let fetch = store.try_begin_fetch(project_key1).unwrap();
1054 let fetch = fetch.complete(ProjectState::Disabled.into());
1055 assert!(store.complete_fetch(fetch).is_none());
1056
1057 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1058 assert_state!(store, project_key1, ProjectState::Disabled);
1059
1060 tokio::time::advance(Duration::from_secs(3)).await;
1062
1063 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1064 assert_state!(store, project_key1, ProjectState::Disabled);
1065
1066 let fetch = store.try_begin_fetch(project_key2).unwrap();
1067 let fetch = fetch.complete(ProjectState::Disabled.into());
1068 assert!(store.complete_fetch(fetch).is_none());
1069
1070 tokio::time::advance(Duration::from_secs(3)).await;
1072
1073 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1074 assert_state!(store, project_key1, ProjectState::Pending);
1075 assert_state!(store, project_key2, ProjectState::Disabled);
1076 }
1077
1078 #[tokio::test(start_paused = true)]
1079 async fn test_store_evict_projects_pending_not_expired() {
1080 let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1081 let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
1082 let mut store = ProjectStore::new(
1083 &relay_config::Config::from_json_value(serde_json::json!({
1084 "cache": {
1085 "project_expiry": 5,
1086 "project_grace_period": 0,
1087 }
1088 }))
1089 .unwrap(),
1090 );
1091
1092 let fetch = store.try_begin_fetch(project_key1).unwrap();
1093 store.shared().get_or_create(project_key2);
1095
1096 tokio::time::advance(Duration::from_secs(6)).await;
1097
1098 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1100
1101 let fetch = fetch.complete(ProjectState::Disabled.into());
1103 assert!(store.complete_fetch(fetch).is_none());
1104
1105 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1107 tokio::time::advance(Duration::from_secs(4)).await;
1108 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1109 assert_state!(store, project_key1, ProjectState::Disabled);
1110
1111 tokio::time::advance(Duration::from_millis(1001)).await;
1113 assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
1114 assert_state!(store, project_key1, ProjectState::Pending);
1115 assert_state!(store, project_key2, ProjectState::Pending);
1116 }
1117
1118 #[tokio::test(start_paused = true)]
1119 async fn test_store_evict_projects_stale() {
1120 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1121 let mut store = ProjectStore::new(
1122 &relay_config::Config::from_json_value(serde_json::json!({
1123 "cache": {
1124 "project_expiry": 5,
1125 "project_grace_period": 5,
1126 }
1127 }))
1128 .unwrap(),
1129 );
1130
1131 let fetch = store.try_begin_fetch(project_key).unwrap();
1132 let fetch = fetch.complete(ProjectState::Disabled.into());
1133 assert!(store.complete_fetch(fetch).is_none());
1134
1135 tokio::time::advance(Duration::from_millis(9500)).await;
1137
1138 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1139 assert_state!(store, project_key, ProjectState::Disabled);
1140
1141 tokio::time::advance(Duration::from_secs(1)).await;
1143
1144 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1145 assert_state!(store, project_key, ProjectState::Pending);
1146 }
1147
1148 #[tokio::test(start_paused = true)]
1149 async fn test_store_no_eviction_during_fetch() {
1150 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1151 let mut store = ProjectStore::new(
1152 &relay_config::Config::from_json_value(serde_json::json!({
1153 "cache": {
1154 "project_expiry": 5,
1155 "project_grace_period": 5,
1156 }
1157 }))
1158 .unwrap(),
1159 );
1160
1161 let fetch = store.try_begin_fetch(project_key).unwrap();
1162
1163 tokio::time::advance(Duration::from_millis(10500)).await;
1165 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1167
1168 let fetch = fetch.complete(ProjectState::Disabled.into());
1170 assert!(store.complete_fetch(fetch).is_none());
1171 tokio::time::advance(Duration::from_millis(5001)).await;
1173 let fetch = store.try_begin_fetch(project_key).unwrap();
1174
1175 tokio::time::advance(Duration::from_millis(10500)).await;
1177 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1179
1180 let fetch = fetch.complete(ProjectState::Disabled.into());
1182 assert!(store.complete_fetch(fetch).is_none());
1183
1184 tokio::time::advance(Duration::from_millis(9500)).await;
1186 assert_eq!(collect_evicted(&mut store).await, Vec::new());
1187 tokio::time::advance(Duration::from_millis(501)).await;
1189 assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
1190 assert_state!(store, project_key, ProjectState::Pending);
1191 }
1192
1193 #[tokio::test(start_paused = true)]
1194 async fn test_store_refresh() {
1195 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1196 let mut store = ProjectStore::new(
1197 &relay_config::Config::from_json_value(serde_json::json!({
1198 "cache": {
1199 "project_expiry": 5,
1200 "project_grace_period": 5,
1201 "project_refresh_interval": 7,
1202 }
1203 }))
1204 .unwrap(),
1205 );
1206
1207 let fetch = store.try_begin_fetch(project_key).unwrap();
1208 let fetch = fetch.complete(ProjectState::Disabled.into());
1209 assert!(store.complete_fetch(fetch).is_none());
1210 assert_state!(store, project_key, ProjectState::Disabled);
1211
1212 let Some(Action::Refresh(refresh)) = store.poll().await else {
1214 panic!();
1215 };
1216 assert_eq!(refresh.project_key(), project_key);
1217
1218 let fetch = store.refresh(refresh).unwrap();
1219 assert!(store.try_begin_fetch(project_key).is_none());
1221 let fetch = fetch.complete(ProjectState::Disabled.into());
1222 assert!(store.complete_fetch(fetch).is_none());
1223
1224 let Some(Action::Refresh(refresh)) = store.poll().await else {
1228 panic!();
1229 };
1230 let fetch = store.refresh(refresh).unwrap();
1231 let fetch = fetch.complete(ProjectState::Disabled.into());
1232 assert!(store.complete_fetch(fetch).is_none());
1233
1234 let Some(Action::Eviction(eviction)) = store.poll().await else {
1237 panic!();
1238 };
1239 assert_eq!(eviction.project_key(), project_key);
1240 }
1241
1242 #[tokio::test(start_paused = true)]
1243 async fn test_store_refresh_overtaken_by_eviction() {
1244 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1245 let mut store = ProjectStore::new(
1246 &relay_config::Config::from_json_value(serde_json::json!({
1247 "cache": {
1248 "project_expiry": 5,
1249 "project_grace_period": 5,
1250 "project_refresh_interval": 7,
1251 }
1252 }))
1253 .unwrap(),
1254 );
1255
1256 let fetch = store.try_begin_fetch(project_key).unwrap();
1257 let fetch = fetch.complete(ProjectState::Disabled.into());
1258 assert!(store.complete_fetch(fetch).is_none());
1259 assert_state!(store, project_key, ProjectState::Disabled);
1260
1261 tokio::time::advance(Duration::from_secs(20)).await;
1263
1264 let Some(Action::Eviction(eviction)) = store.poll().await else {
1267 panic!();
1268 };
1269 assert_eq!(eviction.project_key(), project_key);
1270 store.evict(eviction);
1271
1272 assert!(
1276 tokio::time::timeout(Duration::from_secs(60), store.poll())
1277 .await
1278 .is_err()
1279 );
1280 }
1281
1282 #[tokio::test(start_paused = true)]
1283 async fn test_store_refresh_during_eviction() {
1284 let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
1285 let mut store = ProjectStore::new(
1286 &relay_config::Config::from_json_value(serde_json::json!({
1287 "cache": {
1288 "project_expiry": 5,
1289 "project_grace_period": 5,
1290 "project_refresh_interval": 7,
1291 }
1292 }))
1293 .unwrap(),
1294 );
1295
1296 let fetch = store.try_begin_fetch(project_key).unwrap();
1297 let fetch = fetch.complete(ProjectState::Disabled.into());
1298 assert!(store.complete_fetch(fetch).is_none());
1299 assert_state!(store, project_key, ProjectState::Disabled);
1300
1301 tokio::time::advance(Duration::from_secs(20)).await;
1303
1304 let Some(Action::Eviction(eviction)) = store.poll().await else {
1308 panic!();
1309 };
1310 let Some(Action::Refresh(refresh)) = store.poll().await else {
1311 panic!();
1312 };
1313 assert_eq!(eviction.project_key(), project_key);
1314 assert_eq!(refresh.project_key(), project_key);
1315 store.evict(eviction);
1316
1317 assert!(store.refresh(refresh).is_none());
1318 }
1319}