use parking_lot::{MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::fmt;
use std::time::Duration;
use relay_common::time::UnixTimestamp;
use relay_statsd::metric;
use crate::redis::quota::QuotaScoping;
use crate::statsd::{CardinalityLimiterCounters, CardinalityLimiterTimers};
use crate::window::Slot;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CacheOutcome {
Accepted,
Rejected,
Unknown,
}
pub struct Cache {
inner: RwLock<Inner>,
}
impl Cache {
pub fn new(vacuum_interval: Duration) -> Self {
Self {
inner: RwLock::new(Inner {
cache: Default::default(),
vacuum_interval,
last_vacuum: UnixTimestamp::from_secs(0),
}),
}
}
pub fn read(&self, timestamp: UnixTimestamp) -> CacheRead<'_> {
let inner = self.inner.read();
CacheRead::new(inner, timestamp)
}
pub fn update(&self, scope: &QuotaScoping, timestamp: UnixTimestamp) -> CacheUpdate<'_> {
let mut inner = self.inner.write();
inner.vacuum(timestamp);
let slot = scope.active_slot(timestamp);
let cache = inner.cache.entry_ref(scope).or_default();
if slot < cache.current_slot {
return CacheUpdate::noop();
}
if slot > cache.current_slot {
cache.reset(slot);
}
CacheUpdate::new(inner, scope)
}
}
impl fmt::Debug for Cache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let inner = self.inner.read();
f.debug_tuple("Cache").field(&inner.cache).finish()
}
}
pub struct CacheRead<'a> {
inner: RwLockReadGuard<'a, Inner>,
timestamp: UnixTimestamp,
}
impl<'a> CacheRead<'a> {
fn new(inner: RwLockReadGuard<'a, Inner>, timestamp: UnixTimestamp) -> Self {
Self { inner, timestamp }
}
pub fn check(&self, scope: &QuotaScoping, hash: u32, limit: u32) -> CacheOutcome {
let Some(cache) = self.inner.cache.get(scope) else {
return CacheOutcome::Unknown;
};
let slot = scope.active_slot(self.timestamp);
cache.check(slot, hash, limit)
}
}
pub struct CacheUpdate<'a>(Option<MappedRwLockWriteGuard<'a, ScopedCache>>);
impl<'a> CacheUpdate<'a> {
fn new(inner: RwLockWriteGuard<'a, Inner>, key: &QuotaScoping) -> Self {
Self(RwLockWriteGuard::try_map(inner, |inner| inner.cache.get_mut(key)).ok())
}
fn noop() -> Self {
Self(None)
}
pub fn accept(&mut self, hash: u32) {
if let Some(cache) = self.0.as_mut() {
cache.insert(hash);
}
}
}
#[derive(Debug)]
struct Inner {
cache: hashbrown::HashMap<QuotaScoping, ScopedCache>,
vacuum_interval: Duration,
last_vacuum: UnixTimestamp,
}
impl Inner {
fn vacuum(&mut self, ts: UnixTimestamp) {
let secs_since_last_vacuum = ts.as_secs().saturating_sub(self.last_vacuum.as_secs());
if secs_since_last_vacuum < self.vacuum_interval.as_secs() {
return;
}
self.last_vacuum = ts;
let expired = metric!(timer(CardinalityLimiterTimers::CacheVacuum), {
self.cache
.extract_if(|scope, cache| cache.current_slot < scope.active_slot(ts))
.count()
});
metric!(counter(CardinalityLimiterCounters::RedisCacheVacuum) += expired as i64);
}
}
#[derive(Debug, Default)]
struct ScopedCache {
hashes: hashbrown::HashSet<u32>,
current_slot: Slot,
}
impl ScopedCache {
fn check(&self, slot: Slot, hash: u32, limit: u32) -> CacheOutcome {
if slot != self.current_slot {
return CacheOutcome::Unknown;
}
if self.hashes.contains(&hash) {
CacheOutcome::Accepted
} else if self.hashes.len().try_into().unwrap_or(u32::MAX) >= limit {
CacheOutcome::Rejected
} else {
CacheOutcome::Unknown
}
}
fn insert(&mut self, hash: u32) {
self.hashes.insert(hash);
}
fn reset(&mut self, slot: Slot) {
self.current_slot = slot;
self.hashes.clear();
}
}
#[cfg(test)]
mod tests {
use relay_base_schema::metrics::{MetricName, MetricNamespace};
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use crate::limiter::{Entry, EntryId};
use crate::redis::quota::PartialQuotaScoping;
use crate::{CardinalityLimit, CardinalityScope, Scoping, SlidingWindow};
use super::*;
fn build_scoping(organization_id: OrganizationId, window: SlidingWindow) -> QuotaScoping {
PartialQuotaScoping::new(
Scoping {
organization_id,
project_id: ProjectId::new(1),
},
&CardinalityLimit {
id: String::new(),
passive: false,
report: false,
window,
limit: 100,
scope: CardinalityScope::Organization,
namespace: None,
},
)
.unwrap()
.complete(Entry {
id: EntryId(0),
namespace: MetricNamespace::Spans,
name: &MetricName::from("foobar"),
hash: 123,
})
}
#[test]
fn test_cache() {
let cache = Cache::new(Duration::from_secs(180));
let window = SlidingWindow {
window_seconds: 100,
granularity_seconds: 10,
};
let scope = build_scoping(OrganizationId::new(1), window);
let now = UnixTimestamp::now();
let future = now + Duration::from_secs(window.granularity_seconds + 1);
{
let cache = cache.read(now);
assert_eq!(cache.check(&scope, 1, 1), CacheOutcome::Unknown);
}
{
let mut cache = cache.update(&scope, now);
cache.accept(1);
cache.accept(2);
}
{
let r1 = cache.read(now);
assert_eq!(r1.check(&scope, 1, 1), CacheOutcome::Accepted);
assert_eq!(r1.check(&scope, 1, 2), CacheOutcome::Accepted);
assert_eq!(r1.check(&scope, 2, 1), CacheOutcome::Accepted);
assert_eq!(r1.check(&scope, 3, 3), CacheOutcome::Unknown);
assert_eq!(r1.check(&scope, 3, 2), CacheOutcome::Rejected);
let r2 = cache.read(future);
assert_eq!(r2.check(&scope, 1, 1), CacheOutcome::Unknown);
assert_eq!(r2.check(&scope, 2, 2), CacheOutcome::Unknown);
}
{
let mut cache = cache.update(&scope, future);
cache.accept(1);
}
{
let future = cache.read(future);
assert_eq!(future.check(&scope, 1, 1), CacheOutcome::Accepted);
assert_eq!(future.check(&scope, 2, 1), CacheOutcome::Rejected);
let past = cache.read(now);
assert_eq!(past.check(&scope, 1, 1), CacheOutcome::Unknown);
assert_eq!(past.check(&scope, 2, 1), CacheOutcome::Unknown);
assert_eq!(past.check(&scope, 3, 99), CacheOutcome::Unknown);
}
}
#[test]
fn test_cache_different_scopings() {
let cache = Cache::new(Duration::from_secs(180));
let window = SlidingWindow {
window_seconds: 100,
granularity_seconds: 10,
};
let scope1 = build_scoping(OrganizationId::new(1), window);
let scope2 = build_scoping(OrganizationId::new(2), window);
let now = UnixTimestamp::now();
{
let mut cache = cache.update(&scope1, now);
cache.accept(1);
}
{
let mut cache = cache.update(&scope2, now);
cache.accept(1);
cache.accept(2);
}
{
let cache = cache.read(now);
assert_eq!(cache.check(&scope1, 1, 99), CacheOutcome::Accepted);
assert_eq!(cache.check(&scope1, 2, 99), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope1, 3, 99), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope2, 3, 1), CacheOutcome::Rejected);
assert_eq!(cache.check(&scope2, 1, 99), CacheOutcome::Accepted);
assert_eq!(cache.check(&scope2, 2, 99), CacheOutcome::Accepted);
assert_eq!(cache.check(&scope2, 3, 99), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope2, 3, 2), CacheOutcome::Rejected);
}
}
#[test]
fn test_cache_vacuum() {
let vacuum_interval = Duration::from_secs(30);
let cache = Cache::new(vacuum_interval);
let window = SlidingWindow {
window_seconds: vacuum_interval.as_secs() * 10,
granularity_seconds: vacuum_interval.as_secs() * 2,
};
let scope1 = build_scoping(OrganizationId::new(1), window);
let scope2 = build_scoping(OrganizationId::new(2), window);
let now = UnixTimestamp::now();
let in_interval = now + Duration::from_secs(vacuum_interval.as_secs() - 1);
let future = now + Duration::from_secs(vacuum_interval.as_secs() * 3);
{
let mut cache = cache.update(&scope1, now);
cache.accept(10);
}
{
let mut cache = cache.update(&scope2, now);
cache.accept(20);
}
{
let cache = cache.read(now);
assert_eq!(cache.check(&scope1, 10, 100), CacheOutcome::Accepted);
assert_eq!(cache.check(&scope2, 20, 100), CacheOutcome::Accepted);
}
{
let mut cache = cache.update(&scope2, in_interval);
cache.accept(21);
}
{
let cache = cache.read(now);
assert_eq!(cache.check(&scope1, 10, 100), CacheOutcome::Accepted);
}
{
let mut cache = cache.update(&scope2, future);
cache.accept(22);
}
{
let cache = cache.read(now);
assert_eq!(cache.check(&scope1, 10, 100), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope1, 11, 100), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope2, 20, 100), CacheOutcome::Unknown);
assert_eq!(cache.check(&scope2, 21, 100), CacheOutcome::Unknown);
}
{
let cache = cache.read(future);
assert_eq!(cache.check(&scope2, 22, 100), CacheOutcome::Accepted);
}
}
}