relay_cardinality/redis/
cache.rs

1use std::fmt;
2use std::time::Duration;
3
4use parking_lot::{MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
5use relay_common::time::UnixTimestamp;
6use relay_statsd::metric;
7
8use crate::redis::quota::QuotaScoping;
9use crate::statsd::{CardinalityLimiterCounters, CardinalityLimiterTimers};
10use crate::window::Slot;
11
/// Outcome of looking up a hash in the local cache.
///
/// The cache either decides on its own ([`CacheOutcome::Accepted`] or
/// [`CacheOutcome::Rejected`]) or reports that it cannot decide
/// ([`CacheOutcome::Unknown`]).
///
/// The type is a plain, field-less enum and therefore cheap to copy, compare and hash.
/// The derived ordering follows the declaration order of the variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CacheOutcome {
    /// Hash accepted by cache.
    Accepted,
    /// Hash rejected by cache.
    Rejected,
    /// Cache has no information about the hash.
    Unknown,
}
23
/// Internal cache remembering already accepted elements and current cardinality.
///
/// Only caches for the currently active granule of the sliding window.
///
/// All state is kept behind a single lock. Use [`Cache::read`] to obtain a shared read
/// handle and [`Cache::update`] to obtain an exclusive update handle.
pub struct Cache {
    // The complete cache state, guarded by one read/write lock.
    inner: RwLock<Inner>,
}
30
31impl Cache {
32    pub fn new(vacuum_interval: Duration) -> Self {
33        Self {
34            inner: RwLock::new(Inner {
35                cache: Default::default(),
36                vacuum_interval,
37                last_vacuum: UnixTimestamp::from_secs(0),
38            }),
39        }
40    }
41
42    /// Acquires a read lock from the cache and returns a read handle.
43    ///
44    /// All operations done on the handle share the same lock. To release the lock
45    /// the returned [`CacheRead`] must be dropped.
46    pub fn read(&self, timestamp: UnixTimestamp) -> CacheRead<'_> {
47        let inner = self.inner.read();
48        CacheRead::new(inner, timestamp)
49    }
50
51    /// Acquires a write lock from the cache and returns an update handle.
52    ///
53    /// All operations done on the handle share the same lock. To release the lock
54    /// the returned [`CacheUpdate`] must be dropped.
55    pub fn update(&self, scope: &QuotaScoping, timestamp: UnixTimestamp) -> CacheUpdate<'_> {
56        let mut inner = self.inner.write();
57
58        inner.vacuum(timestamp);
59
60        let slot = scope.active_slot(timestamp);
61        let cache = inner.cache.entry_ref(scope).or_default();
62
63        // If the slot is older, don't do anything and give up the lock early.
64        if slot < cache.current_slot {
65            return CacheUpdate::noop();
66        }
67
68        // If the slot is newer than the current slot, reset the cache to the new slot.
69        if slot > cache.current_slot {
70            cache.reset(slot);
71        }
72
73        CacheUpdate::new(inner, scope)
74    }
75}
76
77impl fmt::Debug for Cache {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        f.debug_tuple("Cache").finish()
80    }
81}
82
/// Cache read handle.
///
/// Holds a cache read lock, the lock is released on drop.
pub struct CacheRead<'a> {
    // Read guard over the shared cache state, keeps the lock alive for the handle's lifetime.
    inner: RwLockReadGuard<'a, Inner>,
    // Timestamp used to determine the active slot of a scope when checking a hash.
    timestamp: UnixTimestamp,
}
90
91/// Internal state for [`CacheRead`].
92impl<'a> CacheRead<'a> {
93    /// Creates a new [`CacheRead`] which reads from the cache.
94    fn new(inner: RwLockReadGuard<'a, Inner>, timestamp: UnixTimestamp) -> Self {
95        Self { inner, timestamp }
96    }
97
98    pub fn check(&self, scope: &QuotaScoping, hash: u32, limit: u32) -> CacheOutcome {
99        let Some(cache) = self.inner.cache.get(scope) else {
100            return CacheOutcome::Unknown;
101        };
102
103        let slot = scope.active_slot(self.timestamp);
104        cache.check(slot, hash, limit)
105    }
106}
107
/// Cache update handle.
///
/// Holds a cache write lock, the lock is released on drop.
///
/// The handle wraps an optional write guard which is narrowed down to the cache of a single
/// scope. A handle without a guard is a noop: it holds no lock and ignores all updates.
pub struct CacheUpdate<'a>(Option<MappedRwLockWriteGuard<'a, ScopedCache>>);
112
113impl<'a> CacheUpdate<'a> {
114    /// Creates a new [`CacheUpdate`] which operates on the passed cache.
115    fn new(inner: RwLockWriteGuard<'a, Inner>, key: &QuotaScoping) -> Self {
116        Self(RwLockWriteGuard::try_map(inner, |inner| inner.cache.get_mut(key)).ok())
117    }
118
119    /// Creates a new noop [`CacheUpdate`] which does not require a lock.
120    fn noop() -> Self {
121        Self(None)
122    }
123
124    /// Marks a hash as accepted in the cache, future checks of the item will immediately mark the
125    /// item as accepted.
126    pub fn accept(&mut self, hash: u32) {
127        if let Some(cache) = self.0.as_mut() {
128            cache.insert(hash);
129        }
130    }
131}
132
/// Critical section of the [`Cache`].
///
/// Everything in here is only accessed while holding the lock of the [`Cache`].
#[derive(Debug)]
struct Inner {
    // Cached hashes, stored separately for every scope.
    cache: hashbrown::HashMap<QuotaScoping, ScopedCache>,
    // Minimum time between two vacuum runs, compared in whole seconds.
    vacuum_interval: Duration,
    // Timestamp of the most recent vacuum run.
    last_vacuum: UnixTimestamp,
}
140
141impl Inner {
142    fn vacuum(&mut self, ts: UnixTimestamp) {
143        // Debounce the vacuuming.
144        let secs_since_last_vacuum = ts.as_secs().saturating_sub(self.last_vacuum.as_secs());
145        if secs_since_last_vacuum < self.vacuum_interval.as_secs() {
146            return;
147        }
148        self.last_vacuum = ts;
149
150        let expired = metric!(timer(CardinalityLimiterTimers::CacheVacuum), {
151            self.cache
152                .extract_if(|scope, cache| cache.current_slot < scope.active_slot(ts))
153                .count()
154        });
155        metric!(counter(CardinalityLimiterCounters::RedisCacheVacuum) += expired as i64);
156    }
157}
158
/// Scope specific information of the cache.
///
/// Stores the hashes accepted for exactly one slot, the cache is emptied whenever it is
/// reset to another slot.
#[derive(Debug, Default)]
struct ScopedCache {
    // Uses hashbrown for a faster hasher `ahash`, benchmarks show about 10% speedup.
    hashes: hashbrown::HashSet<u32>,
    // The slot the cached hashes belong to.
    current_slot: Slot,
}
166
167impl ScopedCache {
168    fn check(&self, slot: Slot, hash: u32, limit: u32) -> CacheOutcome {
169        if slot != self.current_slot {
170            return CacheOutcome::Unknown;
171        }
172
173        if self.hashes.contains(&hash) {
174            // Local cache copy contains the hash -> accept it straight away
175            CacheOutcome::Accepted
176        } else if self.hashes.len().try_into().unwrap_or(u32::MAX) >= limit {
177            // We have more or the same amount of items in the local cache as the cardinality
178            // limit -> this new item/hash is rejected.
179            CacheOutcome::Rejected
180        } else {
181            // Check with Redis.
182            CacheOutcome::Unknown
183        }
184    }
185
186    fn insert(&mut self, hash: u32) {
187        self.hashes.insert(hash);
188    }
189
190    fn reset(&mut self, slot: Slot) {
191        self.current_slot = slot;
192        self.hashes.clear();
193    }
194}
195
#[cfg(test)]
mod tests {
    use relay_base_schema::metrics::{MetricName, MetricNamespace};
    use relay_base_schema::organization::OrganizationId;
    use relay_base_schema::project::ProjectId;

    use crate::limiter::{Entry, EntryId};
    use crate::redis::quota::PartialQuotaScoping;
    use crate::{CardinalityLimit, CardinalityScope, Scoping, SlidingWindow};

    use super::*;

    /// Builds a complete scoping for an organization scoped limit with the passed `window`.
    fn build_scoping(organization_id: OrganizationId, window: SlidingWindow) -> QuotaScoping {
        let scoping = Scoping {
            organization_id,
            project_id: ProjectId::new(1),
        };
        let limit = CardinalityLimit {
            id: String::new(),
            passive: false,
            report: false,
            window,
            limit: 100,
            scope: CardinalityScope::Organization,
            namespace: None,
        };
        let name = MetricName::from("foobar");
        let entry = Entry {
            id: EntryId(0),
            namespace: MetricNamespace::Spans,
            name: &name,
            hash: 123,
        };

        PartialQuotaScoping::new(scoping, &limit)
            .unwrap()
            .complete(entry)
    }

    #[test]
    fn test_cache() {
        let cache = Cache::new(Duration::from_secs(180));

        let window = SlidingWindow {
            window_seconds: 100,
            granularity_seconds: 10,
        };
        let scope = build_scoping(OrganizationId::new(1), window);
        let now = UnixTimestamp::now();
        let future = now + Duration::from_secs(window.granularity_seconds + 1);

        {
            // Nothing has been cached yet.
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope, 1, 1), CacheOutcome::Unknown);
        }

        {
            let mut writer = cache.update(&scope, now);
            writer.accept(1);
            writer.accept(2);
        }

        {
            let current = cache.read(now);
            // Cached hashes are accepted independent of the limit.
            assert_eq!(current.check(&scope, 1, 1), CacheOutcome::Accepted);
            assert_eq!(current.check(&scope, 1, 2), CacheOutcome::Accepted);
            assert_eq!(current.check(&scope, 2, 1), CacheOutcome::Accepted);

            // For uncached hashes the limit and the amount of cached items decide.
            assert_eq!(current.check(&scope, 3, 3), CacheOutcome::Unknown);
            assert_eq!(current.check(&scope, 3, 2), CacheOutcome::Rejected);

            // A second, concurrent read handle looking at a future slot.
            let ahead = cache.read(future);
            assert_eq!(ahead.check(&scope, 1, 1), CacheOutcome::Unknown);
            assert_eq!(ahead.check(&scope, 2, 2), CacheOutcome::Unknown);
        }

        {
            // Updating with the future timestamp moves the cache to the future slot.
            let mut writer = cache.update(&scope, future);
            writer.accept(1);
        }

        {
            let ahead = cache.read(future);
            // Only `1` has been accepted in the future slot.
            assert_eq!(ahead.check(&scope, 1, 1), CacheOutcome::Accepted);
            assert_eq!(ahead.check(&scope, 2, 1), CacheOutcome::Rejected);

            let behind = cache.read(now);
            // Information about the past slot is gone.
            assert_eq!(behind.check(&scope, 1, 1), CacheOutcome::Unknown);
            assert_eq!(behind.check(&scope, 2, 1), CacheOutcome::Unknown);
            assert_eq!(behind.check(&scope, 3, 99), CacheOutcome::Unknown);
        }
    }

    #[test]
    fn test_cache_different_scopings() {
        let cache = Cache::new(Duration::from_secs(180));

        let window = SlidingWindow {
            window_seconds: 100,
            granularity_seconds: 10,
        };
        let scope1 = build_scoping(OrganizationId::new(1), window);
        let scope2 = build_scoping(OrganizationId::new(2), window);

        let now = UnixTimestamp::now();

        {
            let mut writer = cache.update(&scope1, now);
            writer.accept(1);
        }

        {
            let mut writer = cache.update(&scope2, now);
            writer.accept(1);
            writer.accept(2);
        }

        {
            // Both scopes are tracked independently of each other.
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope1, 1, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope1, 2, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope1, 3, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 3, 1), CacheOutcome::Rejected);
            assert_eq!(reader.check(&scope2, 1, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 2, 99), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 3, 99), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 3, 2), CacheOutcome::Rejected);
        }
    }

    #[test]
    fn test_cache_vacuum() {
        let vacuum_interval = Duration::from_secs(30);
        let cache = Cache::new(vacuum_interval);

        let window = SlidingWindow {
            window_seconds: vacuum_interval.as_secs() * 10,
            granularity_seconds: vacuum_interval.as_secs() * 2,
        };
        let scope1 = build_scoping(OrganizationId::new(1), window);
        let scope2 = build_scoping(OrganizationId::new(2), window);

        let now = UnixTimestamp::now();
        let in_interval = now + Duration::from_secs(vacuum_interval.as_secs() - 1);
        let future = now + Duration::from_secs(vacuum_interval.as_secs() * 3);

        {
            let mut writer = cache.update(&scope1, now);
            writer.accept(10);
        }

        {
            let mut writer = cache.update(&scope2, now);
            writer.accept(20);
        }

        {
            // Both entries made it into the cache.
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Accepted);
            assert_eq!(reader.check(&scope2, 20, 100), CacheOutcome::Accepted);
        }

        {
            // Advance time by less than the vacuum interval.
            let mut writer = cache.update(&scope2, in_interval);
            writer.accept(21);
        }

        {
            // The other scope is untouched when read with the old timestamp.
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Accepted);
        }

        {
            // Advance time far beyond the vacuum interval, old values should be vacuumed.
            let mut writer = cache.update(&scope2, future);
            writer.accept(22);
        }

        {
            // Nothing is known for the original timestamp anymore.
            let reader = cache.read(now);
            assert_eq!(reader.check(&scope1, 10, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope1, 11, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 20, 100), CacheOutcome::Unknown);
            assert_eq!(reader.check(&scope2, 21, 100), CacheOutcome::Unknown);
        }

        {
            // The value accepted for the new slot is cached.
            let reader = cache.read(future);
            assert_eq!(reader.check(&scope2, 22, 100), CacheOutcome::Accepted);
        }
    }
}