// relay_cardinality/redis/state.rs
use std::collections::BTreeMap;
use relay_statsd::metric;
use crate::{
limiter::{Entry, EntryId, Scoping},
redis::quota::{PartialQuotaScoping, QuotaScoping},
statsd::{CardinalityLimiterCounters, CardinalityLimiterSets},
CardinalityLimit,
};
/// Internal state combining relevant entries for the respective quotas.
///
/// A state is created per [`CardinalityLimit`] and collects the [`RedisEntry`]s
/// matching that limit, grouped by their completed [`QuotaScoping`].
///
/// Also tracks the amount of cache hits and misses as well as the amount of
/// items accepted and rejected. These metrics are reported via statsd on drop.
#[derive(Debug)]
pub struct LimitState<'a> {
/// The limit of the quota, copied from the cardinality limit on construction.
pub limit: u32,
/// Partial scoping of the quota.
///
/// Used to test whether an entry matches this limit and, if so, completed
/// with that entry into a full [`QuotaScoping`].
partial_scope: PartialQuotaScoping,
/// Entries grouped by quota scoping.
///
/// Keys are expected to be scopes obtained from [`Self::matching_scope`],
/// i.e. completions of `partial_scope`.
scopes: BTreeMap<QuotaScoping, Vec<RedisEntry>>,
/// The original cardinality limit.
cardinality_limit: &'a CardinalityLimit,
/// The original/full scoping.
scoping: Scoping,
/// Amount of cache hits.
cache_hits: i64,
/// Amount of cache misses.
cache_misses: i64,
/// Amount of accepts.
accepts: i64,
/// Amount of rejections.
rejections: i64,
}
impl<'a> LimitState<'a> {
    /// Builds the limit state for a [`CardinalityLimit`] within the given [`Scoping`].
    ///
    /// Yields `None` when no partial quota scoping can be derived for the limit, e.g. when the
    /// cardinality limit scope is [`Unknown`](crate::CardinalityScope::Unknown).
    pub fn new(scoping: Scoping, cardinality_limit: &'a CardinalityLimit) -> Option<Self> {
        // Bail out early: without a partial scope there is nothing to track.
        let partial_scope = PartialQuotaScoping::new(scoping, cardinality_limit)?;

        Some(Self {
            limit: cardinality_limit.limit,
            partial_scope,
            scopes: BTreeMap::new(),
            cardinality_limit,
            scoping,
            cache_hits: 0,
            cache_misses: 0,
            accepts: 0,
            rejections: 0,
        })
    }

    /// Turns a slice of limits sharing one scoping into their limit states.
    ///
    /// Limits for which [`Self::new`] returns `None` are silently dropped.
    pub fn from_limits(scoping: Scoping, limits: &'a [CardinalityLimit]) -> Vec<Self> {
        limits
            .iter()
            .filter_map(|cardinality_limit| Self::new(scoping, cardinality_limit))
            .collect()
    }

    /// Completes the partial scope with `entry` if the entry matches this state's limit.
    ///
    /// Returns `None` for entries which do not match.
    pub fn matching_scope(&self, entry: Entry) -> Option<QuotaScoping> {
        if !self.partial_scope.matches(&entry) {
            return None;
        }

        Some(self.partial_scope.complete(entry))
    }

    /// Records an entry under the given scope.
    ///
    /// The `scope` must have been obtained from [`Self::matching_scope`] beforehand.
    pub fn add(&mut self, scope: QuotaScoping, entry: RedisEntry) {
        let bucket = self.scopes.entry(scope).or_default();
        bucket.push(entry);
    }

    /// Returns `true` if no entries are currently held by this state.
    ///
    /// This happens when:
    /// - no entry matched this limit.
    /// - every entry matching this limit was already handled by the cache.
    pub fn is_empty(&self) -> bool {
        self.scopes.is_empty()
    }

    /// Returns the id of the underlying cardinality limit.
    pub fn id(&self) -> &'a str {
        let cardinality_limit: &'a CardinalityLimit = self.cardinality_limit;
        &cardinality_limit.id
    }

    /// Returns the cardinality limit this state was created from.
    pub fn cardinality_limit(&self) -> &'a CardinalityLimit {
        self.cardinality_limit
    }

    /// Returns a reference to all scopes and their entries.
    pub fn scopes(&self) -> &BTreeMap<QuotaScoping, Vec<RedisEntry>> {
        &self.scopes
    }

    /// Moves all scopes and entries out of the state, leaving it empty.
    pub fn take_scopes(&mut self) -> BTreeMap<QuotaScoping, Vec<RedisEntry>> {
        std::mem::take(&mut self.scopes)
    }

    /// Counts one cache hit.
    pub fn cache_hit(&mut self) {
        self.cache_hits += 1;
    }

    /// Counts one cache miss.
    pub fn cache_miss(&mut self) {
        self.cache_misses += 1;
    }

    /// Counts one accepted entry.
    pub fn accepted(&mut self) {
        self.accepts += 1;
    }

    /// Counts one rejected entry.
    pub fn rejected(&mut self) {
        self.rejections += 1;
    }
}
impl Drop for LimitState<'_> {
    /// Flushes the collected counters to statsd.
    ///
    /// Emits the cache hit/miss and accepted/rejected counters, followed by the organization id
    /// as a set value. All metrics are tagged with the limit id and whether the limit is passive;
    /// the organization set is additionally tagged with a status.
    fn drop(&mut self) {
        let limit = self.cardinality_limit;
        let passive = if limit.passive { "true" } else { "false" };
        // The state counts as "limited" as soon as a single rejection was recorded.
        let status = if self.rejections > 0 { "limited" } else { "ok" };

        metric!(
            counter(CardinalityLimiterCounters::RedisCacheHit) += self.cache_hits,
            id = &limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::RedisCacheMiss) += self.cache_misses,
            id = &limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::Accepted) += self.accepts,
            id = &limit.id,
            passive = passive,
        );
        metric!(
            counter(CardinalityLimiterCounters::Rejected) += self.rejections,
            id = &limit.id,
            passive = passive,
        );

        metric!(
            set(CardinalityLimiterSets::Organizations) =
                self.scoping.organization_id.value() as i64,
            id = &limit.id,
            passive = passive,
            status = status,
        );
    }
}
/// Entry used by the Redis limiter.
///
/// Pairs an [`EntryId`] with its `u32` hash. Instances are cheap to copy
/// (`Clone` + `Copy`) and are stored per quota scope inside [`LimitState`].
#[derive(Clone, Copy, Debug)]
pub struct RedisEntry {
/// The correlating entry id.
pub id: EntryId,
/// The entry hash.
pub hash: u32,
}
impl RedisEntry {
/// Creates a new Redis entry.
pub fn new(id: EntryId, hash: u32) -> Self {
Self { id, hash }
}
}