1use std::collections::BTreeMap;
4use std::fmt;
5use std::num::ParseIntError;
6use std::ops::ControlFlow;
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use rand::Rng;
11use rand::distr::StandardUniform;
12use rand_pcg::Pcg32;
13#[cfg(feature = "redis")]
14use relay_base_schema::organization::OrganizationId;
15use relay_protocol::Getter;
16#[cfg(feature = "redis")]
17use relay_redis::AsyncRedisClient;
18use serde::Serialize;
19use uuid::Uuid;
20
21use crate::config::{RuleId, SamplingRule, SamplingValue};
22#[cfg(feature = "redis")]
23use crate::redis_sampling::{self, ReservoirRuleKey};
24
25fn pseudo_random_from_seed(seed: Uuid) -> f64 {
29 let seed_number = seed.as_u128();
30 let mut generator = Pcg32::new((seed_number >> 64) as u64, seed_number as u64);
31 generator.sample(StandardUniform)
32}
33
34pub type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;
36
37#[derive(Debug)]
50pub struct ReservoirEvaluator<'a> {
51 counters: ReservoirCounters,
52 #[cfg(feature = "redis")]
53 org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>,
54 _phantom: std::marker::PhantomData<&'a ()>,
56}
57
58impl ReservoirEvaluator<'_> {
59 pub fn new(counters: ReservoirCounters) -> Self {
61 Self {
62 counters,
63 #[cfg(feature = "redis")]
64 org_id_and_client: None,
65 _phantom: std::marker::PhantomData,
66 }
67 }
68
69 pub fn counters(&self) -> ReservoirCounters {
71 Arc::clone(&self.counters)
72 }
73
74 #[cfg(feature = "redis")]
75 async fn redis_incr(
76 &self,
77 key: &ReservoirRuleKey,
78 client: &AsyncRedisClient,
79 rule_expiry: Option<&DateTime<Utc>>,
80 ) -> anyhow::Result<i64> {
81 relay_statsd::metric!(timer(crate::statsd::SamplingTimers::RedisReservoir), {
82 let mut connection = client.get_connection().await?;
83
84 let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?;
85 redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?;
86
87 Ok(val)
88 })
89 }
90
91 pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool {
93 let Ok(mut map_guard) = self.counters.lock() else {
94 relay_log::error!("failed to lock reservoir counter mutex");
95 return false;
96 };
97
98 let counter_value = map_guard.entry(rule).or_insert(0);
99
100 if *counter_value < limit {
101 *counter_value += 1;
102 true
103 } else {
104 false
105 }
106 }
107
108 pub async fn evaluate(
110 &self,
111 rule: RuleId,
112 limit: i64,
113 _rule_expiry: Option<&DateTime<Utc>>,
114 ) -> bool {
115 #[cfg(feature = "redis")]
116 if let Some((org_id, client)) = self.org_id_and_client {
117 if let Ok(guard) = self.counters.lock()
118 && *guard.get(&rule).unwrap_or(&0) > limit
119 {
120 return false;
121 }
122
123 let key = ReservoirRuleKey::new(org_id, rule);
124 let redis_count = match self.redis_incr(&key, client, _rule_expiry).await {
125 Ok(redis_count) => redis_count,
126 Err(e) => {
127 relay_log::error!(error = &*e, "failed to increment reservoir rule");
128 return false;
129 }
130 };
131
132 if let Ok(mut map_guard) = self.counters.lock() {
133 if let Some(value) = map_guard.get_mut(&rule) {
136 *value = redis_count.max(*value);
137 }
138 }
139 return redis_count <= limit;
140 }
141
142 self.incr_local(rule, limit)
143 }
144}
145
146#[cfg(feature = "redis")]
147impl<'a> ReservoirEvaluator<'a> {
148 pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) {
152 self.org_id_and_client = Some((org_id, client));
153 }
154}
155
156#[derive(Debug)]
158pub struct SamplingEvaluator<'a> {
159 now: DateTime<Utc>,
160 rule_ids: Vec<RuleId>,
161 factor: f64,
162 minimum_sample_rate: Option<f64>,
163 reservoir: Option<&'a ReservoirEvaluator<'a>>,
164}
165
166impl<'a> SamplingEvaluator<'a> {
167 pub fn new_with_reservoir(now: DateTime<Utc>, reservoir: &'a ReservoirEvaluator<'a>) -> Self {
169 Self {
170 now,
171 rule_ids: vec![],
172 factor: 1.0,
173 minimum_sample_rate: None,
174 reservoir: Some(reservoir),
175 }
176 }
177
178 pub fn new(now: DateTime<Utc>) -> Self {
180 Self {
181 now,
182 rule_ids: vec![],
183 factor: 1.0,
184 minimum_sample_rate: None,
185 reservoir: None,
186 }
187 }
188
189 pub async fn match_rules<'b, I, G>(
201 mut self,
202 seed: Uuid,
203 instance: &G,
204 rules: I,
205 ) -> ControlFlow<SamplingMatch, Self>
206 where
207 G: Getter,
208 I: Iterator<Item = &'b SamplingRule>,
209 {
210 for rule in rules {
211 if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) {
212 continue;
213 };
214
215 if let Some(sample_rate) = self.try_compute_sample_rate(rule).await {
216 return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids));
217 };
218 }
219
220 ControlFlow::Continue(self)
221 }
222
223 async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option<f64> {
231 match rule.sampling_value {
232 SamplingValue::Factor { value } => {
233 self.factor *= rule.apply_decaying_fn(value, self.now)?;
234 self.rule_ids.push(rule.id);
235 None
236 }
237 SamplingValue::SampleRate { value } => {
238 let sample_rate = rule.apply_decaying_fn(value, self.now)?;
239 let minimum_sample_rate = self.minimum_sample_rate.unwrap_or(0.0);
240 let adjusted = (sample_rate.max(minimum_sample_rate) * self.factor).clamp(0.0, 1.0);
241
242 self.rule_ids.push(rule.id);
243 Some(adjusted)
244 }
245 SamplingValue::Reservoir { limit } => {
246 let reservoir = self.reservoir?;
247 if !reservoir
248 .evaluate(rule.id, limit, rule.time_range.end.as_ref())
249 .await
250 {
251 return None;
252 }
253
254 self.rule_ids.clear();
256 self.rule_ids.push(rule.id);
257 Some(1.0)
259 }
260 SamplingValue::MinimumSampleRate { value } => {
261 if self.minimum_sample_rate.is_none() {
262 self.minimum_sample_rate = Some(rule.apply_decaying_fn(value, self.now)?);
263 self.rule_ids.push(rule.id);
264 }
265 None
266 }
267 }
268 }
269}
270
271fn sampling_match(sample_rate: f64, seed: Uuid) -> SamplingDecision {
272 if sample_rate <= 0.0 {
273 return SamplingDecision::Drop;
274 } else if sample_rate >= 1.0 {
275 return SamplingDecision::Keep;
276 }
277
278 let random_number = pseudo_random_from_seed(seed);
279 relay_log::trace!(
280 sample_rate,
281 random_number,
282 "applying dynamic sampling to matching event"
283 );
284
285 if random_number >= sample_rate {
286 relay_log::trace!("dropping event that matched the configuration");
287 SamplingDecision::Drop
288 } else {
289 relay_log::trace!("keeping event that matched the configuration");
290 SamplingDecision::Keep
291 }
292}
293
294#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
296pub enum SamplingDecision {
297 Keep,
299 Drop,
301}
302
303impl SamplingDecision {
304 pub fn is_keep(self) -> bool {
306 matches!(self, Self::Keep)
307 }
308
309 pub fn is_drop(self) -> bool {
311 matches!(self, Self::Drop)
312 }
313
314 pub fn as_str(self) -> &'static str {
316 match self {
317 Self::Keep => "keep",
318 Self::Drop => "drop",
319 }
320 }
321}
322
323impl fmt::Display for SamplingDecision {
324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325 write!(f, "{}", self.as_str())
326 }
327}
328
329#[derive(Clone, Debug, PartialEq)]
331pub struct SamplingMatch {
332 sample_rate: f64,
334 seed: Uuid,
340 matched_rules: MatchedRuleIds,
342 decision: SamplingDecision,
346}
347
348impl SamplingMatch {
349 fn new(sample_rate: f64, seed: Uuid, matched_rules: Vec<RuleId>) -> Self {
350 let matched_rules = MatchedRuleIds(matched_rules);
351 let decision = sampling_match(sample_rate, seed);
352
353 Self {
354 sample_rate,
355 seed,
356 matched_rules,
357 decision,
358 }
359 }
360
361 pub fn sample_rate(&self) -> f64 {
363 self.sample_rate
364 }
365
366 pub fn into_matched_rules(self) -> MatchedRuleIds {
371 self.matched_rules
372 }
373
374 pub fn decision(&self) -> SamplingDecision {
376 self.decision
377 }
378}
379
380#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
382pub struct MatchedRuleIds(pub Vec<RuleId>);
383
384impl MatchedRuleIds {
385 pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
393 let mut rule_ids = vec![];
394
395 for rule_id in value.split(',') {
396 rule_ids.push(RuleId(rule_id.parse()?));
397 }
398
399 Ok(MatchedRuleIds(rule_ids))
400 }
401}
402
403impl fmt::Display for MatchedRuleIds {
404 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405 for (i, rule_id) in self.0.iter().enumerate() {
406 if i > 0 {
407 write!(f, ",")?;
408 }
409 write!(f, "{rule_id}")?;
410 }
411
412 Ok(())
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use chrono::TimeZone;
419 use relay_protocol::RuleCondition;
420 use similar_asserts::assert_eq;
421 use std::str::FromStr;
422 use uuid::Uuid;
423
424 use crate::DynamicSamplingContext;
425 use crate::config::{DecayingFunction, RuleType, TimeRange};
426 use crate::dsc::TraceUserContext;
427
428 use super::*;
429
430 fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> {
431 let mut map = BTreeMap::default();
432
433 for (rule_id, count) in vals {
434 map.insert(RuleId(rule_id), count);
435 }
436
437 let map = Arc::new(Mutex::new(map));
438
439 ReservoirEvaluator::new(map)
440 }
441
442 async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch {
444 match SamplingEvaluator::new(Utc::now())
445 .match_rules(Uuid::default(), instance, rules.iter())
446 .await
447 {
448 ControlFlow::Break(sampling_match) => sampling_match,
449 ControlFlow::Continue(_) => panic!("no match found"),
450 }
451 }
452
453 fn evaluation_is_match(res: ControlFlow<SamplingMatch, SamplingEvaluator>) -> bool {
454 matches!(res, ControlFlow::Break(_))
455 }
456
457 async fn matches_rule_ids(
459 rule_ids: &[u32],
460 rules: &[SamplingRule],
461 instance: &impl Getter,
462 ) -> bool {
463 let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect());
464 let sampling_match = get_sampling_match(rules, instance).await;
465 matched_rule_ids == sampling_match.matched_rules
466 }
467
468 fn get_matched_rules(
470 sampling_evaluator: &ControlFlow<SamplingMatch, SamplingEvaluator>,
471 ) -> Vec<u32> {
472 match sampling_evaluator {
473 ControlFlow::Continue(_) => panic!("expected a sampling match"),
474 ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(),
475 }
476 }
477
478 fn mocked_dsc_with_getter_values(
480 paths_and_values: Vec<(&str, &str)>,
481 ) -> DynamicSamplingContext {
482 let mut dsc = DynamicSamplingContext {
483 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
484 public_key: "12345678123456781234567812345678".parse().unwrap(),
485 release: None,
486 environment: None,
487 transaction: None,
488 sample_rate: None,
489 user: TraceUserContext::default(),
490 replay_id: None,
491 sampled: None,
492 other: Default::default(),
493 };
494
495 for (path, value) in paths_and_values {
496 match path {
497 "trace.release" => dsc.release = Some(value.to_owned()),
498 "trace.environment" => dsc.environment = Some(value.to_owned()),
499 "trace.user.id" => value.clone_into(&mut dsc.user.user_id),
500 "trace.user.segment" => value.clone_into(&mut dsc.user.user_segment),
501 "trace.transaction" => dsc.transaction = Some(value.to_owned()),
502 "trace.replay_id" => dsc.replay_id = Some(Uuid::from_str(value).unwrap()),
503 _ => panic!("invalid path"),
504 }
505 }
506
507 dsc
508 }
509
510 async fn is_match(
511 now: DateTime<Utc>,
512 rule: &SamplingRule,
513 dsc: &DynamicSamplingContext,
514 ) -> bool {
515 SamplingEvaluator::new(now)
516 .match_rules(Uuid::default(), dsc, std::iter::once(rule))
517 .await
518 .is_break()
519 }
520
521 #[tokio::test]
522 async fn test_reservoir_evaluator_limit() {
523 let evaluator = mock_reservoir_evaluator(vec![(1, 0)]);
524
525 let rule = RuleId(1);
526 let limit = 3;
527
528 assert!(evaluator.evaluate(rule, limit, None).await);
529 assert!(evaluator.evaluate(rule, limit, None).await);
530 assert!(evaluator.evaluate(rule, limit, None).await);
531 assert!(!evaluator.evaluate(rule, limit, None).await);
533 assert!(!evaluator.evaluate(rule, limit, None).await);
534 }
535
536 #[tokio::test]
537 async fn test_sample_rate_compounding() {
538 let rules = simple_sampling_rules(vec![
539 (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }),
540 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
541 (
542 RuleCondition::all(),
543 SamplingValue::SampleRate { value: 0.25 },
544 ),
545 ]);
546 let dsc = mocked_dsc_with_getter_values(vec![]);
547
548 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1);
550 }
551
552 #[tokio::test]
553 async fn test_minimum_sample_rate() {
554 let rules = simple_sampling_rules(vec![
555 (RuleCondition::all(), SamplingValue::Factor { value: 1.5 }),
556 (
557 RuleCondition::all(),
558 SamplingValue::MinimumSampleRate { value: 0.5 },
559 ),
560 (
562 RuleCondition::all(),
563 SamplingValue::MinimumSampleRate { value: 1.0 },
564 ),
565 (
566 RuleCondition::all(),
567 SamplingValue::SampleRate { value: 0.05 },
568 ),
569 ]);
570 let dsc = mocked_dsc_with_getter_values(vec![]);
571
572 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.75);
574 }
575
576 fn mocked_sampling_rule() -> SamplingRule {
577 SamplingRule {
578 condition: RuleCondition::all(),
579 sampling_value: SamplingValue::SampleRate { value: 1.0 },
580 ty: RuleType::Trace,
581 id: RuleId(0),
582 time_range: Default::default(),
583 decaying_fn: Default::default(),
584 }
585 }
586
587 fn simple_sampling_rules(vals: Vec<(RuleCondition, SamplingValue)>) -> Vec<SamplingRule> {
590 let mut vec = vec![];
591
592 for (i, val) in vals.into_iter().enumerate() {
593 let (condition, sampling_value) = val;
594 vec.push(SamplingRule {
595 condition,
596 sampling_value,
597 ty: RuleType::Trace,
598 id: RuleId(i as u32),
599 time_range: Default::default(),
600 decaying_fn: Default::default(),
601 });
602 }
603 vec
604 }
605
606 #[tokio::test]
614 async fn test_reservoir_override() {
615 let dsc = mocked_dsc_with_getter_values(vec![]);
616 let rules = simple_sampling_rules(vec![
617 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
618 (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }),
621 (
622 RuleCondition::all(),
623 SamplingValue::SampleRate { value: 0.5 },
624 ),
625 ]);
626
627 let reservoir = mock_reservoir_evaluator(vec![]);
630
631 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
632 let matched_rules = get_matched_rules(
633 &evaluator
634 .match_rules(Uuid::default(), &dsc, rules.iter())
635 .await,
636 );
637 assert_eq!(&matched_rules, &[1]);
639
640 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
641 let matched_rules = get_matched_rules(
642 &evaluator
643 .match_rules(Uuid::default(), &dsc, rules.iter())
644 .await,
645 );
646 assert_eq!(&matched_rules, &[1]);
648
649 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
650 let matched_rules = get_matched_rules(
651 &evaluator
652 .match_rules(Uuid::default(), &dsc, rules.iter())
653 .await,
654 );
655 assert_eq!(&matched_rules, &[0, 2]);
657 }
658
659 #[tokio::test]
661 async fn test_expired_rules() {
662 let rule = SamplingRule {
663 condition: RuleCondition::all(),
664 sampling_value: SamplingValue::SampleRate { value: 1.0 },
665 ty: RuleType::Trace,
666 id: RuleId(0),
667 time_range: TimeRange {
668 start: Some(Utc.with_ymd_and_hms(1970, 10, 10, 0, 0, 0).unwrap()),
669 end: Some(Utc.with_ymd_and_hms(1970, 10, 12, 0, 0, 0).unwrap()),
670 },
671 decaying_fn: Default::default(),
672 };
673
674 let dsc = mocked_dsc_with_getter_values(vec![]);
675
676 let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();
678 let res = SamplingEvaluator::new(within_timerange)
679 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
680 .await;
681 assert!(evaluation_is_match(res));
682
683 let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
684 let res = SamplingEvaluator::new(before_timerange)
685 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
686 .await;
687 assert!(!evaluation_is_match(res));
688
689 let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap();
690 let res = SamplingEvaluator::new(after_timerange)
691 .match_rules(Uuid::default(), &dsc, [rule].iter())
692 .await;
693 assert!(!evaluation_is_match(res));
694 }
695
696 #[tokio::test]
698 async fn test_condition_matching() {
699 let rules = simple_sampling_rules(vec![
700 (
701 RuleCondition::glob("trace.transaction", "*healthcheck*"),
702 SamplingValue::SampleRate { value: 1.0 },
703 ),
704 (
705 RuleCondition::glob("trace.environment", "*dev*"),
706 SamplingValue::SampleRate { value: 1.0 },
707 ),
708 (
709 RuleCondition::eq_ignore_case("trace.transaction", "raboof"),
710 SamplingValue::Factor { value: 1.0 },
711 ),
712 (
713 RuleCondition::glob("trace.release", "1.1.1")
714 & RuleCondition::eq_ignore_case("trace.user.segment", "vip"),
715 SamplingValue::SampleRate { value: 1.0 },
716 ),
717 (
718 RuleCondition::eq_ignore_case("trace.release", "1.1.1")
719 & RuleCondition::eq_ignore_case("trace.environment", "prod"),
720 SamplingValue::Factor { value: 1.0 },
721 ),
722 (
723 RuleCondition::all(),
724 SamplingValue::SampleRate { value: 1.0 },
725 ),
726 ]);
727
728 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]);
730 assert!(matches_rule_ids(&[0], &rules, &dsc).await);
731
732 let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]);
734 assert!(matches_rule_ids(&[1], &rules, &dsc).await);
735
736 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]);
738 assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await);
739
740 let dsc = mocked_dsc_with_getter_values(vec![
742 ("trace.transaction", "raboof"),
743 ("trace.release", "1.1.1"),
744 ("trace.user.segment", "vip"),
745 ]);
746 assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await);
747
748 let dsc = mocked_dsc_with_getter_values(vec![
750 ("trace.transaction", "raboof"),
751 ("trace.release", "1.1.1"),
752 ("trace.environment", "prod"),
753 ]);
754 assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await);
755
756 let dsc = mocked_dsc_with_getter_values(vec![
758 ("trace.release", "1.1.1"),
759 ("trace.environment", "prod"),
760 ]);
761 assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await);
762 }
763
764 #[test]
765 fn test_repeatable_seed() {
767 let val1 = pseudo_random_from_seed(Uuid::default());
768 let val2 = pseudo_random_from_seed(Uuid::default());
769 assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
770 }
771
772 #[test]
773 fn matched_rule_ids_display() {
775 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
776 assert_eq!(matched_rule_ids.to_string(), "123,456");
777
778 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
779 assert_eq!(matched_rule_ids.to_string(), "123");
780
781 let matched_rule_ids = MatchedRuleIds(vec![]);
782 assert_eq!(matched_rule_ids.to_string(), "")
783 }
784
785 #[test]
786 fn matched_rule_ids_parse() {
788 assert_eq!(
789 MatchedRuleIds::parse("123,456"),
790 Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
791 );
792
793 assert_eq!(
794 MatchedRuleIds::parse("123"),
795 Ok(MatchedRuleIds(vec![RuleId(123)]))
796 );
797
798 assert!(MatchedRuleIds::parse("").is_err());
799
800 assert!(MatchedRuleIds::parse(",").is_err());
801
802 assert!(MatchedRuleIds::parse("123.456").is_err());
803
804 assert!(MatchedRuleIds::parse("a,b").is_err());
805 }
806
807 #[tokio::test]
808 async fn test_get_sampling_match_result_with_no_match() {
810 let dsc = mocked_dsc_with_getter_values(vec![]);
811
812 let res = SamplingEvaluator::new(Utc::now())
813 .match_rules(Uuid::default(), &dsc, [].iter())
814 .await;
815
816 assert!(!evaluation_is_match(res));
817 }
818
819 #[tokio::test]
824 async fn test_sample_rate_valid_time_range() {
825 let dsc = mocked_dsc_with_getter_values(vec![]);
826 let time_range = TimeRange {
827 start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()),
828 end: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
829 };
830
831 let before_time_range = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
832 let during_time_range = Utc.with_ymd_and_hms(1975, 1, 1, 0, 0, 0).unwrap();
833 let after_time_range = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap();
834
835 let rule = SamplingRule {
836 condition: RuleCondition::all(),
837 sampling_value: SamplingValue::SampleRate { value: 1.0 },
838 ty: RuleType::Trace,
839 id: RuleId(0),
840 time_range,
841 decaying_fn: DecayingFunction::Constant,
842 };
843
844 assert!(!is_match(before_time_range, &rule, &dsc).await);
846 assert!(is_match(during_time_range, &rule, &dsc).await);
847 assert!(!is_match(after_time_range, &rule, &dsc).await);
848
849 let mut rule_without_end = rule.clone();
851 rule_without_end.time_range.end = None;
852 assert!(!is_match(before_time_range, &rule_without_end, &dsc).await);
853 assert!(is_match(during_time_range, &rule_without_end, &dsc).await);
854 assert!(is_match(after_time_range, &rule_without_end, &dsc).await);
855
856 let mut rule_without_start = rule.clone();
858 rule_without_start.time_range.start = None;
859 assert!(is_match(before_time_range, &rule_without_start, &dsc).await);
860 assert!(is_match(during_time_range, &rule_without_start, &dsc).await);
861 assert!(!is_match(after_time_range, &rule_without_start, &dsc).await);
862
863 let mut rule_without_range = rule.clone();
865 rule_without_range.time_range = TimeRange::default();
866 assert!(is_match(before_time_range, &rule_without_range, &dsc).await);
867 assert!(is_match(during_time_range, &rule_without_range, &dsc).await);
868 assert!(is_match(after_time_range, &rule_without_range, &dsc).await);
869 }
870
871 #[tokio::test]
873 async fn test_validate_match() {
874 let mut rule = mocked_sampling_rule();
875
876 let reservoir = ReservoirEvaluator::new(ReservoirCounters::default());
877 let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
878
879 rule.sampling_value = SamplingValue::SampleRate { value: 1.0 };
880 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
881
882 rule.sampling_value = SamplingValue::Factor { value: 1.0 };
883 assert_eq!(eval.try_compute_sample_rate(&rule).await, None);
884
885 rule.sampling_value = SamplingValue::Reservoir { limit: 1 };
886 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
887 }
888}