1use std::collections::BTreeMap;
4use std::fmt;
5use std::num::ParseIntError;
6use std::ops::ControlFlow;
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use rand::distributions::Uniform;
11use rand::Rng;
12use rand_pcg::Pcg32;
13#[cfg(feature = "redis")]
14use relay_base_schema::organization::OrganizationId;
15use relay_protocol::Getter;
16#[cfg(feature = "redis")]
17use relay_redis::AsyncRedisClient;
18use serde::Serialize;
19use uuid::Uuid;
20
21use crate::config::{RuleId, SamplingRule, SamplingValue};
22#[cfg(feature = "redis")]
23use crate::redis_sampling::{self, ReservoirRuleKey};
24
25fn pseudo_random_from_seed(seed: Uuid) -> f64 {
29 let seed_number = seed.as_u128();
30 let mut generator = Pcg32::new((seed_number >> 64) as u64, seed_number as u64);
31 let dist = Uniform::new(0f64, 1f64);
32 generator.sample(dist)
33}
34
35pub type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;
37
38#[derive(Debug)]
51pub struct ReservoirEvaluator<'a> {
52 counters: ReservoirCounters,
53 #[cfg(feature = "redis")]
54 org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>,
55 _phantom: std::marker::PhantomData<&'a ()>,
57}
58
59impl ReservoirEvaluator<'_> {
60 pub fn new(counters: ReservoirCounters) -> Self {
62 Self {
63 counters,
64 #[cfg(feature = "redis")]
65 org_id_and_client: None,
66 _phantom: std::marker::PhantomData,
67 }
68 }
69
70 pub fn counters(&self) -> ReservoirCounters {
72 Arc::clone(&self.counters)
73 }
74
75 #[cfg(feature = "redis")]
76 async fn redis_incr(
77 &self,
78 key: &ReservoirRuleKey,
79 client: &AsyncRedisClient,
80 rule_expiry: Option<&DateTime<Utc>>,
81 ) -> anyhow::Result<i64> {
82 let mut connection = client.get_connection().await?;
83
84 let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?;
85 redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?;
86
87 Ok(val)
88 }
89
90 pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool {
92 let Ok(mut map_guard) = self.counters.lock() else {
93 relay_log::error!("failed to lock reservoir counter mutex");
94 return false;
95 };
96
97 let counter_value = map_guard.entry(rule).or_insert(0);
98
99 if *counter_value < limit {
100 *counter_value += 1;
101 true
102 } else {
103 false
104 }
105 }
106
107 pub async fn evaluate(
109 &self,
110 rule: RuleId,
111 limit: i64,
112 _rule_expiry: Option<&DateTime<Utc>>,
113 ) -> bool {
114 #[cfg(feature = "redis")]
115 if let Some((org_id, client)) = self.org_id_and_client {
116 if let Ok(guard) = self.counters.lock() {
117 if *guard.get(&rule).unwrap_or(&0) > limit {
118 return false;
119 }
120 }
121
122 let key = ReservoirRuleKey::new(org_id, rule);
123 let redis_count = match self.redis_incr(&key, client, _rule_expiry).await {
124 Ok(redis_count) => redis_count,
125 Err(e) => {
126 relay_log::error!(error = &*e, "failed to increment reservoir rule");
127 return false;
128 }
129 };
130
131 if let Ok(mut map_guard) = self.counters.lock() {
132 if let Some(value) = map_guard.get_mut(&rule) {
135 *value = redis_count.max(*value);
136 }
137 }
138 return redis_count <= limit;
139 }
140
141 self.incr_local(rule, limit)
142 }
143}
144
145#[cfg(feature = "redis")]
146impl<'a> ReservoirEvaluator<'a> {
147 pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) {
151 self.org_id_and_client = Some((org_id, client));
152 }
153}
154
155#[derive(Debug)]
157pub struct SamplingEvaluator<'a> {
158 now: DateTime<Utc>,
159 rule_ids: Vec<RuleId>,
160 factor: f64,
161 reservoir: Option<&'a ReservoirEvaluator<'a>>,
162}
163
164impl<'a> SamplingEvaluator<'a> {
165 pub fn new_with_reservoir(now: DateTime<Utc>, reservoir: &'a ReservoirEvaluator<'a>) -> Self {
167 Self {
168 now,
169 rule_ids: vec![],
170 factor: 1.0,
171 reservoir: Some(reservoir),
172 }
173 }
174
175 pub fn new(now: DateTime<Utc>) -> Self {
177 Self {
178 now,
179 rule_ids: vec![],
180 factor: 1.0,
181 reservoir: None,
182 }
183 }
184
185 pub async fn match_rules<'b, I, G>(
197 mut self,
198 seed: Uuid,
199 instance: &G,
200 rules: I,
201 ) -> ControlFlow<SamplingMatch, Self>
202 where
203 G: Getter,
204 I: Iterator<Item = &'b SamplingRule>,
205 {
206 for rule in rules {
207 if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) {
208 continue;
209 };
210
211 if let Some(sample_rate) = self.try_compute_sample_rate(rule).await {
212 return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids));
213 };
214 }
215
216 ControlFlow::Continue(self)
217 }
218
219 async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option<f64> {
227 match rule.sampling_value {
228 SamplingValue::Factor { value } => {
229 self.factor *= rule.apply_decaying_fn(value, self.now)?;
230 self.rule_ids.push(rule.id);
231 None
232 }
233 SamplingValue::SampleRate { value } => {
234 let sample_rate = rule.apply_decaying_fn(value, self.now)?;
235 let adjusted = (sample_rate * self.factor).clamp(0.0, 1.0);
236
237 self.rule_ids.push(rule.id);
238 Some(adjusted)
239 }
240 SamplingValue::Reservoir { limit } => {
241 let reservoir = self.reservoir?;
242 if !reservoir
243 .evaluate(rule.id, limit, rule.time_range.end.as_ref())
244 .await
245 {
246 return None;
247 }
248
249 self.rule_ids.clear();
251 self.rule_ids.push(rule.id);
252 Some(1.0)
254 }
255 }
256 }
257}
258
259fn sampling_match(sample_rate: f64, seed: Uuid) -> SamplingDecision {
260 if sample_rate <= 0.0 {
261 return SamplingDecision::Drop;
262 } else if sample_rate >= 1.0 {
263 return SamplingDecision::Keep;
264 }
265
266 let random_number = pseudo_random_from_seed(seed);
267 relay_log::trace!(
268 sample_rate,
269 random_number,
270 "applying dynamic sampling to matching event"
271 );
272
273 if random_number >= sample_rate {
274 relay_log::trace!("dropping event that matched the configuration");
275 SamplingDecision::Drop
276 } else {
277 relay_log::trace!("keeping event that matched the configuration");
278 SamplingDecision::Keep
279 }
280}
281
282#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
284pub enum SamplingDecision {
285 Keep,
287 Drop,
289}
290
291impl SamplingDecision {
292 pub fn is_keep(self) -> bool {
294 matches!(self, Self::Keep)
295 }
296
297 pub fn is_drop(self) -> bool {
299 matches!(self, Self::Drop)
300 }
301
302 pub fn as_str(self) -> &'static str {
304 match self {
305 Self::Keep => "keep",
306 Self::Drop => "drop",
307 }
308 }
309}
310
311impl fmt::Display for SamplingDecision {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 write!(f, "{}", self.as_str())
314 }
315}
316
317#[derive(Clone, Debug, PartialEq)]
319pub struct SamplingMatch {
320 sample_rate: f64,
322 seed: Uuid,
328 matched_rules: MatchedRuleIds,
330 decision: SamplingDecision,
334}
335
336impl SamplingMatch {
337 fn new(sample_rate: f64, seed: Uuid, matched_rules: Vec<RuleId>) -> Self {
338 let matched_rules = MatchedRuleIds(matched_rules);
339 let decision = sampling_match(sample_rate, seed);
340
341 Self {
342 sample_rate,
343 seed,
344 matched_rules,
345 decision,
346 }
347 }
348
349 pub fn sample_rate(&self) -> f64 {
351 self.sample_rate
352 }
353
354 pub fn into_matched_rules(self) -> MatchedRuleIds {
359 self.matched_rules
360 }
361
362 pub fn decision(&self) -> SamplingDecision {
364 self.decision
365 }
366}
367
368#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
370pub struct MatchedRuleIds(pub Vec<RuleId>);
371
372impl MatchedRuleIds {
373 pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
381 let mut rule_ids = vec![];
382
383 for rule_id in value.split(',') {
384 rule_ids.push(RuleId(rule_id.parse()?));
385 }
386
387 Ok(MatchedRuleIds(rule_ids))
388 }
389}
390
391impl fmt::Display for MatchedRuleIds {
392 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
393 for (i, rule_id) in self.0.iter().enumerate() {
394 if i > 0 {
395 write!(f, ",")?;
396 }
397 write!(f, "{rule_id}")?;
398 }
399
400 Ok(())
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use chrono::TimeZone;
407 use relay_protocol::RuleCondition;
408 use similar_asserts::assert_eq;
409 use std::str::FromStr;
410 use uuid::Uuid;
411
412 use crate::config::{DecayingFunction, RuleType, TimeRange};
413 use crate::dsc::TraceUserContext;
414 use crate::DynamicSamplingContext;
415
416 use super::*;
417
418 fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> {
419 let mut map = BTreeMap::default();
420
421 for (rule_id, count) in vals {
422 map.insert(RuleId(rule_id), count);
423 }
424
425 let map = Arc::new(Mutex::new(map));
426
427 ReservoirEvaluator::new(map)
428 }
429
430 async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch {
432 match SamplingEvaluator::new(Utc::now())
433 .match_rules(Uuid::default(), instance, rules.iter())
434 .await
435 {
436 ControlFlow::Break(sampling_match) => sampling_match,
437 ControlFlow::Continue(_) => panic!("no match found"),
438 }
439 }
440
441 fn evaluation_is_match(res: ControlFlow<SamplingMatch, SamplingEvaluator>) -> bool {
442 matches!(res, ControlFlow::Break(_))
443 }
444
445 async fn matches_rule_ids(
447 rule_ids: &[u32],
448 rules: &[SamplingRule],
449 instance: &impl Getter,
450 ) -> bool {
451 let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect());
452 let sampling_match = get_sampling_match(rules, instance).await;
453 matched_rule_ids == sampling_match.matched_rules
454 }
455
456 fn get_matched_rules(
458 sampling_evaluator: &ControlFlow<SamplingMatch, SamplingEvaluator>,
459 ) -> Vec<u32> {
460 match sampling_evaluator {
461 ControlFlow::Continue(_) => panic!("expected a sampling match"),
462 ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(),
463 }
464 }
465
466 fn mocked_dsc_with_getter_values(
468 paths_and_values: Vec<(&str, &str)>,
469 ) -> DynamicSamplingContext {
470 let mut dsc = DynamicSamplingContext {
471 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
472 public_key: "12345678123456781234567812345678".parse().unwrap(),
473 release: None,
474 environment: None,
475 transaction: None,
476 sample_rate: None,
477 user: TraceUserContext::default(),
478 replay_id: None,
479 sampled: None,
480 other: Default::default(),
481 };
482
483 for (path, value) in paths_and_values {
484 match path {
485 "trace.release" => dsc.release = Some(value.to_owned()),
486 "trace.environment" => dsc.environment = Some(value.to_owned()),
487 "trace.user.id" => value.clone_into(&mut dsc.user.user_id),
488 "trace.user.segment" => value.clone_into(&mut dsc.user.user_segment),
489 "trace.transaction" => dsc.transaction = Some(value.to_owned()),
490 "trace.replay_id" => dsc.replay_id = Some(Uuid::from_str(value).unwrap()),
491 _ => panic!("invalid path"),
492 }
493 }
494
495 dsc
496 }
497
498 async fn is_match(
499 now: DateTime<Utc>,
500 rule: &SamplingRule,
501 dsc: &DynamicSamplingContext,
502 ) -> bool {
503 SamplingEvaluator::new(now)
504 .match_rules(Uuid::default(), dsc, std::iter::once(rule))
505 .await
506 .is_break()
507 }
508
509 #[tokio::test]
510 async fn test_reservoir_evaluator_limit() {
511 let evaluator = mock_reservoir_evaluator(vec![(1, 0)]);
512
513 let rule = RuleId(1);
514 let limit = 3;
515
516 assert!(evaluator.evaluate(rule, limit, None).await);
517 assert!(evaluator.evaluate(rule, limit, None).await);
518 assert!(evaluator.evaluate(rule, limit, None).await);
519 assert!(!evaluator.evaluate(rule, limit, None).await);
521 assert!(!evaluator.evaluate(rule, limit, None).await);
522 }
523
524 #[tokio::test]
525 async fn test_sample_rate_compounding() {
526 let rules = simple_sampling_rules(vec![
527 (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }),
528 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
529 (
530 RuleCondition::all(),
531 SamplingValue::SampleRate { value: 0.25 },
532 ),
533 ]);
534 let dsc = mocked_dsc_with_getter_values(vec![]);
535
536 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1);
538 }
539
540 fn mocked_sampling_rule() -> SamplingRule {
541 SamplingRule {
542 condition: RuleCondition::all(),
543 sampling_value: SamplingValue::SampleRate { value: 1.0 },
544 ty: RuleType::Trace,
545 id: RuleId(0),
546 time_range: Default::default(),
547 decaying_fn: Default::default(),
548 }
549 }
550
551 fn simple_sampling_rules(vals: Vec<(RuleCondition, SamplingValue)>) -> Vec<SamplingRule> {
554 let mut vec = vec![];
555
556 for (i, val) in vals.into_iter().enumerate() {
557 let (condition, sampling_value) = val;
558 vec.push(SamplingRule {
559 condition,
560 sampling_value,
561 ty: RuleType::Trace,
562 id: RuleId(i as u32),
563 time_range: Default::default(),
564 decaying_fn: Default::default(),
565 });
566 }
567 vec
568 }
569
570 #[tokio::test]
578 async fn test_reservoir_override() {
579 let dsc = mocked_dsc_with_getter_values(vec![]);
580 let rules = simple_sampling_rules(vec![
581 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
582 (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }),
585 (
586 RuleCondition::all(),
587 SamplingValue::SampleRate { value: 0.5 },
588 ),
589 ]);
590
591 let reservoir = mock_reservoir_evaluator(vec![]);
594
595 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
596 let matched_rules = get_matched_rules(
597 &evaluator
598 .match_rules(Uuid::default(), &dsc, rules.iter())
599 .await,
600 );
601 assert_eq!(&matched_rules, &[1]);
603
604 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
605 let matched_rules = get_matched_rules(
606 &evaluator
607 .match_rules(Uuid::default(), &dsc, rules.iter())
608 .await,
609 );
610 assert_eq!(&matched_rules, &[1]);
612
613 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
614 let matched_rules = get_matched_rules(
615 &evaluator
616 .match_rules(Uuid::default(), &dsc, rules.iter())
617 .await,
618 );
619 assert_eq!(&matched_rules, &[0, 2]);
621 }
622
623 #[tokio::test]
625 async fn test_expired_rules() {
626 let rule = SamplingRule {
627 condition: RuleCondition::all(),
628 sampling_value: SamplingValue::SampleRate { value: 1.0 },
629 ty: RuleType::Trace,
630 id: RuleId(0),
631 time_range: TimeRange {
632 start: Some(Utc.with_ymd_and_hms(1970, 10, 10, 0, 0, 0).unwrap()),
633 end: Some(Utc.with_ymd_and_hms(1970, 10, 12, 0, 0, 0).unwrap()),
634 },
635 decaying_fn: Default::default(),
636 };
637
638 let dsc = mocked_dsc_with_getter_values(vec![]);
639
640 let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();
642 let res = SamplingEvaluator::new(within_timerange)
643 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
644 .await;
645 assert!(evaluation_is_match(res));
646
647 let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
648 let res = SamplingEvaluator::new(before_timerange)
649 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
650 .await;
651 assert!(!evaluation_is_match(res));
652
653 let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap();
654 let res = SamplingEvaluator::new(after_timerange)
655 .match_rules(Uuid::default(), &dsc, [rule].iter())
656 .await;
657 assert!(!evaluation_is_match(res));
658 }
659
660 #[tokio::test]
662 async fn test_condition_matching() {
663 let rules = simple_sampling_rules(vec![
664 (
665 RuleCondition::glob("trace.transaction", "*healthcheck*"),
666 SamplingValue::SampleRate { value: 1.0 },
667 ),
668 (
669 RuleCondition::glob("trace.environment", "*dev*"),
670 SamplingValue::SampleRate { value: 1.0 },
671 ),
672 (
673 RuleCondition::eq_ignore_case("trace.transaction", "raboof"),
674 SamplingValue::Factor { value: 1.0 },
675 ),
676 (
677 RuleCondition::glob("trace.release", "1.1.1")
678 & RuleCondition::eq_ignore_case("trace.user.segment", "vip"),
679 SamplingValue::SampleRate { value: 1.0 },
680 ),
681 (
682 RuleCondition::eq_ignore_case("trace.release", "1.1.1")
683 & RuleCondition::eq_ignore_case("trace.environment", "prod"),
684 SamplingValue::Factor { value: 1.0 },
685 ),
686 (
687 RuleCondition::all(),
688 SamplingValue::SampleRate { value: 1.0 },
689 ),
690 ]);
691
692 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]);
694 assert!(matches_rule_ids(&[0], &rules, &dsc).await);
695
696 let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]);
698 assert!(matches_rule_ids(&[1], &rules, &dsc).await);
699
700 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]);
702 assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await);
703
704 let dsc = mocked_dsc_with_getter_values(vec![
706 ("trace.transaction", "raboof"),
707 ("trace.release", "1.1.1"),
708 ("trace.user.segment", "vip"),
709 ]);
710 assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await);
711
712 let dsc = mocked_dsc_with_getter_values(vec![
714 ("trace.transaction", "raboof"),
715 ("trace.release", "1.1.1"),
716 ("trace.environment", "prod"),
717 ]);
718 assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await);
719
720 let dsc = mocked_dsc_with_getter_values(vec![
722 ("trace.release", "1.1.1"),
723 ("trace.environment", "prod"),
724 ]);
725 assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await);
726 }
727
728 #[test]
729 fn test_repeatable_seed() {
731 let val1 = pseudo_random_from_seed(Uuid::default());
732 let val2 = pseudo_random_from_seed(Uuid::default());
733 assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
734 }
735
736 #[test]
737 fn matched_rule_ids_display() {
739 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
740 assert_eq!(matched_rule_ids.to_string(), "123,456");
741
742 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
743 assert_eq!(matched_rule_ids.to_string(), "123");
744
745 let matched_rule_ids = MatchedRuleIds(vec![]);
746 assert_eq!(matched_rule_ids.to_string(), "")
747 }
748
749 #[test]
750 fn matched_rule_ids_parse() {
752 assert_eq!(
753 MatchedRuleIds::parse("123,456"),
754 Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
755 );
756
757 assert_eq!(
758 MatchedRuleIds::parse("123"),
759 Ok(MatchedRuleIds(vec![RuleId(123)]))
760 );
761
762 assert!(MatchedRuleIds::parse("").is_err());
763
764 assert!(MatchedRuleIds::parse(",").is_err());
765
766 assert!(MatchedRuleIds::parse("123.456").is_err());
767
768 assert!(MatchedRuleIds::parse("a,b").is_err());
769 }
770
771 #[tokio::test]
772 async fn test_get_sampling_match_result_with_no_match() {
774 let dsc = mocked_dsc_with_getter_values(vec![]);
775
776 let res = SamplingEvaluator::new(Utc::now())
777 .match_rules(Uuid::default(), &dsc, [].iter())
778 .await;
779
780 assert!(!evaluation_is_match(res));
781 }
782
783 #[tokio::test]
788 async fn test_sample_rate_valid_time_range() {
789 let dsc = mocked_dsc_with_getter_values(vec![]);
790 let time_range = TimeRange {
791 start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()),
792 end: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
793 };
794
795 let before_time_range = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
796 let during_time_range = Utc.with_ymd_and_hms(1975, 1, 1, 0, 0, 0).unwrap();
797 let after_time_range = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap();
798
799 let rule = SamplingRule {
800 condition: RuleCondition::all(),
801 sampling_value: SamplingValue::SampleRate { value: 1.0 },
802 ty: RuleType::Trace,
803 id: RuleId(0),
804 time_range,
805 decaying_fn: DecayingFunction::Constant,
806 };
807
808 assert!(!is_match(before_time_range, &rule, &dsc).await);
810 assert!(is_match(during_time_range, &rule, &dsc).await);
811 assert!(!is_match(after_time_range, &rule, &dsc).await);
812
813 let mut rule_without_end = rule.clone();
815 rule_without_end.time_range.end = None;
816 assert!(!is_match(before_time_range, &rule_without_end, &dsc).await);
817 assert!(is_match(during_time_range, &rule_without_end, &dsc).await);
818 assert!(is_match(after_time_range, &rule_without_end, &dsc).await);
819
820 let mut rule_without_start = rule.clone();
822 rule_without_start.time_range.start = None;
823 assert!(is_match(before_time_range, &rule_without_start, &dsc).await);
824 assert!(is_match(during_time_range, &rule_without_start, &dsc).await);
825 assert!(!is_match(after_time_range, &rule_without_start, &dsc).await);
826
827 let mut rule_without_range = rule.clone();
829 rule_without_range.time_range = TimeRange::default();
830 assert!(is_match(before_time_range, &rule_without_range, &dsc).await);
831 assert!(is_match(during_time_range, &rule_without_range, &dsc).await);
832 assert!(is_match(after_time_range, &rule_without_range, &dsc).await);
833 }
834
835 #[tokio::test]
837 async fn test_validate_match() {
838 let mut rule = mocked_sampling_rule();
839
840 let reservoir = ReservoirEvaluator::new(ReservoirCounters::default());
841 let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
842
843 rule.sampling_value = SamplingValue::SampleRate { value: 1.0 };
844 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
845
846 rule.sampling_value = SamplingValue::Factor { value: 1.0 };
847 assert_eq!(eval.try_compute_sample_rate(&rule).await, None);
848
849 rule.sampling_value = SamplingValue::Reservoir { limit: 1 };
850 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
851 }
852}