relay_cardinality/redis/
state.rs

1use std::collections::BTreeMap;
2
3use relay_statsd::metric;
4
5use crate::{
6    CardinalityLimit,
7    limiter::{Entry, EntryId, Scoping},
8    redis::quota::{PartialQuotaScoping, QuotaScoping},
9    statsd::{CardinalityLimiterCounters, CardinalityLimiterSets},
10};
11
12/// Internal state combining relevant entries for the respective quotas.
13///
14/// Also tracks amount of cache hits and misses as well as amount of
15/// items accepted and rejected. These metrics are reported via statsd on drop.
16#[derive(Debug)]
17pub struct LimitState<'a> {
18    /// The limit of the quota.
19    pub limit: u32,
20
21    /// Scoping of the quota.
22    partial_scope: PartialQuotaScoping,
23    /// Entries grouped by quota scoping.
24    ///
25    /// Contained scopes are a superset of `scope`.
26    scopes: BTreeMap<QuotaScoping, Vec<RedisEntry>>,
27
28    /// The original cardinality limit.
29    cardinality_limit: &'a CardinalityLimit,
30    /// The original/full scoping.
31    scoping: Scoping,
32    /// Amount of cache hits.
33    cache_hits: i64,
34    /// Amount of cache misses.
35    cache_misses: i64,
36    /// Amount of accepts,
37    accepts: i64,
38    /// Amount of rejections.
39    rejections: i64,
40}
41
42impl<'a> LimitState<'a> {
43    /// Creates a new limit state from a [`Scoping`] and [`CardinalityLimit`].
44    ///
45    /// Returns `None` if the cardinality limit scope is [`Unknown`](crate::CardinalityScope::Unknown).
46    pub fn new(scoping: Scoping, cardinality_limit: &'a CardinalityLimit) -> Option<Self> {
47        Some(Self {
48            partial_scope: PartialQuotaScoping::new(scoping, cardinality_limit)?,
49            scopes: BTreeMap::new(),
50            limit: cardinality_limit.limit,
51            cardinality_limit,
52            scoping,
53            cache_hits: 0,
54            cache_misses: 0,
55            accepts: 0,
56            rejections: 0,
57        })
58    }
59
60    /// Converts a list of limits with the same scope to a vector of limit states.
61    ///
62    /// All invalid/unknown limits are skipped, see also [`Self::new`].
63    pub fn from_limits(scoping: Scoping, limits: &'a [CardinalityLimit]) -> Vec<Self> {
64        limits
65            .iter()
66            .filter_map(|limit| LimitState::new(scoping, limit))
67            .collect::<Vec<_>>()
68    }
69
70    /// Returns a [`QuotaScoping`] if the `entry` matches the limit contained in the state.
71    pub fn matching_scope(&self, entry: Entry) -> Option<QuotaScoping> {
72        if self.partial_scope.matches(&entry) {
73            Some(self.partial_scope.complete(entry))
74        } else {
75            None
76        }
77    }
78
79    /// Adds an entry to the state.
80    ///
81    /// The `scope` must be extracted from the state with [`Self::matching_scope`] first.
82    pub fn add(&mut self, scope: QuotaScoping, entry: RedisEntry) {
83        self.scopes.entry(scope).or_default().push(entry)
84    }
85
86    /// Returns `true` if this state does not contain any entries.
87    ///
88    /// The state can be empty if:
89    /// - there are no entries matching this limit.
90    /// - all entries matching this limit were already handled by the cache.
91    pub fn is_empty(&self) -> bool {
92        self.scopes.is_empty()
93    }
94
95    /// Returns the underlying cardinality limit id.
96    pub fn id(&self) -> &'a str {
97        &self.cardinality_limit.id
98    }
99
100    /// Returns the underlying cardinality limit.
101    pub fn cardinality_limit(&self) -> &'a CardinalityLimit {
102        self.cardinality_limit
103    }
104
105    /// Returns a reference to all contained scopes.
106    pub fn scopes(&self) -> &BTreeMap<QuotaScoping, Vec<RedisEntry>> {
107        &self.scopes
108    }
109
110    /// Removes all contained scopes and entries and returns them.
111    pub fn take_scopes(&mut self) -> BTreeMap<QuotaScoping, Vec<RedisEntry>> {
112        std::mem::take(&mut self.scopes)
113    }
114
115    /// Increases the cache hit counter.
116    pub fn cache_hit(&mut self) {
117        self.cache_hits += 1;
118    }
119
120    /// Increases the cache miss counter.
121    pub fn cache_miss(&mut self) {
122        self.cache_misses += 1;
123    }
124
125    /// Increases the accepted counter.
126    pub fn accepted(&mut self) {
127        self.accepts += 1;
128    }
129
130    /// Increases the rejected counter.
131    pub fn rejected(&mut self) {
132        self.rejections += 1;
133    }
134}
135
136impl Drop for LimitState<'_> {
137    fn drop(&mut self) {
138        let passive = if self.cardinality_limit.passive {
139            "true"
140        } else {
141            "false"
142        };
143
144        metric!(
145            counter(CardinalityLimiterCounters::RedisCacheHit) += self.cache_hits,
146            id = &self.cardinality_limit.id,
147            passive = passive,
148        );
149        metric!(
150            counter(CardinalityLimiterCounters::RedisCacheMiss) += self.cache_misses,
151            id = &self.cardinality_limit.id,
152            passive = passive,
153        );
154        metric!(
155            counter(CardinalityLimiterCounters::Accepted) += self.accepts,
156            id = &self.cardinality_limit.id,
157            passive = passive,
158        );
159        metric!(
160            counter(CardinalityLimiterCounters::Rejected) += self.rejections,
161            id = &self.cardinality_limit.id,
162            passive = passive,
163        );
164
165        let organization_id = self.scoping.organization_id;
166        let status = if self.rejections > 0 { "limited" } else { "ok" };
167        metric!(
168            set(CardinalityLimiterSets::Organizations) = organization_id.value() as i64,
169            id = &self.cardinality_limit.id,
170            passive = passive,
171            status = status,
172        )
173    }
174}
175
176/// Entry used by the Redis limiter.
177#[derive(Clone, Copy, Debug)]
178pub struct RedisEntry {
179    /// The correlating entry id.
180    pub id: EntryId,
181    /// The entry hash.
182    pub hash: u32,
183}
184
185impl RedisEntry {
186    /// Creates a new Redis entry.
187    pub fn new(id: EntryId, hash: u32) -> Self {
188        Self { id, hash }
189    }
190}