relay_cardinality/redis/
state.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use std::collections::BTreeMap;

use relay_statsd::metric;

use crate::{
    limiter::{Entry, EntryId, Scoping},
    redis::quota::{PartialQuotaScoping, QuotaScoping},
    statsd::{CardinalityLimiterCounters, CardinalityLimiterSets},
    CardinalityLimit,
};

/// Internal state combining relevant entries for the respective quotas.
///
/// Also tracks amount of cache hits and misses as well as amount of
/// items accepted and rejected. These metrics are reported via statsd on drop.
#[derive(Debug)]
pub struct LimitState<'a> {
    /// The limit of the quota.
    pub limit: u32,

    /// Scoping of the quota.
    partial_scope: PartialQuotaScoping,
    /// Entries grouped by quota scoping.
    ///
    /// Contained scopes are a superset of `scope`.
    scopes: BTreeMap<QuotaScoping, Vec<RedisEntry>>,

    /// The original cardinality limit.
    cardinality_limit: &'a CardinalityLimit,
    /// The original/full scoping.
    scoping: Scoping,
    /// Amount of cache hits.
    cache_hits: i64,
    /// Amount of cache misses.
    cache_misses: i64,
    /// Amount of accepts,
    accepts: i64,
    /// Amount of rejections.
    rejections: i64,
}

impl<'a> LimitState<'a> {
    /// Creates a new limit state from a [`Scoping`] and [`CardinalityLimit`].
    ///
    /// Returns `None` if the cardinality limit scope is [`Unknown`](crate::CardinalityScope::Unknown).
    pub fn new(scoping: Scoping, cardinality_limit: &'a CardinalityLimit) -> Option<Self> {
        Some(Self {
            partial_scope: PartialQuotaScoping::new(scoping, cardinality_limit)?,
            scopes: BTreeMap::new(),
            limit: cardinality_limit.limit,
            cardinality_limit,
            scoping,
            cache_hits: 0,
            cache_misses: 0,
            accepts: 0,
            rejections: 0,
        })
    }

    /// Converts a list of limits with the same scope to a vector of limit states.
    ///
    /// All invalid/unknown limits are skipped, see also [`Self::new`].
    pub fn from_limits(scoping: Scoping, limits: &'a [CardinalityLimit]) -> Vec<Self> {
        limits
            .iter()
            .filter_map(|limit| LimitState::new(scoping, limit))
            .collect::<Vec<_>>()
    }

    /// Returns a [`QuotaScoping`] if the `entry` matches the limit contained in the state.
    pub fn matching_scope(&self, entry: Entry) -> Option<QuotaScoping> {
        if self.partial_scope.matches(&entry) {
            Some(self.partial_scope.complete(entry))
        } else {
            None
        }
    }

    /// Adds an entry to the state.
    ///
    /// The `scope` must be extracted from the state with [`Self::matching_scope`] first.
    pub fn add(&mut self, scope: QuotaScoping, entry: RedisEntry) {
        self.scopes.entry(scope).or_default().push(entry)
    }

    /// Returns `true` if this state does not contain any entries.
    ///
    /// The state can be empty if:
    /// - there are no entries matching this limit.
    /// - all entries matching this limit were already handled by the cache.
    pub fn is_empty(&self) -> bool {
        self.scopes.is_empty()
    }

    /// Returns the underlying cardinality limit id.
    pub fn id(&self) -> &'a str {
        &self.cardinality_limit.id
    }

    /// Returns the underlying cardinality limit.
    pub fn cardinality_limit(&self) -> &'a CardinalityLimit {
        self.cardinality_limit
    }

    /// Returns a reference to all contained scopes.
    pub fn scopes(&self) -> &BTreeMap<QuotaScoping, Vec<RedisEntry>> {
        &self.scopes
    }

    /// Removes all contained scopes and entries and returns them.
    pub fn take_scopes(&mut self) -> BTreeMap<QuotaScoping, Vec<RedisEntry>> {
        std::mem::take(&mut self.scopes)
    }

    /// Increases the cache hit counter.
    pub fn cache_hit(&mut self) {
        self.cache_hits += 1;
    }

    /// Increases the cache miss counter.
    pub fn cache_miss(&mut self) {
        self.cache_misses += 1;
    }

    /// Increases the accepted counter.
    pub fn accepted(&mut self) {
        self.accepts += 1;
    }

    /// Increases the rejected counter.
    pub fn rejected(&mut self) {
        self.rejections += 1;
    }
}

impl Drop for LimitState<'_> {
    fn drop(&mut self) {
        let passive = if self.cardinality_limit.passive {
            "true"
        } else {
            "false"
        };

        metric!(
            counter(CardinalityLimiterCounters::RedisCacheHit) += self.cache_hits,
            id = &self.cardinality_limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::RedisCacheMiss) += self.cache_misses,
            id = &self.cardinality_limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::Accepted) += self.accepts,
            id = &self.cardinality_limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::Rejected) += self.rejections,
            id = &self.cardinality_limit.id,
            passive = passive,
        );

        let organization_id = self.scoping.organization_id;
        let status = if self.rejections > 0 { "limited" } else { "ok" };
        metric!(
            set(CardinalityLimiterSets::Organizations) = organization_id.value() as i64,
            id = &self.cardinality_limit.id,
            passive = passive,
            status = status,
        )
    }
}

/// Entry used by the Redis limiter.
#[derive(Clone, Copy, Debug)]
pub struct RedisEntry {
    /// The correlating entry id.
    pub id: EntryId,
    /// The entry hash.
    pub hash: u32,
}

impl RedisEntry {
    /// Creates a new Redis entry.
    pub fn new(id: EntryId, hash: u32) -> Self {
        Self { id, hash }
    }
}