use futures::StreamExt;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
use arc_swap::ArcSwap;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_quotas::CachedRateLimits;
use relay_sampling::evaluation::ReservoirCounters;
use relay_statsd::metric;
use crate::services::projects::project::{ProjectState, Revision};
use crate::services::projects::source::SourceProjectState;
use crate::statsd::RelayHistograms;
use crate::utils::{RetryBackoff, UniqueScheduledQueue};
/// Backing storage for project states, split into a shared and a private part.
///
/// All mutating operations on the store take `&mut self`. Within this file, entries are only
/// removed from the shared map in `ProjectStore::evict`.
#[derive(Default)]
pub struct ProjectStore {
/// State that is handed out to other consumers via `ProjectStore::shared`.
shared: Arc<Shared>,
/// Per-project fetch bookkeeping which is only reachable through the store itself.
private: hashbrown::HashMap<ProjectKey, PrivateProjectState>,
/// Project keys scheduled for eviction: entries are scheduled when a fetch completes,
/// removed when a new fetch begins and drained through `ProjectStore::next_eviction`.
evictions: UniqueScheduledQueue<ProjectKey>,
}
impl ProjectStore {
pub fn shared(&self) -> Arc<Shared> {
Arc::clone(&self.shared)
}
pub fn try_begin_fetch(&mut self, project_key: ProjectKey, config: &Config) -> Option<Fetch> {
let fetch = self
.get_or_create(project_key, config)
.try_begin_fetch(config);
if fetch.is_some() {
self.evictions.remove(&project_key);
}
fetch
}
#[must_use = "an incomplete fetch must be retried"]
pub fn complete_fetch(&mut self, fetch: CompletedFetch, config: &Config) -> Option<Fetch> {
let project_key = fetch.project_key;
debug_assert!(self.shared.projects.pin().get(&project_key).is_some());
debug_assert!(self.private.get(&project_key).is_some());
let mut project = self.get_or_create(project_key, config);
let expiry = project.complete_fetch(fetch, config);
let new_fetch = project.try_begin_fetch(config);
if let Some(ExpiryTime(when)) = expiry {
debug_assert!(
new_fetch.is_none(),
"there cannot be a new fetch and a scheduled expiry"
);
self.evictions.schedule(when, project_key);
}
metric!(
histogram(RelayHistograms::ProjectStateCacheSize) = self.shared.projects.len() as u64,
storage = "shared"
);
metric!(
histogram(RelayHistograms::ProjectStateCacheSize) = self.private.len() as u64,
storage = "private"
);
new_fetch
}
pub async fn next_eviction(&mut self) -> Option<Eviction> {
if self.evictions.is_empty() {
return None;
}
self.evictions.next().await.map(Eviction)
}
pub fn evict(&mut self, Eviction(project_key): Eviction) {
let Some(private) = self.private.remove(&project_key) else {
debug_assert!(false, "no private state for eviction");
return;
};
debug_assert!(
matches!(private.state, FetchState::Complete { .. }),
"private state must be completed"
);
let shared = self.shared.projects.pin();
let _removed = shared.remove(&project_key);
debug_assert!(
_removed.is_some(),
"an expired project must exist in the shared state"
);
}
fn get_or_create(&mut self, project_key: ProjectKey, config: &Config) -> ProjectRef<'_> {
#[cfg(debug_assertions)]
if self.private.contains_key(&project_key) {
debug_assert!(self.shared.projects.pin().contains_key(&project_key));
}
let private = self
.private
.entry(project_key)
.or_insert_with(|| PrivateProjectState::new(project_key, config));
let shared = self
.shared
.projects
.pin()
.get_or_insert_with(project_key, Default::default)
.clone();
ProjectRef { private, shared }
}
}
#[derive(Default)]
pub struct Shared {
projects: papaya::HashMap<ProjectKey, SharedProjectState, ahash::RandomState>,
}
impl Shared {
pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
let projects = self.projects.pin();
if let Some(project) = projects.get(&project_key) {
return project.to_shared_project();
}
match projects.try_insert(project_key, Default::default()) {
Ok(inserted) => inserted.to_shared_project(),
Err(occupied) => occupied.current.to_shared_project(),
}
}
}
/// Helpers which are only available in tests.
#[cfg(test)]
impl Shared {
    /// Stores `state` for `project_key`, inserting a default entry first if there is none.
    pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) {
        let projects = self.projects.pin();
        let project = projects.get_or_insert_with(project_key, SharedProjectState::default);
        project.set_project_state(state);
    }

    /// Returns `true` if there is an entry for `project_key`.
    pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool {
        let projects = self.projects.pin();
        projects.contains_key(&project_key)
    }
}
/// Debug output which only reports the number of projects instead of every entry.
impl fmt::Debug for Shared {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let num_projects = self.projects.len();

        let mut out = f.debug_struct("Shared");
        out.field("num_projects", &num_projects);
        out.finish()
    }
}
/// A snapshot of a single project taken from the shared state.
pub struct SharedProject(Arc<SharedProjectStateInner>);

impl SharedProject {
    /// Returns the project state contained in this snapshot.
    pub fn project_state(&self) -> &ProjectState {
        let Self(inner) = self;
        &inner.state
    }

    /// Returns the cached rate limits of the project.
    pub fn cached_rate_limits(&self) -> &CachedRateLimits {
        let Self(inner) = self;
        &inner.rate_limits
    }

    /// Returns the reservoir counters of the project.
    pub fn reservoir_counters(&self) -> &ReservoirCounters {
        let Self(inner) = self;
        &inner.reservoir_counters
    }
}
/// Helpers which are only available in tests.
#[cfg(test)]
impl SharedProject {
    /// Creates a snapshot holding `state`; all remaining fields use their default values.
    pub fn for_test(state: ProjectState) -> Self {
        let inner = SharedProjectStateInner {
            state,
            ..SharedProjectStateInner::default()
        };

        Self(Arc::new(inner))
    }
}
/// Combined access to the shared and the private state of a single project.
struct ProjectRef<'a> {
    /// Cloned handle to the project's entry in the shared state.
    shared: SharedProjectState,
    /// Exclusive reference to the project's private fetch bookkeeping.
    private: &'a mut PrivateProjectState,
}

impl ProjectRef<'_> {
    /// Tries to start a fetch at the current time.
    ///
    /// A started fetch carries the revision of the currently stored shared state.
    fn try_begin_fetch(&mut self, config: &Config) -> Option<Fetch> {
        let fetch = self.private.try_begin_fetch(Instant::now(), config)?;
        Some(fetch.with_revision(self.shared.revision()))
    }

    /// Records the completed fetch and returns the expiry time of the project, if it has one.
    ///
    /// The shared state is only replaced when the fetch delivered a new, non-pending state.
    /// In every other case the previously stored state stays in place.
    fn complete_fetch(&mut self, fetch: CompletedFetch, config: &Config) -> Option<ExpiryTime> {
        self.private.complete_fetch(&fetch, Instant::now());

        if let SourceProjectState::New(state) = fetch.state {
            if !state.is_pending() {
                self.shared.set_project_state(state);
            }
        }

        self.private.expiry_time(config)
    }
}
/// Token for a project which is due for eviction.
///
/// Tokens are returned from `ProjectStore::next_eviction` and are consumed by
/// `ProjectStore::evict`.
///
/// Implements `Debug` like the other public tokens in this file, [`Fetch`] and
/// [`CompletedFetch`], so it can be logged and used in assertion messages.
#[must_use = "an eviction must be used"]
#[derive(Debug)]
pub struct Eviction(ProjectKey);

impl Eviction {
    /// Returns the key of the project which is about to be evicted.
    pub fn project_key(&self) -> ProjectKey {
        self.0
    }
}
/// A fetch which was started for a project and has to be executed.
///
/// Once a result is available it is attached with [`Fetch::complete`].
#[must_use = "a fetch must be executed"]
#[derive(Debug)]
pub struct Fetch {
    /// The project to fetch.
    project_key: ProjectKey,
    /// Optional point in time attached to the fetch; it is logged as the time the fetch is
    /// scheduled for, with `None` being treated as "now".
    when: Option<Instant>,
    /// Revision of the project state which was stored when the fetch was started.
    revision: Revision,
}

impl Fetch {
    /// Returns the key of the project to fetch.
    pub fn project_key(&self) -> ProjectKey {
        self.project_key
    }

    /// Returns the point in time attached to this fetch, if there is one.
    pub fn when(&self) -> Option<Instant> {
        self.when
    }

    /// Returns a copy of the revision attached to this fetch.
    pub fn revision(&self) -> Revision {
        self.revision.clone()
    }

    /// Consumes the fetch and pairs its project key with the fetched `state`.
    pub fn complete(self, state: SourceProjectState) -> CompletedFetch {
        let Self { project_key, .. } = self;
        CompletedFetch { project_key, state }
    }

    /// Returns the same fetch with its revision replaced by `revision`.
    fn with_revision(self, revision: Revision) -> Self {
        Self { revision, ..self }
    }
}
/// The result of an executed [`Fetch`].
#[must_use = "a completed fetch must be acted upon"]
#[derive(Debug)]
pub struct CompletedFetch {
    /// The project the result belongs to.
    project_key: ProjectKey,
    /// The state delivered by the fetch.
    state: SourceProjectState,
}

impl CompletedFetch {
    /// Returns the key of the project the result belongs to.
    pub fn project_key(&self) -> ProjectKey {
        self.project_key
    }

    /// Returns `true` only if the fetch delivered a new state which is itself pending.
    ///
    /// A `NotModified` result never counts as pending.
    fn is_pending(&self) -> bool {
        matches!(&self.state, SourceProjectState::New(state) if state.is_pending())
    }
}
/// Clonable handle to the atomically swappable state of a single project.
#[derive(Debug, Default, Clone)]
struct SharedProjectState(Arc<ArcSwap<SharedProjectStateInner>>);

impl SharedProjectState {
    /// Replaces the stored project state with `state`.
    ///
    /// Rate limits and reservoir counters are carried over from the previously stored value.
    /// If the new state is enabled and has a usable sampling configuration, reservoir counters
    /// without a matching rule are removed afterwards.
    fn set_project_state(&self, state: ProjectState) {
        let prev = self.0.rcu(|current| SharedProjectStateInner {
            state: state.clone(),
            rate_limits: Arc::clone(&current.rate_limits),
            reservoir_counters: Arc::clone(&current.reservoir_counters),
        });

        let Some(info) = state.enabled() else {
            return;
        };

        let sampling = info
            .config
            .sampling
            .as_ref()
            .and_then(|boundary| boundary.as_ref().ok());
        let Some(sampling) = sampling else {
            return;
        };

        // The counters were carried over, so the previous value refers to the same counters as
        // the value which is stored now. The cleanup is skipped if the lock is not available.
        if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
            counters.retain(|id, _| sampling.rules.iter().any(|rule| rule.id == *id));
        }
    }

    /// Returns the revision of the currently stored project state.
    fn revision(&self) -> Revision {
        let swap: &ArcSwap<SharedProjectStateInner> = &self.0;
        swap.load().state.revision().clone()
    }

    /// Returns a snapshot of the currently stored value.
    fn to_shared_project(&self) -> SharedProject {
        let swap: &ArcSwap<SharedProjectStateInner> = &self.0;
        SharedProject(swap.load_full())
    }
}
/// The value stored for each project in the shared state.
///
/// `SharedProjectState::set_project_state` builds a new value of this type on every update:
/// `state` is replaced while `rate_limits` and `reservoir_counters` are carried over.
#[derive(Debug, Default)]
struct SharedProjectStateInner {
/// The most recently stored project state.
state: ProjectState,
/// Cached rate limits, shared between successive values of the same project.
rate_limits: Arc<CachedRateLimits>,
/// Reservoir counters, shared between successive values of the same project.
reservoir_counters: ReservoirCounters,
}
/// Fetch bookkeeping state of a single project.
#[derive(Debug)]
enum FetchState {
/// A fetch has been handed out and was not completed yet.
InProgress,
/// There is no completed fetch: either nothing was fetched yet or the last fetch returned
/// a pending result.
Pending {
/// Time attached to the next fetch, `None` if no such time was computed.
next_fetch_attempt: Option<Instant>,
},
/// The last fetch completed with a non-pending result.
Complete {
/// When the last fetch was completed.
last_fetch: LastFetch,
},
}
/// Fetch bookkeeping for a single project, owned by the `ProjectStore`.
struct PrivateProjectState {
/// The project this state belongs to.
project_key: ProjectKey,
/// Current position in the fetch life cycle.
state: FetchState,
/// Backoff which is advanced whenever a fetch returns a pending result and reset once a
/// fetch returns a non-pending result.
backoff: RetryBackoff,
}
impl PrivateProjectState {
    /// Creates the state for a project which has not been fetched yet.
    ///
    /// The backoff is created from the configured maximum HTTP retry interval.
    fn new(project_key: ProjectKey, config: &Config) -> Self {
        let state = FetchState::Pending {
            next_fetch_attempt: None,
        };
        let backoff = RetryBackoff::new(config.http_max_retry_interval());

        Self {
            project_key,
            state,
            backoff,
        }
    }

    /// Returns the expiry time of the project, which only exists after a completed fetch.
    fn expiry_time(&self, config: &Config) -> Option<ExpiryTime> {
        if let FetchState::Complete { last_fetch } = &self.state {
            Some(last_fetch.expiry_time(config))
        } else {
            None
        }
    }

    /// Tries to start a new fetch.
    ///
    /// No fetch is started while another one is in progress or while the last completed fetch
    /// is still fresh. Otherwise the state switches to in progress and a [`Fetch`] with a
    /// default revision is returned.
    fn try_begin_fetch(&mut self, now: Instant, config: &Config) -> Option<Fetch> {
        let when = match &self.state {
            FetchState::InProgress => {
                relay_log::trace!(
                    tags.project_key = self.project_key.as_str(),
                    "project fetch skipped, fetch in progress"
                );
                return None;
            }
            FetchState::Complete { last_fetch }
                if last_fetch.check_expiry(now, config).is_fresh() =>
            {
                relay_log::trace!(
                    tags.project_key = self.project_key.as_str(),
                    "project fetch skipped, already up to date"
                );
                return None;
            }
            // The last fetch is no longer fresh, the new fetch carries no time.
            FetchState::Complete { .. } => None,
            // Still pending, the new fetch carries the time of the next attempt, if any.
            FetchState::Pending { next_fetch_attempt } => *next_fetch_attempt,
        };

        self.state = FetchState::InProgress;

        relay_log::trace!(
            tags.project_key = &self.project_key.as_str(),
            attempts = self.backoff.attempt() + 1,
            "project state fetch scheduled in {:?}",
            when.unwrap_or(now).saturating_duration_since(now),
        );

        Some(Fetch {
            project_key: self.project_key,
            when,
            revision: Revision::default(),
        })
    }

    /// Records the outcome of the fetch which is currently in progress.
    ///
    /// A non-pending result resets the backoff and marks the project as complete as of `now`.
    /// A pending result advances the backoff and moves the project back to pending.
    fn complete_fetch(&mut self, fetch: &CompletedFetch, now: Instant) {
        debug_assert!(
            matches!(self.state, FetchState::InProgress),
            "fetch completed while there was no current fetch registered"
        );

        if !fetch.is_pending() {
            relay_log::trace!(
                tags.project_key = &self.project_key.as_str(),
                "project state fetch completed with non-pending config"
            );
            self.backoff.reset();
            self.state = FetchState::Complete {
                last_fetch: LastFetch(now),
            };
            return;
        }

        let next_backoff = self.backoff.next_backoff();
        // A zero backoff means there is no time to attach to the next attempt.
        let next_fetch_attempt = if next_backoff.is_zero() {
            None
        } else {
            now.checked_add(next_backoff)
        };
        self.state = FetchState::Pending { next_fetch_attempt };
        relay_log::trace!(
            tags.project_key = &self.project_key.as_str(),
            "project state fetch completed but still pending"
        );
    }
}
/// The point in time at which the last fetch of a project was completed.
#[derive(Debug, Copy, Clone)]
struct LastFetch(Instant);

impl LastFetch {
    /// Classifies the age of the fetch at `now`.
    ///
    /// The fetch is fresh until the configured cache expiry has elapsed, stale for the
    /// following grace period and expired afterwards.
    fn check_expiry(&self, now: Instant, config: &Config) -> Expiry {
        let stale_after = config.project_cache_expiry();
        let age = now.saturating_duration_since(self.0);
        let expired_after = stale_after + config.project_grace_period();

        if age < stale_after {
            Expiry::Fresh
        } else if age < expired_after {
            Expiry::Stale
        } else {
            Expiry::Expired
        }
    }

    /// Returns the time of the fetch plus the configured grace period and cache expiry.
    fn expiry_time(&self, config: &Config) -> ExpiryTime {
        let Self(fetched_at) = *self;
        ExpiryTime(fetched_at + config.project_grace_period() + config.project_cache_expiry())
    }
}
/// Age classification of a fetched project.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum Expiry {
    /// The cache expiry has not elapsed yet.
    Fresh,
    /// The cache expiry has elapsed, but the additional grace period has not.
    Stale,
    /// Both the cache expiry and the grace period have elapsed.
    Expired,
}

impl Expiry {
    /// Returns `true` only for [`Expiry::Fresh`].
    fn is_fresh(&self) -> bool {
        match self {
            Self::Fresh => true,
            Self::Stale | Self::Expired => false,
        }
    }
}
/// The point in time at which a project is due for eviction.
///
/// Created by `LastFetch::expiry_time` and used by `ProjectStore::complete_fetch` to schedule
/// the eviction.
#[must_use = "an expiry time must be used to schedule an eviction"]
struct ExpiryTime(Instant);
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
async fn collect_evicted(store: &mut ProjectStore) -> Vec<ProjectKey> {
let mut evicted = Vec::new();
while let Ok(Some(eviction)) =
tokio::time::timeout(Duration::from_nanos(5), store.next_eviction()).await
{
evicted.push(eviction.0);
store.evict(eviction);
}
evicted
}
// Asserts that the project state visible through the store's shared handle matches the
// pattern `$state`.
//
// The lookup uses `Shared::get_or_create`, so asserting on a project which has no entry in
// the shared state inserts a default entry for it.
macro_rules! assert_state {
($store:ident, $project_key:ident, $state:pat) => {
assert!(matches!(
$store.shared().get_or_create($project_key).project_state(),
$state
));
};
}
/// Walks a project through its fetch life cycle: first fetch, repeated pending results and
/// finally a non-pending result.
#[tokio::test(start_paused = true)]
async fn test_store_fetch() {
let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let mut store = ProjectStore::default();
let config = Default::default();
// The very first fetch carries no time and no revision, the shared state is pending.
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
assert_eq!(fetch.when(), None);
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);
// No second fetch can be started while the first one is still in progress.
assert!(store.try_begin_fetch(project_key, &config).is_none());
// A pending result immediately yields another fetch, this time still without a time.
let fetch = fetch.complete(ProjectState::Pending.into());
let fetch = store.complete_fetch(fetch, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
assert_eq!(fetch.when(), None);
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);
// After the second pending result the next fetch is scheduled for a time in the future.
let fetch = fetch.complete(ProjectState::Pending.into());
let fetch = store.complete_fetch(fetch, &config).unwrap();
assert_eq!(fetch.project_key(), project_key);
assert!(fetch.when() > Some(Instant::now()));
assert_eq!(fetch.revision().as_str(), None);
assert_state!(store, project_key, ProjectState::Pending);
// A non-pending result ends the cycle: the state is stored and no new fetch is needed.
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
assert_state!(store, project_key, ProjectState::Disabled);
assert!(store.try_begin_fetch(project_key, &config).is_none());
}
/// A pending result of a refresh must not overwrite an already stored project state.
#[tokio::test(start_paused = true)]
async fn test_store_fetch_pending_does_not_replace_state() {
let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let mut store = ProjectStore::default();
let config = Config::from_json_value(serde_json::json!({
"cache": {
"project_expiry": 5,
"project_grace_period": 5,
}
}))
.unwrap();
// Initial fetch which stores a disabled state.
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
assert_state!(store, project_key, ProjectState::Disabled);
// 6 seconds later the state is past its expiry of 5 but still within the grace period,
// a new fetch can be started.
tokio::time::advance(Duration::from_secs(6)).await;
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
// The pending result triggers another fetch, the stored state stays disabled.
let fetch = fetch.complete(ProjectState::Pending.into());
let fetch = store.complete_fetch(fetch, &config).unwrap();
assert_state!(store, project_key, ProjectState::Disabled);
// Only a non-pending result replaces the stored state.
let fetch = fetch.complete(ProjectState::new_allowed().into());
assert!(store.complete_fetch(fetch, &config).is_none());
assert_state!(store, project_key, ProjectState::Enabled(_));
}
/// Projects are evicted individually, each one once its own expiry has passed.
#[tokio::test(start_paused = true)]
async fn test_store_evict_projects() {
let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
let mut store = ProjectStore::default();
let config = Config::from_json_value(serde_json::json!({
"cache": {
"project_expiry": 5,
"project_grace_period": 0,
}
}))
.unwrap();
// Project 1 is fetched at the start of the test.
let fetch = store.try_begin_fetch(project_key1, &config).unwrap();
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
assert_eq!(collect_evicted(&mut store).await, Vec::new());
assert_state!(store, project_key1, ProjectState::Disabled);
// After 3 seconds nothing has expired yet.
tokio::time::advance(Duration::from_secs(3)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
assert_state!(store, project_key1, ProjectState::Disabled);
// Project 2 is fetched 3 seconds after project 1.
let fetch = store.try_begin_fetch(project_key2, &config).unwrap();
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
// After 6 seconds in total only project 1 is past its expiry of 5 seconds.
tokio::time::advance(Duration::from_secs(3)).await;
assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
// Looking up the evicted project re-creates it with a pending state.
assert_state!(store, project_key1, ProjectState::Pending);
assert_state!(store, project_key2, ProjectState::Disabled);
}
/// Projects without a completed fetch are never scheduled for eviction.
#[tokio::test(start_paused = true)]
async fn test_store_evict_projects_pending_not_expired() {
let project_key1 = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let project_key2 = ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
let mut store = ProjectStore::default();
let config = Config::from_json_value(serde_json::json!({
"cache": {
"project_expiry": 5,
"project_grace_period": 0,
}
}))
.unwrap();
// Project 1 has a fetch in progress, project 2 only exists in the shared state.
let fetch = store.try_begin_fetch(project_key1, &config).unwrap();
store.shared().get_or_create(project_key2);
// Even after more than the configured expiry nothing is evicted.
tokio::time::advance(Duration::from_secs(6)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
// The expiry of project 1 is counted from the completion of the fetch, 6 seconds in.
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
assert_eq!(collect_evicted(&mut store).await, Vec::new());
// 4 seconds after the completion the project is still there.
tokio::time::advance(Duration::from_secs(4)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
assert_state!(store, project_key1, ProjectState::Disabled);
// Slightly more than 5 seconds after the completion it is evicted.
tokio::time::advance(Duration::from_millis(1001)).await;
assert_eq!(collect_evicted(&mut store).await, vec![project_key1]);
assert_state!(store, project_key1, ProjectState::Pending);
// Project 2 was never fetched and is therefore still present and pending.
assert_state!(store, project_key2, ProjectState::Pending);
}
/// The grace period is added to the expiry before a project is evicted.
#[tokio::test(start_paused = true)]
async fn test_store_evict_projects_stale() {
let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let mut store = ProjectStore::default();
let config = Config::from_json_value(serde_json::json!({
"cache": {
"project_expiry": 5,
"project_grace_period": 5,
}
}))
.unwrap();
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
// After 9.5 seconds the expiry of 5 has passed, but not expiry plus grace period (10).
tokio::time::advance(Duration::from_millis(9500)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
assert_state!(store, project_key, ProjectState::Disabled);
// After 10.5 seconds the grace period is over as well and the project is evicted.
tokio::time::advance(Duration::from_secs(1)).await;
assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
assert_state!(store, project_key, ProjectState::Pending);
}
/// A project is not evicted while a fetch for it is in progress.
#[tokio::test(start_paused = true)]
async fn test_store_no_eviction_during_fetch() {
let project_key = ProjectKey::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
let mut store = ProjectStore::default();
let config = Config::from_json_value(serde_json::json!({
"cache": {
"project_expiry": 5,
"project_grace_period": 5,
}
}))
.unwrap();
// The initial fetch takes longer than expiry plus grace period, nothing is evicted.
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
tokio::time::advance(Duration::from_millis(10500)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
// Completing the fetch schedules the eviction 10 seconds after the completion.
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
// Past the expiry but within the grace period a refresh is started, which removes the
// scheduled eviction.
tokio::time::advance(Duration::from_millis(5001)).await;
let fetch = store.try_begin_fetch(project_key, &config).unwrap();
// The previously scheduled eviction time passes while the refresh is in progress.
tokio::time::advance(Duration::from_millis(10500)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
// Completing the refresh schedules a new eviction, again 10 seconds after the completion.
let fetch = fetch.complete(ProjectState::Disabled.into());
assert!(store.complete_fetch(fetch, &config).is_none());
tokio::time::advance(Duration::from_millis(9500)).await;
assert_eq!(collect_evicted(&mut store).await, Vec::new());
tokio::time::advance(Duration::from_millis(501)).await;
assert_eq!(collect_evicted(&mut store).await, vec![project_key]);
assert_state!(store, project_key, ProjectState::Pending);
}
}