relay_sampling/
evaluation.rs

1//! Evaluation of dynamic sampling rules.
2
3use std::collections::BTreeMap;
4use std::fmt;
5use std::num::ParseIntError;
6use std::ops::ControlFlow;
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use rand::Rng;
11use rand::distr::StandardUniform;
12use rand_pcg::Pcg32;
13#[cfg(feature = "redis")]
14use relay_base_schema::organization::OrganizationId;
15use relay_protocol::Getter;
16#[cfg(feature = "redis")]
17use relay_redis::AsyncRedisClient;
18use serde::Serialize;
19use uuid::Uuid;
20
21use crate::config::{RuleId, SamplingRule, SamplingValue};
22#[cfg(feature = "redis")]
23use crate::redis_sampling::{self, ReservoirRuleKey};
24
25/// Generates a pseudo random number by seeding the generator with the given id.
26///
27/// The return is deterministic, always generates the same number from the same id.
28fn pseudo_random_from_seed(seed: Uuid) -> f64 {
29    let seed_number = seed.as_u128();
30    let mut generator = Pcg32::new((seed_number >> 64) as u64, seed_number as u64);
31    generator.sample(StandardUniform)
32}
33
34/// The amount of matches for each reservoir rule in a given project.
35pub type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;
36
37/// Utility for evaluating reservoir-based sampling rules.
38///
39/// A "reservoir limit" rule samples every match until its limit is reached, after which
40/// the rule is disabled.
41///
42/// This utility uses a dual-counter system for enforcing this limit:
43///
44/// - Local Counter: Each relay instance maintains a local counter to track sampled events.
45///
46/// - Redis Counter: For processing relays, a Redis-based counter provides synchronization
47///   across multiple relay-instances. When incremented, the Redis counter returns the current global
48///   count for the given rule, which is then used to update the local counter.
49#[derive(Debug)]
50pub struct ReservoirEvaluator<'a> {
51    counters: ReservoirCounters,
52    #[cfg(feature = "redis")]
53    org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>,
54    // Using PhantomData because the lifetimes are behind a feature flag.
55    _phantom: std::marker::PhantomData<&'a ()>,
56}
57
58impl ReservoirEvaluator<'_> {
59    /// Constructor for [`ReservoirEvaluator`].
60    pub fn new(counters: ReservoirCounters) -> Self {
61        Self {
62            counters,
63            #[cfg(feature = "redis")]
64            org_id_and_client: None,
65            _phantom: std::marker::PhantomData,
66        }
67    }
68
69    /// Gets shared ownership of the reservoir counters.
70    pub fn counters(&self) -> ReservoirCounters {
71        Arc::clone(&self.counters)
72    }
73
74    #[cfg(feature = "redis")]
75    async fn redis_incr(
76        &self,
77        key: &ReservoirRuleKey,
78        client: &AsyncRedisClient,
79        rule_expiry: Option<&DateTime<Utc>>,
80    ) -> anyhow::Result<i64> {
81        relay_statsd::metric!(timer(crate::statsd::SamplingTimers::RedisReservoir), {
82            let mut connection = client.get_connection().await?;
83
84            let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?;
85            redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?;
86
87            Ok(val)
88        })
89    }
90
91    /// Evaluates a reservoir rule, returning `true` if it should be sampled.
92    pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool {
93        let Ok(mut map_guard) = self.counters.lock() else {
94            relay_log::error!("failed to lock reservoir counter mutex");
95            return false;
96        };
97
98        let counter_value = map_guard.entry(rule).or_insert(0);
99
100        if *counter_value < limit {
101            *counter_value += 1;
102            true
103        } else {
104            false
105        }
106    }
107
108    /// Evaluates a reservoir rule, returning `true` if it should be sampled.
109    pub async fn evaluate(
110        &self,
111        rule: RuleId,
112        limit: i64,
113        _rule_expiry: Option<&DateTime<Utc>>,
114    ) -> bool {
115        #[cfg(feature = "redis")]
116        if let Some((org_id, client)) = self.org_id_and_client {
117            if let Ok(guard) = self.counters.lock()
118                && *guard.get(&rule).unwrap_or(&0) > limit
119            {
120                return false;
121            }
122
123            let key = ReservoirRuleKey::new(org_id, rule);
124            let redis_count = match self.redis_incr(&key, client, _rule_expiry).await {
125                Ok(redis_count) => redis_count,
126                Err(e) => {
127                    relay_log::error!(error = &*e, "failed to increment reservoir rule");
128                    return false;
129                }
130            };
131
132            if let Ok(mut map_guard) = self.counters.lock() {
133                // If the rule isn't present, it has just been cleaned up by a project state update.
134                // In that case, it is no longer relevant so we ignore it.
135                if let Some(value) = map_guard.get_mut(&rule) {
136                    *value = redis_count.max(*value);
137                }
138            }
139            return redis_count <= limit;
140        }
141
142        self.incr_local(rule, limit)
143    }
144}
145
#[cfg(feature = "redis")]
impl<'a> ReservoirEvaluator<'a> {
    /// Attaches an organization ID and a Redis client to this [`ReservoirEvaluator`].
    ///
    /// Once they are set, reservoir evaluation synchronizes its counters through Redis
    /// instead of relying on the local counters alone.
    pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) {
        let redis = (org_id, client);
        self.org_id_and_client = Some(redis);
    }
}
155
156/// State machine for dynamic sampling.
157#[derive(Debug)]
158pub struct SamplingEvaluator<'a> {
159    now: DateTime<Utc>,
160    rule_ids: Vec<RuleId>,
161    factor: f64,
162    minimum_sample_rate: Option<f64>,
163    reservoir: Option<&'a ReservoirEvaluator<'a>>,
164}
165
166impl<'a> SamplingEvaluator<'a> {
167    /// Constructs an evaluator with reservoir sampling.
168    pub fn new_with_reservoir(now: DateTime<Utc>, reservoir: &'a ReservoirEvaluator<'a>) -> Self {
169        Self {
170            now,
171            rule_ids: vec![],
172            factor: 1.0,
173            minimum_sample_rate: None,
174            reservoir: Some(reservoir),
175        }
176    }
177
178    /// Constructs an evaluator without reservoir sampling.
179    pub fn new(now: DateTime<Utc>) -> Self {
180        Self {
181            now,
182            rule_ids: vec![],
183            factor: 1.0,
184            minimum_sample_rate: None,
185            reservoir: None,
186        }
187    }
188
189    /// Attempts to find a match for sampling rules using `ControlFlow`.
190    ///
191    /// This function returns a `ControlFlow` to provide control over the matching process.
192    ///
193    /// - `ControlFlow::Continue`: Indicates that matching is incomplete, and more rules can be evaluated.
194    ///    - This state occurs either if no active rules match the provided data, or if the matched rules
195    ///      are factors requiring a final sampling value.
196    ///    - The returned evaluator contains the state of the matched rules and the accumulated sampling factor.
197    ///    - If this value is returned and there are no more rules to evaluate, it should be interpreted as "no match."
198    ///
199    /// - `ControlFlow::Break`: Indicates that one or more rules have successfully matched.
200    pub async fn match_rules<'b, I, G>(
201        mut self,
202        seed: Uuid,
203        instance: &G,
204        rules: I,
205    ) -> ControlFlow<SamplingMatch, Self>
206    where
207        G: Getter,
208        I: Iterator<Item = &'b SamplingRule>,
209    {
210        for rule in rules {
211            if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) {
212                continue;
213            };
214
215            if let Some(sample_rate) = self.try_compute_sample_rate(rule).await {
216                return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids));
217            };
218        }
219
220        ControlFlow::Continue(self)
221    }
222
223    /// Attempts to compute the sample rate for a given [`SamplingRule`].
224    ///
225    /// # Returns
226    ///
227    /// - `None` if the sampling rule is invalid, expired, or if the final sample rate has not been
228    ///   determined yet.
229    /// - `Some` if the computed sample rate should be applied directly.
230    async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option<f64> {
231        match rule.sampling_value {
232            SamplingValue::Factor { value } => {
233                self.factor *= rule.apply_decaying_fn(value, self.now)?;
234                self.rule_ids.push(rule.id);
235                None
236            }
237            SamplingValue::SampleRate { value } => {
238                let sample_rate = rule.apply_decaying_fn(value, self.now)?;
239                let minimum_sample_rate = self.minimum_sample_rate.unwrap_or(0.0);
240                let adjusted = (sample_rate.max(minimum_sample_rate) * self.factor).clamp(0.0, 1.0);
241
242                self.rule_ids.push(rule.id);
243                Some(adjusted)
244            }
245            SamplingValue::Reservoir { limit } => {
246                let reservoir = self.reservoir?;
247                if !reservoir
248                    .evaluate(rule.id, limit, rule.time_range.end.as_ref())
249                    .await
250                {
251                    return None;
252                }
253
254                // Clearing the previously matched rules because reservoir overrides them.
255                self.rule_ids.clear();
256                self.rule_ids.push(rule.id);
257                // If the reservoir has not yet reached its limit, we want to sample 100%.
258                Some(1.0)
259            }
260            SamplingValue::MinimumSampleRate { value } => {
261                if self.minimum_sample_rate.is_none() {
262                    self.minimum_sample_rate = Some(rule.apply_decaying_fn(value, self.now)?);
263                    self.rule_ids.push(rule.id);
264                }
265                None
266            }
267        }
268    }
269}
270
271fn sampling_match(sample_rate: f64, seed: Uuid) -> SamplingDecision {
272    if sample_rate <= 0.0 {
273        return SamplingDecision::Drop;
274    } else if sample_rate >= 1.0 {
275        return SamplingDecision::Keep;
276    }
277
278    let random_number = pseudo_random_from_seed(seed);
279    relay_log::trace!(
280        sample_rate,
281        random_number,
282        "applying dynamic sampling to matching event"
283    );
284
285    if random_number >= sample_rate {
286        relay_log::trace!("dropping event that matched the configuration");
287        SamplingDecision::Drop
288    } else {
289        relay_log::trace!("keeping event that matched the configuration");
290        SamplingDecision::Keep
291    }
292}
293
/// The outcome of dynamic sampling for a single item.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SamplingDecision {
    /// The item is sampled and must be kept.
    Keep,
    /// The item is not sampled and must be dropped.
    Drop,
}

impl SamplingDecision {
    /// Whether this decision is [`Self::Keep`].
    pub fn is_keep(self) -> bool {
        self == Self::Keep
    }

    /// Whether this decision is [`Self::Drop`].
    pub fn is_drop(self) -> bool {
        self == Self::Drop
    }

    /// The lowercase name of the decision, also used by its `Display` implementation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Keep => "keep",
            Self::Drop => "drop",
        }
    }
}

impl fmt::Display for SamplingDecision {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
328
329/// Represents the specification for sampling an incoming event.
330#[derive(Clone, Debug, PartialEq)]
331pub struct SamplingMatch {
332    /// The sample rate to use for the incoming event.
333    sample_rate: f64,
334    /// The seed to feed to the random number generator which allows the same number to be
335    /// generated given the same seed.
336    ///
337    /// This is especially important for trace sampling, even though we can have inconsistent
338    /// traces due to multi-matching.
339    seed: Uuid,
340    /// The list of rule ids that have matched the incoming event and/or dynamic sampling context.
341    matched_rules: MatchedRuleIds,
342    /// Whether this sampling match results in the item getting sampled.
343    /// It's essentially a cache, as the value can be deterministically derived from
344    /// the sample rate and the seed.
345    decision: SamplingDecision,
346}
347
348impl SamplingMatch {
349    fn new(sample_rate: f64, seed: Uuid, matched_rules: Vec<RuleId>) -> Self {
350        let matched_rules = MatchedRuleIds(matched_rules);
351        let decision = sampling_match(sample_rate, seed);
352
353        Self {
354            sample_rate,
355            seed,
356            matched_rules,
357            decision,
358        }
359    }
360
361    /// Returns the sample rate.
362    pub fn sample_rate(&self) -> f64 {
363        self.sample_rate
364    }
365
366    /// Returns the matched rules for the sampling match.
367    ///
368    /// Takes ownership, useful if you don't need the [`SamplingMatch`] anymore
369    /// and you want to avoid allocations.
370    pub fn into_matched_rules(self) -> MatchedRuleIds {
371        self.matched_rules
372    }
373
374    /// Returns the sampling decision.
375    pub fn decision(&self) -> SamplingDecision {
376        self.decision
377    }
378}
379
380/// Represents a list of rule ids which is used for outcomes.
381#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
382pub struct MatchedRuleIds(pub Vec<RuleId>);
383
384impl MatchedRuleIds {
385    /// Parses `MatchedRuleIds` from a string with concatenated rule identifiers.
386    ///
387    /// The format it parses from is:
388    ///
389    /// ```text
390    /// rule_id_1,rule_id_2,...
391    /// ```
392    pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
393        let mut rule_ids = vec![];
394
395        for rule_id in value.split(',') {
396            rule_ids.push(RuleId(rule_id.parse()?));
397        }
398
399        Ok(MatchedRuleIds(rule_ids))
400    }
401}
402
403impl fmt::Display for MatchedRuleIds {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        for (i, rule_id) in self.0.iter().enumerate() {
406            if i > 0 {
407                write!(f, ",")?;
408            }
409            write!(f, "{rule_id}")?;
410        }
411
412        Ok(())
413    }
414}
415
#[cfg(test)]
mod tests {
    use chrono::TimeZone;
    use relay_protocol::RuleCondition;
    use similar_asserts::assert_eq;
    use std::str::FromStr;
    use uuid::Uuid;

    use crate::DynamicSamplingContext;
    use crate::config::{DecayingFunction, RuleType, TimeRange};
    use crate::dsc::TraceUserContext;

    use super::*;

    /// Creates a [`ReservoirEvaluator`] whose local counters start with the given
    /// `(rule_id, count)` pairs.
    fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> {
        let mut map = BTreeMap::default();

        for (rule_id, count) in vals {
            map.insert(RuleId(rule_id), count);
        }

        let map = Arc::new(Mutex::new(map));

        ReservoirEvaluator::new(map)
    }

    /// Helper to extract the sampling match after evaluating rules.
    ///
    /// Panics if no rule yields a final sample rate.
    async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch {
        match SamplingEvaluator::new(Utc::now())
            .match_rules(Uuid::default(), instance, rules.iter())
            .await
        {
            ControlFlow::Break(sampling_match) => sampling_match,
            ControlFlow::Continue(_) => panic!("no match found"),
        }
    }

    /// Returns `true` if the evaluation produced a final [`SamplingMatch`].
    fn evaluation_is_match(res: ControlFlow<SamplingMatch, SamplingEvaluator>) -> bool {
        matches!(res, ControlFlow::Break(_))
    }

    /// Helper to check that exactly the given rules, in order, are matched.
    async fn matches_rule_ids(
        rule_ids: &[u32],
        rules: &[SamplingRule],
        instance: &impl Getter,
    ) -> bool {
        let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect());
        let sampling_match = get_sampling_match(rules, instance).await;
        matched_rule_ids == sampling_match.matched_rules
    }

    /// Helper to "unwrap" the sampling match into its raw matched rule IDs.
    ///
    /// Panics if the evaluation did not produce a match.
    fn get_matched_rules(
        sampling_evaluator: &ControlFlow<SamplingMatch, SamplingEvaluator>,
    ) -> Vec<u32> {
        match sampling_evaluator {
            ControlFlow::Continue(_) => panic!("expected a sampling match"),
            ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(),
        }
    }

    /// Helper function to create a dsc with the provided getter-values set.
    fn mocked_dsc_with_getter_values(
        paths_and_values: Vec<(&str, &str)>,
    ) -> DynamicSamplingContext {
        let mut dsc = DynamicSamplingContext {
            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
            public_key: "12345678123456781234567812345678".parse().unwrap(),
            release: None,
            environment: None,
            transaction: None,
            sample_rate: None,
            user: TraceUserContext::default(),
            replay_id: None,
            sampled: None,
            other: Default::default(),
        };

        for (path, value) in paths_and_values {
            match path {
                "trace.release" => dsc.release = Some(value.to_owned()),
                "trace.environment" => dsc.environment = Some(value.to_owned()),
                "trace.user.id" => value.clone_into(&mut dsc.user.user_id),
                "trace.user.segment" => value.clone_into(&mut dsc.user.user_segment),
                "trace.transaction" => dsc.transaction = Some(value.to_owned()),
                "trace.replay_id" => dsc.replay_id = Some(Uuid::from_str(value).unwrap()),
                _ => panic!("invalid path"),
            }
        }

        dsc
    }

    /// Returns `true` if the single `rule` yields a match for `dsc` at time `now`.
    async fn is_match(
        now: DateTime<Utc>,
        rule: &SamplingRule,
        dsc: &DynamicSamplingContext,
    ) -> bool {
        SamplingEvaluator::new(now)
            .match_rules(Uuid::default(), dsc, std::iter::once(rule))
            .await
            .is_break()
    }

    #[tokio::test]
    async fn test_reservoir_evaluator_limit() {
        let evaluator = mock_reservoir_evaluator(vec![(1, 0)]);

        let rule = RuleId(1);
        let limit = 3;

        assert!(evaluator.evaluate(rule, limit, None).await);
        assert!(evaluator.evaluate(rule, limit, None).await);
        assert!(evaluator.evaluate(rule, limit, None).await);
        // After 3 samples we have reached the limit, and the following rules are not sampled.
        assert!(!evaluator.evaluate(rule, limit, None).await);
        assert!(!evaluator.evaluate(rule, limit, None).await);
    }

    /// A counter that already sits at its limit must not sample anything, while counters of
    /// other rules are unaffected.
    #[tokio::test]
    async fn test_reservoir_evaluator_exhausted_counter() {
        let evaluator = mock_reservoir_evaluator(vec![(1, 3)]);

        assert!(!evaluator.evaluate(RuleId(1), 3, None).await);
        assert!(evaluator.evaluate(RuleId(2), 3, None).await);
    }

    #[tokio::test]
    async fn test_sample_rate_compounding() {
        let rules = simple_sampling_rules(vec![
            (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }),
            (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
            (
                RuleCondition::all(),
                SamplingValue::SampleRate { value: 0.25 },
            ),
        ]);
        let dsc = mocked_dsc_with_getter_values(vec![]);

        // 0.8 * 0.5 * 0.25 == 0.1
        assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1);
    }

    #[tokio::test]
    async fn test_minimum_sample_rate() {
        let rules = simple_sampling_rules(vec![
            (RuleCondition::all(), SamplingValue::Factor { value: 1.5 }),
            (
                RuleCondition::all(),
                SamplingValue::MinimumSampleRate { value: 0.5 },
            ),
            // Only the first matching minimum is applied.
            (
                RuleCondition::all(),
                SamplingValue::MinimumSampleRate { value: 1.0 },
            ),
            (
                RuleCondition::all(),
                SamplingValue::SampleRate { value: 0.05 },
            ),
        ]);
        let dsc = mocked_dsc_with_getter_values(vec![]);

        // max(0.05, 0.5) * 1.5 = 0.75
        assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.75);
    }

    /// Creates a catch-all sample rate rule with id 0 and a sample rate of 1.0.
    fn mocked_sampling_rule() -> SamplingRule {
        SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: 1.0 },
            ty: RuleType::Trace,
            id: RuleId(0),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        }
    }

    /// Helper function to quickly construct many rules with their condition and value, and a unique id,
    /// so the caller can easily check which rules are matching.
    fn simple_sampling_rules(vals: Vec<(RuleCondition, SamplingValue)>) -> Vec<SamplingRule> {
        let mut vec = vec![];

        for (i, val) in vals.into_iter().enumerate() {
            let (condition, sampling_value) = val;
            vec.push(SamplingRule {
                condition,
                sampling_value,
                ty: RuleType::Trace,
                id: RuleId(i as u32),
                time_range: Default::default(),
                decaying_fn: Default::default(),
            });
        }
        vec
    }

    /// Tests that reservoir rules override the other rules.
    ///
    /// Here all 3 rules are a match. But when the reservoir
    /// rule (id = 1) has not yet reached its limit of "2" matches, the
    /// previous rule(s) will not be present in the matched rules output.
    /// After the limit has been reached, the reservoir rule is ignored
    /// and the output is the two other rules (id = 0, id = 2).
    #[tokio::test]
    async fn test_reservoir_override() {
        let dsc = mocked_dsc_with_getter_values(vec![]);
        let rules = simple_sampling_rules(vec![
            (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
            // The reservoir has a limit of 2, meaning it should be sampled twice
            // before it is ignored.
            (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }),
            (
                RuleCondition::all(),
                SamplingValue::SampleRate { value: 0.5 },
            ),
        ]);

        // The reservoir keeps the counter state behind a mutex, which is how it
        // shares state among multiple evaluator instances.
        let reservoir = mock_reservoir_evaluator(vec![]);

        let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
        let matched_rules = get_matched_rules(
            &evaluator
                .match_rules(Uuid::default(), &dsc, rules.iter())
                .await,
        );
        // Reservoir rule overrides 0 and 2.
        assert_eq!(&matched_rules, &[1]);

        let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
        let matched_rules = get_matched_rules(
            &evaluator
                .match_rules(Uuid::default(), &dsc, rules.iter())
                .await,
        );
        // Reservoir rule overrides 0 and 2.
        assert_eq!(&matched_rules, &[1]);

        let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
        let matched_rules = get_matched_rules(
            &evaluator
                .match_rules(Uuid::default(), &dsc, rules.iter())
                .await,
        );
        // Reservoir rule reached its limit, rule 0 and 2 are now matched instead.
        assert_eq!(&matched_rules, &[0, 2]);
    }

    /// Checks that rules don't match if the time is outside the time range.
    #[tokio::test]
    async fn test_expired_rules() {
        let rule = SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: 1.0 },
            ty: RuleType::Trace,
            id: RuleId(0),
            time_range: TimeRange {
                start: Some(Utc.with_ymd_and_hms(1970, 10, 10, 0, 0, 0).unwrap()),
                end: Some(Utc.with_ymd_and_hms(1970, 10, 12, 0, 0, 0).unwrap()),
            },
            decaying_fn: Default::default(),
        };

        let dsc = mocked_dsc_with_getter_values(vec![]);

        // Baseline test.
        let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();
        let res = SamplingEvaluator::new(within_timerange)
            .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
            .await;
        assert!(evaluation_is_match(res));

        let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
        let res = SamplingEvaluator::new(before_timerange)
            .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
            .await;
        assert!(!evaluation_is_match(res));

        let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap();
        let res = SamplingEvaluator::new(after_timerange)
            .match_rules(Uuid::default(), &dsc, [rule].iter())
            .await;
        assert!(!evaluation_is_match(res));
    }

    /// Checks that `SamplingEvaluator` correctly matches the right rules.
    #[tokio::test]
    async fn test_condition_matching() {
        let rules = simple_sampling_rules(vec![
            (
                RuleCondition::glob("trace.transaction", "*healthcheck*"),
                SamplingValue::SampleRate { value: 1.0 },
            ),
            (
                RuleCondition::glob("trace.environment", "*dev*"),
                SamplingValue::SampleRate { value: 1.0 },
            ),
            (
                RuleCondition::eq_ignore_case("trace.transaction", "raboof"),
                SamplingValue::Factor { value: 1.0 },
            ),
            (
                RuleCondition::glob("trace.release", "1.1.1")
                    & RuleCondition::eq_ignore_case("trace.user.segment", "vip"),
                SamplingValue::SampleRate { value: 1.0 },
            ),
            (
                RuleCondition::eq_ignore_case("trace.release", "1.1.1")
                    & RuleCondition::eq_ignore_case("trace.environment", "prod"),
                SamplingValue::Factor { value: 1.0 },
            ),
            (
                RuleCondition::all(),
                SamplingValue::SampleRate { value: 1.0 },
            ),
        ]);

        // early return of first rule
        let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]);
        assert!(matches_rule_ids(&[0], &rules, &dsc).await);

        // early return of second rule
        let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]);
        assert!(matches_rule_ids(&[1], &rules, &dsc).await);

        // factor match third rule and early return sixth rule
        let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]);
        assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await);

        // factor match third rule and early return fourth rule
        let dsc = mocked_dsc_with_getter_values(vec![
            ("trace.transaction", "raboof"),
            ("trace.release", "1.1.1"),
            ("trace.user.segment", "vip"),
        ]);
        assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await);

        // factor match third, fifth rule and early return sixth rule
        let dsc = mocked_dsc_with_getter_values(vec![
            ("trace.transaction", "raboof"),
            ("trace.release", "1.1.1"),
            ("trace.environment", "prod"),
        ]);
        assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await);

        // factor match fifth and early return sixth rule
        let dsc = mocked_dsc_with_getter_values(vec![
            ("trace.release", "1.1.1"),
            ("trace.environment", "prod"),
        ]);
        assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await);
    }

    /// Test that we get the same sampling decision from the same trace id.
    #[test]
    fn test_repeatable_seed() {
        let val1 = pseudo_random_from_seed(Uuid::default());
        let val2 = pseudo_random_from_seed(Uuid::default());
        // The generator is fully deterministic, so the values must be identical.
        assert_eq!(val1, val2);
    }

    /// Sample rates at or beyond the bounds are decided without drawing a random number.
    #[test]
    fn test_sampling_match_bounds() {
        let seed = Uuid::default();
        assert_eq!(sampling_match(0.0, seed), SamplingDecision::Drop);
        assert_eq!(sampling_match(-1.0, seed), SamplingDecision::Drop);
        assert_eq!(sampling_match(1.0, seed), SamplingDecision::Keep);
        assert_eq!(sampling_match(2.0, seed), SamplingDecision::Keep);
    }

    /// Tests if the MatchedRuleIds struct is displayed correctly as string.
    #[test]
    fn matched_rule_ids_display() {
        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
        assert_eq!(matched_rule_ids.to_string(), "123,456");

        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
        assert_eq!(matched_rule_ids.to_string(), "123");

        let matched_rule_ids = MatchedRuleIds(vec![]);
        assert_eq!(matched_rule_ids.to_string(), "")
    }

    /// Tests if the MatchedRuleIds struct is created correctly from its string representation.
    #[test]
    fn matched_rule_ids_parse() {
        assert_eq!(
            MatchedRuleIds::parse("123,456"),
            Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
        );

        assert_eq!(
            MatchedRuleIds::parse("123"),
            Ok(MatchedRuleIds(vec![RuleId(123)]))
        );

        assert!(MatchedRuleIds::parse("").is_err());

        assert!(MatchedRuleIds::parse(",").is_err());

        assert!(MatchedRuleIds::parse("123.456").is_err());

        assert!(MatchedRuleIds::parse("a,b").is_err());
    }

    /// Tests that no match is done when there are no matching rules.
    #[tokio::test]
    async fn test_get_sampling_match_result_with_no_match() {
        let dsc = mocked_dsc_with_getter_values(vec![]);

        let res = SamplingEvaluator::new(Utc::now())
            .match_rules(Uuid::default(), &dsc, [].iter())
            .await;

        assert!(!evaluation_is_match(res));
    }

    /// Validates the early return (and hence no match) of the `match_rules` function if the current
    /// time is out of bounds of the time range.
    /// When the `start` or `end` of the range is missing, it defaults to always include
    /// times before the `end` or after the `start`, respectively.
    #[tokio::test]
    async fn test_sample_rate_valid_time_range() {
        let dsc = mocked_dsc_with_getter_values(vec![]);
        let time_range = TimeRange {
            start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()),
            end: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
        };

        let before_time_range = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
        let during_time_range = Utc.with_ymd_and_hms(1975, 1, 1, 0, 0, 0).unwrap();
        let after_time_range = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap();

        let rule = SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: 1.0 },
            ty: RuleType::Trace,
            id: RuleId(0),
            time_range,
            decaying_fn: DecayingFunction::Constant,
        };

        // [start..end]
        assert!(!is_match(before_time_range, &rule, &dsc).await);
        assert!(is_match(during_time_range, &rule, &dsc).await);
        assert!(!is_match(after_time_range, &rule, &dsc).await);

        // [start..]
        let mut rule_without_end = rule.clone();
        rule_without_end.time_range.end = None;
        assert!(!is_match(before_time_range, &rule_without_end, &dsc).await);
        assert!(is_match(during_time_range, &rule_without_end, &dsc).await);
        assert!(is_match(after_time_range, &rule_without_end, &dsc).await);

        // [..end]
        let mut rule_without_start = rule.clone();
        rule_without_start.time_range.start = None;
        assert!(is_match(before_time_range, &rule_without_start, &dsc).await);
        assert!(is_match(during_time_range, &rule_without_start, &dsc).await);
        assert!(!is_match(after_time_range, &rule_without_start, &dsc).await);

        // [..]
        let mut rule_without_range = rule.clone();
        rule_without_range.time_range = TimeRange::default();
        assert!(is_match(before_time_range, &rule_without_range, &dsc).await);
        assert!(is_match(during_time_range, &rule_without_range, &dsc).await);
        assert!(is_match(after_time_range, &rule_without_range, &dsc).await);
    }

    /// Checks that `try_compute_sample_rate` returns a final sample rate only for the
    /// `SamplingValue` variants that determine one.
    #[tokio::test]
    async fn test_validate_match() {
        let mut rule = mocked_sampling_rule();

        let reservoir = ReservoirEvaluator::new(ReservoirCounters::default());
        let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);

        rule.sampling_value = SamplingValue::SampleRate { value: 1.0 };
        assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));

        rule.sampling_value = SamplingValue::Factor { value: 1.0 };
        assert_eq!(eval.try_compute_sample_rate(&rule).await, None);

        rule.sampling_value = SamplingValue::Reservoir { limit: 1 };
        assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
    }
}