relay_cardinality/redis/
state.rs

1use std::collections::BTreeMap;
2
3use relay_statsd::metric;
4
5use crate::{
6    CardinalityLimit,
7    limiter::{Entry, EntryId, Scoping},
8    redis::quota::{PartialQuotaScoping, QuotaScoping},
9    statsd::CardinalityLimiterCounters,
10};
11
12/// Internal state combining relevant entries for the respective quotas.
13///
14/// Also tracks amount of cache hits and misses as well as amount of
15/// items accepted and rejected. These metrics are reported via statsd on drop.
16#[derive(Debug)]
17pub struct LimitState<'a> {
18    /// The limit of the quota.
19    pub limit: u32,
20
21    /// Scoping of the quota.
22    partial_scope: PartialQuotaScoping,
23    /// Entries grouped by quota scoping.
24    ///
25    /// Contained scopes are a superset of `scope`.
26    scopes: BTreeMap<QuotaScoping, Vec<RedisEntry>>,
27
28    /// The original cardinality limit.
29    cardinality_limit: &'a CardinalityLimit,
30    /// Amount of cache hits.
31    cache_hits: u64,
32    /// Amount of cache misses.
33    cache_misses: u64,
34    /// Amount of accepts,
35    accepts: u64,
36    /// Amount of rejections.
37    rejections: u64,
38}
39
40impl<'a> LimitState<'a> {
41    /// Creates a new limit state from a [`Scoping`] and [`CardinalityLimit`].
42    ///
43    /// Returns `None` if the cardinality limit scope is [`Unknown`](crate::CardinalityScope::Unknown).
44    pub fn new(scoping: Scoping, cardinality_limit: &'a CardinalityLimit) -> Option<Self> {
45        Some(Self {
46            partial_scope: PartialQuotaScoping::new(scoping, cardinality_limit)?,
47            scopes: BTreeMap::new(),
48            limit: cardinality_limit.limit,
49            cardinality_limit,
50            cache_hits: 0,
51            cache_misses: 0,
52            accepts: 0,
53            rejections: 0,
54        })
55    }
56
57    /// Converts a list of limits with the same scope to a vector of limit states.
58    ///
59    /// All invalid/unknown limits are skipped, see also [`Self::new`].
60    pub fn from_limits(scoping: Scoping, limits: &'a [CardinalityLimit]) -> Vec<Self> {
61        limits
62            .iter()
63            .filter_map(|limit| LimitState::new(scoping, limit))
64            .collect::<Vec<_>>()
65    }
66
67    /// Returns a [`QuotaScoping`] if the `entry` matches the limit contained in the state.
68    pub fn matching_scope(&self, entry: Entry) -> Option<QuotaScoping> {
69        if self.partial_scope.matches(&entry) {
70            Some(self.partial_scope.complete(entry))
71        } else {
72            None
73        }
74    }
75
76    /// Adds an entry to the state.
77    ///
78    /// The `scope` must be extracted from the state with [`Self::matching_scope`] first.
79    pub fn add(&mut self, scope: QuotaScoping, entry: RedisEntry) {
80        self.scopes.entry(scope).or_default().push(entry)
81    }
82
83    /// Returns `true` if this state does not contain any entries.
84    ///
85    /// The state can be empty if:
86    /// - there are no entries matching this limit.
87    /// - all entries matching this limit were already handled by the cache.
88    pub fn is_empty(&self) -> bool {
89        self.scopes.is_empty()
90    }
91
92    /// Returns the underlying cardinality limit id.
93    pub fn id(&self) -> &'a str {
94        &self.cardinality_limit.id
95    }
96
97    /// Returns the underlying cardinality limit.
98    pub fn cardinality_limit(&self) -> &'a CardinalityLimit {
99        self.cardinality_limit
100    }
101
102    /// Returns a reference to all contained scopes.
103    pub fn scopes(&self) -> &BTreeMap<QuotaScoping, Vec<RedisEntry>> {
104        &self.scopes
105    }
106
107    /// Removes all contained scopes and entries and returns them.
108    pub fn take_scopes(&mut self) -> BTreeMap<QuotaScoping, Vec<RedisEntry>> {
109        std::mem::take(&mut self.scopes)
110    }
111
112    /// Increases the cache hit counter.
113    pub fn cache_hit(&mut self) {
114        self.cache_hits += 1;
115    }
116
117    /// Increases the cache miss counter.
118    pub fn cache_miss(&mut self) {
119        self.cache_misses += 1;
120    }
121
122    /// Increases the accepted counter.
123    pub fn accepted(&mut self) {
124        self.accepts += 1;
125    }
126
127    /// Increases the rejected counter.
128    pub fn rejected(&mut self) {
129        self.rejections += 1;
130    }
131}
132
133impl Drop for LimitState<'_> {
134    fn drop(&mut self) {
135        let passive = if self.cardinality_limit.passive {
136            "true"
137        } else {
138            "false"
139        };
140
141        metric!(
142            counter(CardinalityLimiterCounters::RedisCacheHit) += self.cache_hits,
143            id = &self.cardinality_limit.id,
144            passive = passive,
145        );
146        metric!(
147            counter(CardinalityLimiterCounters::RedisCacheMiss) += self.cache_misses,
148            id = &self.cardinality_limit.id,
149            passive = passive,
150        );
151        metric!(
152            counter(CardinalityLimiterCounters::Accepted) += self.accepts,
153            id = &self.cardinality_limit.id,
154            passive = passive,
155        );
156        metric!(
157            counter(CardinalityLimiterCounters::Rejected) += self.rejections,
158            id = &self.cardinality_limit.id,
159            passive = passive,
160        );
161    }
162}
163
164/// Entry used by the Redis limiter.
165#[derive(Clone, Copy, Debug)]
166pub struct RedisEntry {
167    /// The correlating entry id.
168    pub id: EntryId,
169    /// The entry hash.
170    pub hash: u32,
171}
172
173impl RedisEntry {
174    /// Creates a new Redis entry.
175    pub fn new(id: EntryId, hash: u32) -> Self {
176        Self { id, hash }
177    }
178}