1use std::collections::BTreeMap;
4use std::fmt;
5use std::num::ParseIntError;
6use std::ops::ControlFlow;
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use rand::Rng;
11use rand::distr::StandardUniform;
12use rand_pcg::Pcg32;
13#[cfg(feature = "redis")]
14use relay_base_schema::organization::OrganizationId;
15use relay_protocol::Getter;
16#[cfg(feature = "redis")]
17use relay_redis::AsyncRedisClient;
18use serde::Serialize;
19use uuid::Uuid;
20
21use crate::config::{RuleId, SamplingRule, SamplingValue};
22#[cfg(feature = "redis")]
23use crate::redis_sampling::{self, ReservoirRuleKey};
24
25fn pseudo_random_from_seed(seed: Uuid) -> f64 {
29 let seed_number = seed.as_u128();
30 let mut generator = Pcg32::new((seed_number >> 64) as u64, seed_number as u64);
31 generator.sample(StandardUniform)
32}
33
34pub type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;
36
37#[derive(Debug)]
50pub struct ReservoirEvaluator<'a> {
51 counters: ReservoirCounters,
52 #[cfg(feature = "redis")]
53 org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>,
54 _phantom: std::marker::PhantomData<&'a ()>,
56}
57
58impl ReservoirEvaluator<'_> {
59 pub fn new(counters: ReservoirCounters) -> Self {
61 Self {
62 counters,
63 #[cfg(feature = "redis")]
64 org_id_and_client: None,
65 _phantom: std::marker::PhantomData,
66 }
67 }
68
69 pub fn counters(&self) -> ReservoirCounters {
71 Arc::clone(&self.counters)
72 }
73
74 #[cfg(feature = "redis")]
75 async fn redis_incr(
76 &self,
77 key: &ReservoirRuleKey,
78 client: &AsyncRedisClient,
79 rule_expiry: Option<&DateTime<Utc>>,
80 ) -> relay_redis::Result<i64> {
81 relay_statsd::metric!(timer(crate::statsd::SamplingTimers::RedisReservoir), {
82 let mut connection = client.get_connection().await?;
83
84 let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?;
85 redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?;
86
87 Ok(val)
88 })
89 }
90
91 pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool {
93 let Ok(mut map_guard) = self.counters.lock() else {
94 relay_log::error!("failed to lock reservoir counter mutex");
95 return false;
96 };
97
98 let counter_value = map_guard.entry(rule).or_insert(0);
99
100 if *counter_value < limit {
101 *counter_value += 1;
102 true
103 } else {
104 false
105 }
106 }
107
108 pub async fn evaluate(
110 &self,
111 rule: RuleId,
112 limit: i64,
113 _rule_expiry: Option<&DateTime<Utc>>,
114 ) -> bool {
115 #[cfg(feature = "redis")]
116 if let Some((org_id, client)) = self.org_id_and_client {
117 if let Ok(guard) = self.counters.lock()
118 && *guard.get(&rule).unwrap_or(&0) > limit
119 {
120 return false;
121 }
122
123 let key = ReservoirRuleKey::new(org_id, rule);
124 let redis_count = match self.redis_incr(&key, client, _rule_expiry).await {
125 Ok(redis_count) => redis_count,
126 Err(e) => {
127 relay_log::error!(
128 error = &e as &dyn std::error::Error,
129 "failed to increment reservoir rule"
130 );
131 return false;
132 }
133 };
134
135 if let Ok(mut map_guard) = self.counters.lock() {
136 if let Some(value) = map_guard.get_mut(&rule) {
139 *value = redis_count.max(*value);
140 }
141 }
142 return redis_count <= limit;
143 }
144
145 self.incr_local(rule, limit)
146 }
147}
148
#[cfg(feature = "redis")]
impl<'a> ReservoirEvaluator<'a> {
    /// Attaches the organization id and the Redis client used by subsequent evaluations.
    ///
    /// Any previously attached organization and client are replaced.
    pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) {
        let redis = (org_id, client);
        self.org_id_and_client = Some(redis);
    }
}
158
159#[derive(Debug)]
161pub struct SamplingEvaluator<'a> {
162 now: DateTime<Utc>,
163 rule_ids: Vec<RuleId>,
164 factor: f64,
165 minimum_sample_rate: Option<f64>,
166 reservoir: Option<&'a ReservoirEvaluator<'a>>,
167}
168
169impl<'a> SamplingEvaluator<'a> {
170 pub fn new_with_reservoir(now: DateTime<Utc>, reservoir: &'a ReservoirEvaluator<'a>) -> Self {
172 Self {
173 now,
174 rule_ids: vec![],
175 factor: 1.0,
176 minimum_sample_rate: None,
177 reservoir: Some(reservoir),
178 }
179 }
180
181 pub fn new(now: DateTime<Utc>) -> Self {
183 Self {
184 now,
185 rule_ids: vec![],
186 factor: 1.0,
187 minimum_sample_rate: None,
188 reservoir: None,
189 }
190 }
191
192 pub async fn match_rules<'b, I, G>(
204 mut self,
205 seed: Uuid,
206 instance: &G,
207 rules: I,
208 ) -> ControlFlow<SamplingMatch, Self>
209 where
210 G: Getter,
211 I: Iterator<Item = &'b SamplingRule>,
212 {
213 for rule in rules {
214 if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) {
215 continue;
216 };
217
218 if let Some(sample_rate) = self.try_compute_sample_rate(rule).await {
219 return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids));
220 };
221 }
222
223 ControlFlow::Continue(self)
224 }
225
226 async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option<f64> {
234 match rule.sampling_value {
235 SamplingValue::Factor { value } => {
236 self.factor *= rule.apply_decaying_fn(value, self.now)?;
237 self.rule_ids.push(rule.id);
238 None
239 }
240 SamplingValue::SampleRate { value } => {
241 let sample_rate = rule.apply_decaying_fn(value, self.now)?;
242 let minimum_sample_rate = self.minimum_sample_rate.unwrap_or(0.0);
243 let adjusted = (sample_rate.max(minimum_sample_rate) * self.factor).clamp(0.0, 1.0);
244
245 self.rule_ids.push(rule.id);
246 Some(adjusted)
247 }
248 SamplingValue::Reservoir { limit } => {
249 let reservoir = self.reservoir?;
250 if !reservoir
251 .evaluate(rule.id, limit, rule.time_range.end.as_ref())
252 .await
253 {
254 return None;
255 }
256
257 self.rule_ids.clear();
259 self.rule_ids.push(rule.id);
260 Some(1.0)
262 }
263 SamplingValue::MinimumSampleRate { value } => {
264 if self.minimum_sample_rate.is_none() {
265 self.minimum_sample_rate = Some(rule.apply_decaying_fn(value, self.now)?);
266 self.rule_ids.push(rule.id);
267 }
268 None
269 }
270 }
271 }
272}
273
274fn sampling_match(sample_rate: f64, seed: Uuid) -> SamplingDecision {
275 if sample_rate <= 0.0 {
276 return SamplingDecision::Drop;
277 } else if sample_rate >= 1.0 {
278 return SamplingDecision::Keep;
279 }
280
281 let random_number = pseudo_random_from_seed(seed);
282 relay_log::trace!(
283 sample_rate,
284 random_number,
285 "applying dynamic sampling to matching event"
286 );
287
288 if random_number >= sample_rate {
289 relay_log::trace!("dropping event that matched the configuration");
290 SamplingDecision::Drop
291 } else {
292 relay_log::trace!("keeping event that matched the configuration");
293 SamplingDecision::Keep
294 }
295}
296
/// The outcome of applying a sample rate: keep the item or drop it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SamplingDecision {
    /// The item is kept.
    Keep,
    /// The item is dropped.
    Drop,
}

impl SamplingDecision {
    /// Returns `true` if the decision is [`SamplingDecision::Keep`].
    pub fn is_keep(self) -> bool {
        self == Self::Keep
    }

    /// Returns `true` if the decision is [`SamplingDecision::Drop`].
    pub fn is_drop(self) -> bool {
        self == Self::Drop
    }

    /// Returns the lowercase name of the decision: `"keep"` or `"drop"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Drop => "drop",
            Self::Keep => "keep",
        }
    }
}

impl fmt::Display for SamplingDecision {
    /// Writes the same text that [`SamplingDecision::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
331
332#[derive(Clone, Debug, PartialEq)]
334pub struct SamplingMatch {
335 sample_rate: f64,
337 seed: Uuid,
343 matched_rules: MatchedRuleIds,
345 decision: SamplingDecision,
349}
350
351impl SamplingMatch {
352 fn new(sample_rate: f64, seed: Uuid, matched_rules: Vec<RuleId>) -> Self {
353 let matched_rules = MatchedRuleIds(matched_rules);
354 let decision = sampling_match(sample_rate, seed);
355
356 Self {
357 sample_rate,
358 seed,
359 matched_rules,
360 decision,
361 }
362 }
363
364 pub fn sample_rate(&self) -> f64 {
366 self.sample_rate
367 }
368
369 pub fn into_matched_rules(self) -> MatchedRuleIds {
374 self.matched_rules
375 }
376
377 pub fn decision(&self) -> SamplingDecision {
379 self.decision
380 }
381}
382
383#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
385pub struct MatchedRuleIds(pub Vec<RuleId>);
386
387impl MatchedRuleIds {
388 pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
396 let mut rule_ids = vec![];
397
398 for rule_id in value.split(',') {
399 rule_ids.push(RuleId(rule_id.parse()?));
400 }
401
402 Ok(MatchedRuleIds(rule_ids))
403 }
404}
405
406impl fmt::Display for MatchedRuleIds {
407 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
408 for (i, rule_id) in self.0.iter().enumerate() {
409 if i > 0 {
410 write!(f, ",")?;
411 }
412 write!(f, "{rule_id}")?;
413 }
414
415 Ok(())
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use chrono::TimeZone;
422 use relay_protocol::RuleCondition;
423 use similar_asserts::assert_eq;
424 use std::str::FromStr;
425 use uuid::Uuid;
426
427 use crate::DynamicSamplingContext;
428 use crate::config::{DecayingFunction, RuleType, TimeRange};
429 use crate::dsc::TraceUserContext;
430
431 use super::*;
432
433 fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> {
434 let mut map = BTreeMap::default();
435
436 for (rule_id, count) in vals {
437 map.insert(RuleId(rule_id), count);
438 }
439
440 let map = Arc::new(Mutex::new(map));
441
442 ReservoirEvaluator::new(map)
443 }
444
445 async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch {
447 match SamplingEvaluator::new(Utc::now())
448 .match_rules(Uuid::default(), instance, rules.iter())
449 .await
450 {
451 ControlFlow::Break(sampling_match) => sampling_match,
452 ControlFlow::Continue(_) => panic!("no match found"),
453 }
454 }
455
456 fn evaluation_is_match(res: ControlFlow<SamplingMatch, SamplingEvaluator>) -> bool {
457 matches!(res, ControlFlow::Break(_))
458 }
459
460 async fn matches_rule_ids(
462 rule_ids: &[u32],
463 rules: &[SamplingRule],
464 instance: &impl Getter,
465 ) -> bool {
466 let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect());
467 let sampling_match = get_sampling_match(rules, instance).await;
468 matched_rule_ids == sampling_match.matched_rules
469 }
470
471 fn get_matched_rules(
473 sampling_evaluator: &ControlFlow<SamplingMatch, SamplingEvaluator>,
474 ) -> Vec<u32> {
475 match sampling_evaluator {
476 ControlFlow::Continue(_) => panic!("expected a sampling match"),
477 ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(),
478 }
479 }
480
481 fn mocked_dsc_with_getter_values(
483 paths_and_values: Vec<(&str, &str)>,
484 ) -> DynamicSamplingContext {
485 let mut dsc = DynamicSamplingContext {
486 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
487 public_key: "12345678123456781234567812345678".parse().unwrap(),
488 release: None,
489 environment: None,
490 transaction: None,
491 sample_rate: None,
492 user: TraceUserContext::default(),
493 replay_id: None,
494 sampled: None,
495 other: Default::default(),
496 };
497
498 for (path, value) in paths_and_values {
499 match path {
500 "trace.release" => dsc.release = Some(value.to_owned()),
501 "trace.environment" => dsc.environment = Some(value.to_owned()),
502 "trace.user.id" => value.clone_into(&mut dsc.user.user_id),
503 "trace.user.segment" => value.clone_into(&mut dsc.user.user_segment),
504 "trace.transaction" => dsc.transaction = Some(value.to_owned()),
505 "trace.replay_id" => dsc.replay_id = Some(Uuid::from_str(value).unwrap()),
506 _ => panic!("invalid path"),
507 }
508 }
509
510 dsc
511 }
512
513 async fn is_match(
514 now: DateTime<Utc>,
515 rule: &SamplingRule,
516 dsc: &DynamicSamplingContext,
517 ) -> bool {
518 SamplingEvaluator::new(now)
519 .match_rules(Uuid::default(), dsc, std::iter::once(rule))
520 .await
521 .is_break()
522 }
523
524 #[tokio::test]
525 async fn test_reservoir_evaluator_limit() {
526 let evaluator = mock_reservoir_evaluator(vec![(1, 0)]);
527
528 let rule = RuleId(1);
529 let limit = 3;
530
531 assert!(evaluator.evaluate(rule, limit, None).await);
532 assert!(evaluator.evaluate(rule, limit, None).await);
533 assert!(evaluator.evaluate(rule, limit, None).await);
534 assert!(!evaluator.evaluate(rule, limit, None).await);
536 assert!(!evaluator.evaluate(rule, limit, None).await);
537 }
538
539 #[tokio::test]
540 async fn test_sample_rate_compounding() {
541 let rules = simple_sampling_rules(vec![
542 (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }),
543 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
544 (
545 RuleCondition::all(),
546 SamplingValue::SampleRate { value: 0.25 },
547 ),
548 ]);
549 let dsc = mocked_dsc_with_getter_values(vec![]);
550
551 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1);
553 }
554
555 #[tokio::test]
556 async fn test_minimum_sample_rate() {
557 let rules = simple_sampling_rules(vec![
558 (RuleCondition::all(), SamplingValue::Factor { value: 1.5 }),
559 (
560 RuleCondition::all(),
561 SamplingValue::MinimumSampleRate { value: 0.5 },
562 ),
563 (
565 RuleCondition::all(),
566 SamplingValue::MinimumSampleRate { value: 1.0 },
567 ),
568 (
569 RuleCondition::all(),
570 SamplingValue::SampleRate { value: 0.05 },
571 ),
572 ]);
573 let dsc = mocked_dsc_with_getter_values(vec![]);
574
575 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.75);
577 }
578
579 fn mocked_sampling_rule() -> SamplingRule {
580 SamplingRule {
581 condition: RuleCondition::all(),
582 sampling_value: SamplingValue::SampleRate { value: 1.0 },
583 ty: RuleType::Trace,
584 id: RuleId(0),
585 time_range: Default::default(),
586 decaying_fn: Default::default(),
587 }
588 }
589
590 fn simple_sampling_rules(vals: Vec<(RuleCondition, SamplingValue)>) -> Vec<SamplingRule> {
593 let mut vec = vec![];
594
595 for (i, val) in vals.into_iter().enumerate() {
596 let (condition, sampling_value) = val;
597 vec.push(SamplingRule {
598 condition,
599 sampling_value,
600 ty: RuleType::Trace,
601 id: RuleId(i as u32),
602 time_range: Default::default(),
603 decaying_fn: Default::default(),
604 });
605 }
606 vec
607 }
608
609 #[tokio::test]
617 async fn test_reservoir_override() {
618 let dsc = mocked_dsc_with_getter_values(vec![]);
619 let rules = simple_sampling_rules(vec![
620 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
621 (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }),
624 (
625 RuleCondition::all(),
626 SamplingValue::SampleRate { value: 0.5 },
627 ),
628 ]);
629
630 let reservoir = mock_reservoir_evaluator(vec![]);
633
634 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
635 let matched_rules = get_matched_rules(
636 &evaluator
637 .match_rules(Uuid::default(), &dsc, rules.iter())
638 .await,
639 );
640 assert_eq!(&matched_rules, &[1]);
642
643 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
644 let matched_rules = get_matched_rules(
645 &evaluator
646 .match_rules(Uuid::default(), &dsc, rules.iter())
647 .await,
648 );
649 assert_eq!(&matched_rules, &[1]);
651
652 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
653 let matched_rules = get_matched_rules(
654 &evaluator
655 .match_rules(Uuid::default(), &dsc, rules.iter())
656 .await,
657 );
658 assert_eq!(&matched_rules, &[0, 2]);
660 }
661
662 #[tokio::test]
664 async fn test_expired_rules() {
665 let rule = SamplingRule {
666 condition: RuleCondition::all(),
667 sampling_value: SamplingValue::SampleRate { value: 1.0 },
668 ty: RuleType::Trace,
669 id: RuleId(0),
670 time_range: TimeRange {
671 start: Some(Utc.with_ymd_and_hms(1970, 10, 10, 0, 0, 0).unwrap()),
672 end: Some(Utc.with_ymd_and_hms(1970, 10, 12, 0, 0, 0).unwrap()),
673 },
674 decaying_fn: Default::default(),
675 };
676
677 let dsc = mocked_dsc_with_getter_values(vec![]);
678
679 let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();
681 let res = SamplingEvaluator::new(within_timerange)
682 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
683 .await;
684 assert!(evaluation_is_match(res));
685
686 let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
687 let res = SamplingEvaluator::new(before_timerange)
688 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
689 .await;
690 assert!(!evaluation_is_match(res));
691
692 let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap();
693 let res = SamplingEvaluator::new(after_timerange)
694 .match_rules(Uuid::default(), &dsc, [rule].iter())
695 .await;
696 assert!(!evaluation_is_match(res));
697 }
698
699 #[tokio::test]
701 async fn test_condition_matching() {
702 let rules = simple_sampling_rules(vec![
703 (
704 RuleCondition::glob("trace.transaction", "*healthcheck*"),
705 SamplingValue::SampleRate { value: 1.0 },
706 ),
707 (
708 RuleCondition::glob("trace.environment", "*dev*"),
709 SamplingValue::SampleRate { value: 1.0 },
710 ),
711 (
712 RuleCondition::eq_ignore_case("trace.transaction", "raboof"),
713 SamplingValue::Factor { value: 1.0 },
714 ),
715 (
716 RuleCondition::glob("trace.release", "1.1.1")
717 & RuleCondition::eq_ignore_case("trace.user.segment", "vip"),
718 SamplingValue::SampleRate { value: 1.0 },
719 ),
720 (
721 RuleCondition::eq_ignore_case("trace.release", "1.1.1")
722 & RuleCondition::eq_ignore_case("trace.environment", "prod"),
723 SamplingValue::Factor { value: 1.0 },
724 ),
725 (
726 RuleCondition::all(),
727 SamplingValue::SampleRate { value: 1.0 },
728 ),
729 ]);
730
731 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]);
733 assert!(matches_rule_ids(&[0], &rules, &dsc).await);
734
735 let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]);
737 assert!(matches_rule_ids(&[1], &rules, &dsc).await);
738
739 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]);
741 assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await);
742
743 let dsc = mocked_dsc_with_getter_values(vec![
745 ("trace.transaction", "raboof"),
746 ("trace.release", "1.1.1"),
747 ("trace.user.segment", "vip"),
748 ]);
749 assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await);
750
751 let dsc = mocked_dsc_with_getter_values(vec![
753 ("trace.transaction", "raboof"),
754 ("trace.release", "1.1.1"),
755 ("trace.environment", "prod"),
756 ]);
757 assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await);
758
759 let dsc = mocked_dsc_with_getter_values(vec![
761 ("trace.release", "1.1.1"),
762 ("trace.environment", "prod"),
763 ]);
764 assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await);
765 }
766
767 #[test]
768 fn test_repeatable_seed() {
770 let val1 = pseudo_random_from_seed(Uuid::default());
771 let val2 = pseudo_random_from_seed(Uuid::default());
772 assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
773 }
774
775 #[test]
776 fn matched_rule_ids_display() {
778 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
779 assert_eq!(matched_rule_ids.to_string(), "123,456");
780
781 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
782 assert_eq!(matched_rule_ids.to_string(), "123");
783
784 let matched_rule_ids = MatchedRuleIds(vec![]);
785 assert_eq!(matched_rule_ids.to_string(), "")
786 }
787
788 #[test]
789 fn matched_rule_ids_parse() {
791 assert_eq!(
792 MatchedRuleIds::parse("123,456"),
793 Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
794 );
795
796 assert_eq!(
797 MatchedRuleIds::parse("123"),
798 Ok(MatchedRuleIds(vec![RuleId(123)]))
799 );
800
801 assert!(MatchedRuleIds::parse("").is_err());
802
803 assert!(MatchedRuleIds::parse(",").is_err());
804
805 assert!(MatchedRuleIds::parse("123.456").is_err());
806
807 assert!(MatchedRuleIds::parse("a,b").is_err());
808 }
809
810 #[tokio::test]
811 async fn test_get_sampling_match_result_with_no_match() {
813 let dsc = mocked_dsc_with_getter_values(vec![]);
814
815 let res = SamplingEvaluator::new(Utc::now())
816 .match_rules(Uuid::default(), &dsc, [].iter())
817 .await;
818
819 assert!(!evaluation_is_match(res));
820 }
821
822 #[tokio::test]
827 async fn test_sample_rate_valid_time_range() {
828 let dsc = mocked_dsc_with_getter_values(vec![]);
829 let time_range = TimeRange {
830 start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()),
831 end: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
832 };
833
834 let before_time_range = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
835 let during_time_range = Utc.with_ymd_and_hms(1975, 1, 1, 0, 0, 0).unwrap();
836 let after_time_range = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap();
837
838 let rule = SamplingRule {
839 condition: RuleCondition::all(),
840 sampling_value: SamplingValue::SampleRate { value: 1.0 },
841 ty: RuleType::Trace,
842 id: RuleId(0),
843 time_range,
844 decaying_fn: DecayingFunction::Constant,
845 };
846
847 assert!(!is_match(before_time_range, &rule, &dsc).await);
849 assert!(is_match(during_time_range, &rule, &dsc).await);
850 assert!(!is_match(after_time_range, &rule, &dsc).await);
851
852 let mut rule_without_end = rule.clone();
854 rule_without_end.time_range.end = None;
855 assert!(!is_match(before_time_range, &rule_without_end, &dsc).await);
856 assert!(is_match(during_time_range, &rule_without_end, &dsc).await);
857 assert!(is_match(after_time_range, &rule_without_end, &dsc).await);
858
859 let mut rule_without_start = rule.clone();
861 rule_without_start.time_range.start = None;
862 assert!(is_match(before_time_range, &rule_without_start, &dsc).await);
863 assert!(is_match(during_time_range, &rule_without_start, &dsc).await);
864 assert!(!is_match(after_time_range, &rule_without_start, &dsc).await);
865
866 let mut rule_without_range = rule.clone();
868 rule_without_range.time_range = TimeRange::default();
869 assert!(is_match(before_time_range, &rule_without_range, &dsc).await);
870 assert!(is_match(during_time_range, &rule_without_range, &dsc).await);
871 assert!(is_match(after_time_range, &rule_without_range, &dsc).await);
872 }
873
874 #[tokio::test]
876 async fn test_validate_match() {
877 let mut rule = mocked_sampling_rule();
878
879 let reservoir = ReservoirEvaluator::new(ReservoirCounters::default());
880 let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
881
882 rule.sampling_value = SamplingValue::SampleRate { value: 1.0 };
883 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
884
885 rule.sampling_value = SamplingValue::Factor { value: 1.0 };
886 assert_eq!(eval.try_compute_sample_rate(&rule).await, None);
887
888 rule.sampling_value = SamplingValue::Reservoir { limit: 1 };
889 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
890 }
891}