relay_sampling/
redis_sampling.rs1use chrono::{DateTime, Utc};
2use relay_base_schema::organization::OrganizationId;
3use relay_redis::AsyncRedisConnection;
4
5use crate::config::RuleId;
6
7pub struct ReservoirRuleKey(String);
8
9impl ReservoirRuleKey {
10 pub fn new(org_id: OrganizationId, rule_id: RuleId) -> Self {
11 Self(format!("reservoir:{org_id}:{rule_id}"))
12 }
13
14 fn as_str(&self) -> &str {
15 self.0.as_str()
16 }
17}
18
19pub async fn increment_redis_reservoir_count(
24 connection: &mut AsyncRedisConnection,
25 key: &ReservoirRuleKey,
26) -> anyhow::Result<i64> {
27 let val = relay_redis::redis::cmd("INCR")
28 .arg(key.as_str())
29 .query_async(connection)
30 .await?;
31
32 Ok(val)
33}
34
35pub async fn set_redis_expiry(
37 connection: &mut AsyncRedisConnection,
38 key: &ReservoirRuleKey,
39 rule_expiry: Option<&DateTime<Utc>>,
40) -> anyhow::Result<()> {
41 let now = Utc::now().timestamp();
42 let expiry_time = rule_expiry
43 .map(|rule_expiry| rule_expiry.timestamp() + 60)
44 .unwrap_or_else(|| now + 86400);
45
46 relay_redis::redis::cmd("EXPIRE")
47 .arg(key.as_str())
48 .arg(expiry_time - now)
49 .query_async::<()>(connection)
50 .await?;
51
52 Ok(())
53}