1use std::fmt;
2use std::time::Duration;
3
4use parking_lot::{MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
5use relay_common::time::UnixTimestamp;
6use relay_statsd::metric;
7
8use crate::redis::quota::QuotaScoping;
9use crate::statsd::{CardinalityLimiterCounters, CardinalityLimiterTimers};
10use crate::window::Slot;
11
/// Result of looking up a hash in the [`Cache`].
///
/// The variant order is significant: the derived `PartialOrd`/`Ord` implementations sort
/// `Accepted < Rejected < Unknown`.
///
/// The type is a plain fieldless enum and therefore `Copy`, so outcomes can be passed around and
/// compared without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CacheOutcome {
    /// The hash is already stored in the cache for the active slot.
    Accepted,
    /// The hash is not stored and the cache already holds at least `limit` hashes for the
    /// active slot.
    Rejected,
    /// The cache cannot decide: there is no entry for the scope, the entry belongs to a
    /// different slot, or the hash is not stored while the cache holds fewer than `limit` hashes.
    Unknown,
}
23
24pub struct Cache {
28 inner: RwLock<Inner>,
29}
30
31impl Cache {
32 pub fn new(vacuum_interval: Duration) -> Self {
33 Self {
34 inner: RwLock::new(Inner {
35 cache: Default::default(),
36 vacuum_interval,
37 last_vacuum: UnixTimestamp::from_secs(0),
38 }),
39 }
40 }
41
42 pub fn read(&self, timestamp: UnixTimestamp) -> CacheRead<'_> {
47 let inner = self.inner.read();
48 CacheRead::new(inner, timestamp)
49 }
50
51 pub fn update(&self, scope: &QuotaScoping, timestamp: UnixTimestamp) -> CacheUpdate<'_> {
56 let mut inner = self.inner.write();
57
58 inner.vacuum(timestamp);
59
60 let slot = scope.active_slot(timestamp);
61 let cache = inner.cache.entry_ref(scope).or_default();
62
63 if slot < cache.current_slot {
65 return CacheUpdate::noop();
66 }
67
68 if slot > cache.current_slot {
70 cache.reset(slot);
71 }
72
73 CacheUpdate::new(inner, scope)
74 }
75}
76
77impl fmt::Debug for Cache {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 f.debug_tuple("Cache").finish()
80 }
81}
82
83pub struct CacheRead<'a> {
87 inner: RwLockReadGuard<'a, Inner>,
88 timestamp: UnixTimestamp,
89}
90
91impl<'a> CacheRead<'a> {
93 fn new(inner: RwLockReadGuard<'a, Inner>, timestamp: UnixTimestamp) -> Self {
95 Self { inner, timestamp }
96 }
97
98 pub fn check(&self, scope: &QuotaScoping, hash: u32, limit: u32) -> CacheOutcome {
99 let Some(cache) = self.inner.cache.get(scope) else {
100 return CacheOutcome::Unknown;
101 };
102
103 let slot = scope.active_slot(self.timestamp);
104 cache.check(slot, hash, limit)
105 }
106}
107
108pub struct CacheUpdate<'a>(Option<MappedRwLockWriteGuard<'a, ScopedCache>>);
112
113impl<'a> CacheUpdate<'a> {
114 fn new(inner: RwLockWriteGuard<'a, Inner>, key: &QuotaScoping) -> Self {
116 Self(RwLockWriteGuard::try_map(inner, |inner| inner.cache.get_mut(key)).ok())
117 }
118
119 fn noop() -> Self {
121 Self(None)
122 }
123
124 pub fn accept(&mut self, hash: u32) {
127 if let Some(cache) = self.0.as_mut() {
128 cache.insert(hash);
129 }
130 }
131}
132
133#[derive(Debug)]
135struct Inner {
136 cache: hashbrown::HashMap<QuotaScoping, ScopedCache>,
137 vacuum_interval: Duration,
138 last_vacuum: UnixTimestamp,
139}
140
141impl Inner {
142 fn vacuum(&mut self, ts: UnixTimestamp) {
143 let secs_since_last_vacuum = ts.as_secs().saturating_sub(self.last_vacuum.as_secs());
145 if secs_since_last_vacuum < self.vacuum_interval.as_secs() {
146 return;
147 }
148 self.last_vacuum = ts;
149
150 let expired = metric!(timer(CardinalityLimiterTimers::CacheVacuum), {
151 self.cache
152 .extract_if(|scope, cache| cache.current_slot < scope.active_slot(ts))
153 .count()
154 });
155 metric!(counter(CardinalityLimiterCounters::RedisCacheVacuum) += expired as i64);
156 }
157}
158
159#[derive(Debug, Default)]
161struct ScopedCache {
162 hashes: hashbrown::HashSet<u32>,
164 current_slot: Slot,
165}
166
167impl ScopedCache {
168 fn check(&self, slot: Slot, hash: u32, limit: u32) -> CacheOutcome {
169 if slot != self.current_slot {
170 return CacheOutcome::Unknown;
171 }
172
173 if self.hashes.contains(&hash) {
174 CacheOutcome::Accepted
176 } else if self.hashes.len().try_into().unwrap_or(u32::MAX) >= limit {
177 CacheOutcome::Rejected
180 } else {
181 CacheOutcome::Unknown
183 }
184 }
185
186 fn insert(&mut self, hash: u32) {
187 self.hashes.insert(hash);
188 }
189
190 fn reset(&mut self, slot: Slot) {
191 self.current_slot = slot;
192 self.hashes.clear();
193 }
194}
195
#[cfg(test)]
mod tests {
    use relay_base_schema::metrics::{MetricName, MetricNamespace};
    use relay_base_schema::organization::OrganizationId;
    use relay_base_schema::project::ProjectId;

    use crate::limiter::{Entry, EntryId};
    use crate::redis::quota::PartialQuotaScoping;
    use crate::{CardinalityLimit, CardinalityScope, Scoping, SlidingWindow};

    use super::*;

    /// Builds an organization-scoped [`QuotaScoping`] for the given organization and window.
    ///
    /// Everything else (project, limit, metric entry) is fixed.
    fn build_scoping(organization_id: OrganizationId, window: SlidingWindow) -> QuotaScoping {
        let scoping = Scoping {
            organization_id,
            project_id: ProjectId::new(1),
        };
        let limit = CardinalityLimit {
            id: String::new(),
            passive: false,
            report: false,
            window,
            limit: 100,
            scope: CardinalityScope::Organization,
            namespace: None,
        };
        let name = MetricName::from("foobar");

        let partial = PartialQuotaScoping::new(scoping, &limit).unwrap();
        partial.complete(Entry {
            id: EntryId(0),
            namespace: MetricNamespace::Spans,
            name: &name,
            hash: 123,
        })
    }

    #[test]
    fn test_cache() {
        let cache = Cache::new(Duration::from_secs(180));

        let window = SlidingWindow {
            window_seconds: 100,
            granularity_seconds: 10,
        };
        let scope = build_scoping(OrganizationId::new(1), window);
        let now = UnixTimestamp::now();
        // More than one granularity step ahead; the assertions below expect a different slot.
        let future = now + Duration::from_secs(window.granularity_seconds + 1);

        // Nothing has been cached yet.
        {
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope, 1, 1), CacheOutcome::Unknown);
        }

        {
            let mut writer = cache.update(&scope, now);
            writer.accept(1);
            writer.accept(2);
        }

        {
            let at_now = cache.read(now);
            // Stored hashes are accepted regardless of the limit.
            assert_eq!(at_now.check(&scope, 1, 1), CacheOutcome::Accepted);
            assert_eq!(at_now.check(&scope, 1, 2), CacheOutcome::Accepted);
            assert_eq!(at_now.check(&scope, 2, 1), CacheOutcome::Accepted);

            // Unknown hashes depend on the limit and the number of stored hashes.
            assert_eq!(at_now.check(&scope, 3, 3), CacheOutcome::Unknown);
            assert_eq!(at_now.check(&scope, 3, 2), CacheOutcome::Rejected);

            let at_future = cache.read(future);
            // The cached slot does not match the future slot, nothing is known.
            assert_eq!(at_future.check(&scope, 1, 1), CacheOutcome::Unknown);
            assert_eq!(at_future.check(&scope, 2, 2), CacheOutcome::Unknown);
        }

        // Moves the entry on to the future slot; previously stored hashes are gone.
        cache.update(&scope, future).accept(1);

        {
            let at_future = cache.read(future);
            assert_eq!(at_future.check(&scope, 1, 1), CacheOutcome::Accepted);
            // Hash 2 was only stored for the previous slot and the limit is reached.
            assert_eq!(at_future.check(&scope, 2, 1), CacheOutcome::Rejected);

            let at_now = cache.read(now);
            // The entry now belongs to the future slot, reads for the old slot know nothing.
            assert_eq!(at_now.check(&scope, 1, 1), CacheOutcome::Unknown);
            assert_eq!(at_now.check(&scope, 2, 1), CacheOutcome::Unknown);
            assert_eq!(at_now.check(&scope, 3, 99), CacheOutcome::Unknown);
        }
    }

    #[test]
    fn test_cache_different_scopings() {
        let cache = Cache::new(Duration::from_secs(180));

        let window = SlidingWindow {
            window_seconds: 100,
            granularity_seconds: 10,
        };
        let scope1 = build_scoping(OrganizationId::new(1), window);
        let scope2 = build_scoping(OrganizationId::new(2), window);

        let now = UnixTimestamp::now();

        cache.update(&scope1, now).accept(1);

        {
            let mut writer = cache.update(&scope2, now);
            writer.accept(1);
            writer.accept(2);
        }

        // Each scope only sees the hashes which were accepted for it.
        {
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope1, 1, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope1, 2, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope1, 3, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 3, 1), CacheOutcome::Rejected);
            assert_eq!(reader.check(&scope2, 1, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 2, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 3, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 3, 2), CacheOutcome::Rejected);
        }
    }

    #[test]
    fn test_cache_vacuum() {
        let vacuum_interval = Duration::from_secs(30);
        let interval_secs = vacuum_interval.as_secs();
        let cache = Cache::new(vacuum_interval);

        let window = SlidingWindow {
            window_seconds: interval_secs * 10,
            granularity_seconds: interval_secs * 2,
        };
        let scope1 = build_scoping(OrganizationId::new(1), window);
        let scope2 = build_scoping(OrganizationId::new(2), window);

        let now = UnixTimestamp::now();
        // One second short of the vacuum interval.
        let in_interval = now + Duration::from_secs(interval_secs - 1);
        // Well past the vacuum interval and more than one granularity step ahead.
        let future = now + Duration::from_secs(interval_secs * 3);

        cache.update(&scope1, now).accept(10);
        cache.update(&scope2, now).accept(20);

        {
            let reader = cache.read(now);
            // Both hashes were just stored.
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 20, 100), CacheOutcome::Accepted);
        }

        // Less than the vacuum interval has passed, no clean-up is expected here.
        cache.update(&scope2, in_interval).accept(21);

        {
            let reader = cache.read(now);
            // The entry of the untouched scope is still there.
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Accepted);
        }

        // The vacuum interval has passed, this update is expected to clean up.
        cache.update(&scope2, future).accept(22);

        {
            let reader = cache.read(now);
            // Nothing is known for the old timestamp anymore.
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope1, 11, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 20, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 21, 100), CacheOutcome::Unknown);
        }

        {
            let reader = cache.read(future);
            // The hash accepted together with the vacuum is stored.
            assert_eq!(reader.check(&scope2, 22, 100), CacheOutcome::Accepted);
        }
    }
}