relay_sampling/
redis_sampling.rs

1use chrono::{DateTime, Utc};
2use relay_base_schema::organization::OrganizationId;
3use relay_redis::AsyncRedisConnection;
4
5use crate::config::RuleId;
6
/// Redis key identifying the reservoir counter of a single sampling rule.
///
/// Wraps the raw key string so that only keys produced by
/// [`ReservoirRuleKey::new`] are handed to the redis helpers in this module.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReservoirRuleKey(String);
8
9impl ReservoirRuleKey {
10    pub fn new(org_id: OrganizationId, rule_id: RuleId) -> Self {
11        Self(format!("reservoir:{org_id}:{rule_id}"))
12    }
13
14    fn as_str(&self) -> &str {
15        self.0.as_str()
16    }
17}
18
19/// Increments the reservoir count for a given rule in redis.
20///
21/// - INCR docs: [`https://redis.io/commands/incr/`]
22/// - If the counter doesn't exist in redis, a new one will be inserted.
23pub async fn increment_redis_reservoir_count(
24    connection: &mut AsyncRedisConnection,
25    key: &ReservoirRuleKey,
26) -> anyhow::Result<i64> {
27    let val = relay_redis::redis::cmd("INCR")
28        .arg(key.as_str())
29        .query_async(connection)
30        .await?;
31
32    Ok(val)
33}
34
35/// Sets the expiry time for a reservoir rule count.
36pub async fn set_redis_expiry(
37    connection: &mut AsyncRedisConnection,
38    key: &ReservoirRuleKey,
39    rule_expiry: Option<&DateTime<Utc>>,
40) -> anyhow::Result<()> {
41    let now = Utc::now().timestamp();
42    let expiry_time = rule_expiry
43        .map(|rule_expiry| rule_expiry.timestamp() + 60)
44        .unwrap_or_else(|| now + 86400);
45
46    relay_redis::redis::cmd("EXPIRE")
47        .arg(key.as_str())
48        .arg(expiry_time - now)
49        .query_async::<()>(connection)
50        .await?;
51
52    Ok(())
53}