1use std::collections::BTreeMap;
4use std::fmt;
5use std::num::ParseIntError;
6use std::ops::ControlFlow;
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use rand::Rng;
11use rand::distr::StandardUniform;
12use rand_pcg::Pcg32;
13#[cfg(feature = "redis")]
14use relay_base_schema::organization::OrganizationId;
15use relay_protocol::Getter;
16#[cfg(feature = "redis")]
17use relay_redis::AsyncRedisClient;
18use serde::Serialize;
19use uuid::Uuid;
20
21use crate::config::{RuleId, SamplingRule, SamplingValue};
22#[cfg(feature = "redis")]
23use crate::redis_sampling::{self, ReservoirRuleKey};
24
25fn pseudo_random_from_seed(seed: Uuid) -> f64 {
29 let seed_number = seed.as_u128();
30 let mut generator = Pcg32::new((seed_number >> 64) as u64, seed_number as u64);
31 generator.sample(StandardUniform)
32}
33
34pub type ReservoirCounters = Arc<Mutex<BTreeMap<RuleId, i64>>>;
36
37#[derive(Debug)]
50pub struct ReservoirEvaluator<'a> {
51 counters: ReservoirCounters,
52 #[cfg(feature = "redis")]
53 org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>,
54 _phantom: std::marker::PhantomData<&'a ()>,
56}
57
58impl ReservoirEvaluator<'_> {
59 pub fn new(counters: ReservoirCounters) -> Self {
61 Self {
62 counters,
63 #[cfg(feature = "redis")]
64 org_id_and_client: None,
65 _phantom: std::marker::PhantomData,
66 }
67 }
68
69 pub fn counters(&self) -> ReservoirCounters {
71 Arc::clone(&self.counters)
72 }
73
74 #[cfg(feature = "redis")]
75 async fn redis_incr(
76 &self,
77 key: &ReservoirRuleKey,
78 client: &AsyncRedisClient,
79 rule_expiry: Option<&DateTime<Utc>>,
80 ) -> anyhow::Result<i64> {
81 let mut connection = client.get_connection().await?;
82
83 let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?;
84 redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?;
85
86 Ok(val)
87 }
88
89 pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool {
91 let Ok(mut map_guard) = self.counters.lock() else {
92 relay_log::error!("failed to lock reservoir counter mutex");
93 return false;
94 };
95
96 let counter_value = map_guard.entry(rule).or_insert(0);
97
98 if *counter_value < limit {
99 *counter_value += 1;
100 true
101 } else {
102 false
103 }
104 }
105
106 pub async fn evaluate(
108 &self,
109 rule: RuleId,
110 limit: i64,
111 _rule_expiry: Option<&DateTime<Utc>>,
112 ) -> bool {
113 #[cfg(feature = "redis")]
114 if let Some((org_id, client)) = self.org_id_and_client {
115 if let Ok(guard) = self.counters.lock() {
116 if *guard.get(&rule).unwrap_or(&0) > limit {
117 return false;
118 }
119 }
120
121 let key = ReservoirRuleKey::new(org_id, rule);
122 let redis_count = match self.redis_incr(&key, client, _rule_expiry).await {
123 Ok(redis_count) => redis_count,
124 Err(e) => {
125 relay_log::error!(error = &*e, "failed to increment reservoir rule");
126 return false;
127 }
128 };
129
130 if let Ok(mut map_guard) = self.counters.lock() {
131 if let Some(value) = map_guard.get_mut(&rule) {
134 *value = redis_count.max(*value);
135 }
136 }
137 return redis_count <= limit;
138 }
139
140 self.incr_local(rule, limit)
141 }
142}
143
144#[cfg(feature = "redis")]
145impl<'a> ReservoirEvaluator<'a> {
146 pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) {
150 self.org_id_and_client = Some((org_id, client));
151 }
152}
153
154#[derive(Debug)]
156pub struct SamplingEvaluator<'a> {
157 now: DateTime<Utc>,
158 rule_ids: Vec<RuleId>,
159 factor: f64,
160 minimum_sample_rate: Option<f64>,
161 reservoir: Option<&'a ReservoirEvaluator<'a>>,
162}
163
164impl<'a> SamplingEvaluator<'a> {
165 pub fn new_with_reservoir(now: DateTime<Utc>, reservoir: &'a ReservoirEvaluator<'a>) -> Self {
167 Self {
168 now,
169 rule_ids: vec![],
170 factor: 1.0,
171 minimum_sample_rate: None,
172 reservoir: Some(reservoir),
173 }
174 }
175
176 pub fn new(now: DateTime<Utc>) -> Self {
178 Self {
179 now,
180 rule_ids: vec![],
181 factor: 1.0,
182 minimum_sample_rate: None,
183 reservoir: None,
184 }
185 }
186
187 pub async fn match_rules<'b, I, G>(
199 mut self,
200 seed: Uuid,
201 instance: &G,
202 rules: I,
203 ) -> ControlFlow<SamplingMatch, Self>
204 where
205 G: Getter,
206 I: Iterator<Item = &'b SamplingRule>,
207 {
208 for rule in rules {
209 if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) {
210 continue;
211 };
212
213 if let Some(sample_rate) = self.try_compute_sample_rate(rule).await {
214 return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids));
215 };
216 }
217
218 ControlFlow::Continue(self)
219 }
220
221 async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option<f64> {
229 match rule.sampling_value {
230 SamplingValue::Factor { value } => {
231 self.factor *= rule.apply_decaying_fn(value, self.now)?;
232 self.rule_ids.push(rule.id);
233 None
234 }
235 SamplingValue::SampleRate { value } => {
236 let sample_rate = rule.apply_decaying_fn(value, self.now)?;
237 let minimum_sample_rate = self.minimum_sample_rate.unwrap_or(0.0);
238 let adjusted = (sample_rate.max(minimum_sample_rate) * self.factor).clamp(0.0, 1.0);
239
240 self.rule_ids.push(rule.id);
241 Some(adjusted)
242 }
243 SamplingValue::Reservoir { limit } => {
244 let reservoir = self.reservoir?;
245 if !reservoir
246 .evaluate(rule.id, limit, rule.time_range.end.as_ref())
247 .await
248 {
249 return None;
250 }
251
252 self.rule_ids.clear();
254 self.rule_ids.push(rule.id);
255 Some(1.0)
257 }
258 SamplingValue::MinimumSampleRate { value } => {
259 if self.minimum_sample_rate.is_none() {
260 self.minimum_sample_rate = Some(rule.apply_decaying_fn(value, self.now)?);
261 self.rule_ids.push(rule.id);
262 }
263 None
264 }
265 }
266 }
267}
268
269fn sampling_match(sample_rate: f64, seed: Uuid) -> SamplingDecision {
270 if sample_rate <= 0.0 {
271 return SamplingDecision::Drop;
272 } else if sample_rate >= 1.0 {
273 return SamplingDecision::Keep;
274 }
275
276 let random_number = pseudo_random_from_seed(seed);
277 relay_log::trace!(
278 sample_rate,
279 random_number,
280 "applying dynamic sampling to matching event"
281 );
282
283 if random_number >= sample_rate {
284 relay_log::trace!("dropping event that matched the configuration");
285 SamplingDecision::Drop
286 } else {
287 relay_log::trace!("keeping event that matched the configuration");
288 SamplingDecision::Keep
289 }
290}
291
292#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
294pub enum SamplingDecision {
295 Keep,
297 Drop,
299}
300
301impl SamplingDecision {
302 pub fn is_keep(self) -> bool {
304 matches!(self, Self::Keep)
305 }
306
307 pub fn is_drop(self) -> bool {
309 matches!(self, Self::Drop)
310 }
311
312 pub fn as_str(self) -> &'static str {
314 match self {
315 Self::Keep => "keep",
316 Self::Drop => "drop",
317 }
318 }
319}
320
321impl fmt::Display for SamplingDecision {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323 write!(f, "{}", self.as_str())
324 }
325}
326
327#[derive(Clone, Debug, PartialEq)]
329pub struct SamplingMatch {
330 sample_rate: f64,
332 seed: Uuid,
338 matched_rules: MatchedRuleIds,
340 decision: SamplingDecision,
344}
345
346impl SamplingMatch {
347 fn new(sample_rate: f64, seed: Uuid, matched_rules: Vec<RuleId>) -> Self {
348 let matched_rules = MatchedRuleIds(matched_rules);
349 let decision = sampling_match(sample_rate, seed);
350
351 Self {
352 sample_rate,
353 seed,
354 matched_rules,
355 decision,
356 }
357 }
358
359 pub fn sample_rate(&self) -> f64 {
361 self.sample_rate
362 }
363
364 pub fn into_matched_rules(self) -> MatchedRuleIds {
369 self.matched_rules
370 }
371
372 pub fn decision(&self) -> SamplingDecision {
374 self.decision
375 }
376}
377
378#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
380pub struct MatchedRuleIds(pub Vec<RuleId>);
381
382impl MatchedRuleIds {
383 pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
391 let mut rule_ids = vec![];
392
393 for rule_id in value.split(',') {
394 rule_ids.push(RuleId(rule_id.parse()?));
395 }
396
397 Ok(MatchedRuleIds(rule_ids))
398 }
399}
400
401impl fmt::Display for MatchedRuleIds {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 for (i, rule_id) in self.0.iter().enumerate() {
404 if i > 0 {
405 write!(f, ",")?;
406 }
407 write!(f, "{rule_id}")?;
408 }
409
410 Ok(())
411 }
412}
413
414#[cfg(test)]
415mod tests {
416 use chrono::TimeZone;
417 use relay_protocol::RuleCondition;
418 use similar_asserts::assert_eq;
419 use std::str::FromStr;
420 use uuid::Uuid;
421
422 use crate::DynamicSamplingContext;
423 use crate::config::{DecayingFunction, RuleType, TimeRange};
424 use crate::dsc::TraceUserContext;
425
426 use super::*;
427
428 fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> {
429 let mut map = BTreeMap::default();
430
431 for (rule_id, count) in vals {
432 map.insert(RuleId(rule_id), count);
433 }
434
435 let map = Arc::new(Mutex::new(map));
436
437 ReservoirEvaluator::new(map)
438 }
439
440 async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch {
442 match SamplingEvaluator::new(Utc::now())
443 .match_rules(Uuid::default(), instance, rules.iter())
444 .await
445 {
446 ControlFlow::Break(sampling_match) => sampling_match,
447 ControlFlow::Continue(_) => panic!("no match found"),
448 }
449 }
450
451 fn evaluation_is_match(res: ControlFlow<SamplingMatch, SamplingEvaluator>) -> bool {
452 matches!(res, ControlFlow::Break(_))
453 }
454
455 async fn matches_rule_ids(
457 rule_ids: &[u32],
458 rules: &[SamplingRule],
459 instance: &impl Getter,
460 ) -> bool {
461 let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect());
462 let sampling_match = get_sampling_match(rules, instance).await;
463 matched_rule_ids == sampling_match.matched_rules
464 }
465
466 fn get_matched_rules(
468 sampling_evaluator: &ControlFlow<SamplingMatch, SamplingEvaluator>,
469 ) -> Vec<u32> {
470 match sampling_evaluator {
471 ControlFlow::Continue(_) => panic!("expected a sampling match"),
472 ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(),
473 }
474 }
475
476 fn mocked_dsc_with_getter_values(
478 paths_and_values: Vec<(&str, &str)>,
479 ) -> DynamicSamplingContext {
480 let mut dsc = DynamicSamplingContext {
481 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
482 public_key: "12345678123456781234567812345678".parse().unwrap(),
483 release: None,
484 environment: None,
485 transaction: None,
486 sample_rate: None,
487 user: TraceUserContext::default(),
488 replay_id: None,
489 sampled: None,
490 other: Default::default(),
491 };
492
493 for (path, value) in paths_and_values {
494 match path {
495 "trace.release" => dsc.release = Some(value.to_owned()),
496 "trace.environment" => dsc.environment = Some(value.to_owned()),
497 "trace.user.id" => value.clone_into(&mut dsc.user.user_id),
498 "trace.user.segment" => value.clone_into(&mut dsc.user.user_segment),
499 "trace.transaction" => dsc.transaction = Some(value.to_owned()),
500 "trace.replay_id" => dsc.replay_id = Some(Uuid::from_str(value).unwrap()),
501 _ => panic!("invalid path"),
502 }
503 }
504
505 dsc
506 }
507
508 async fn is_match(
509 now: DateTime<Utc>,
510 rule: &SamplingRule,
511 dsc: &DynamicSamplingContext,
512 ) -> bool {
513 SamplingEvaluator::new(now)
514 .match_rules(Uuid::default(), dsc, std::iter::once(rule))
515 .await
516 .is_break()
517 }
518
519 #[tokio::test]
520 async fn test_reservoir_evaluator_limit() {
521 let evaluator = mock_reservoir_evaluator(vec![(1, 0)]);
522
523 let rule = RuleId(1);
524 let limit = 3;
525
526 assert!(evaluator.evaluate(rule, limit, None).await);
527 assert!(evaluator.evaluate(rule, limit, None).await);
528 assert!(evaluator.evaluate(rule, limit, None).await);
529 assert!(!evaluator.evaluate(rule, limit, None).await);
531 assert!(!evaluator.evaluate(rule, limit, None).await);
532 }
533
534 #[tokio::test]
535 async fn test_sample_rate_compounding() {
536 let rules = simple_sampling_rules(vec![
537 (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }),
538 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
539 (
540 RuleCondition::all(),
541 SamplingValue::SampleRate { value: 0.25 },
542 ),
543 ]);
544 let dsc = mocked_dsc_with_getter_values(vec![]);
545
546 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1);
548 }
549
550 #[tokio::test]
551 async fn test_minimum_sample_rate() {
552 let rules = simple_sampling_rules(vec![
553 (RuleCondition::all(), SamplingValue::Factor { value: 1.5 }),
554 (
555 RuleCondition::all(),
556 SamplingValue::MinimumSampleRate { value: 0.5 },
557 ),
558 (
560 RuleCondition::all(),
561 SamplingValue::MinimumSampleRate { value: 1.0 },
562 ),
563 (
564 RuleCondition::all(),
565 SamplingValue::SampleRate { value: 0.05 },
566 ),
567 ]);
568 let dsc = mocked_dsc_with_getter_values(vec![]);
569
570 assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.75);
572 }
573
574 fn mocked_sampling_rule() -> SamplingRule {
575 SamplingRule {
576 condition: RuleCondition::all(),
577 sampling_value: SamplingValue::SampleRate { value: 1.0 },
578 ty: RuleType::Trace,
579 id: RuleId(0),
580 time_range: Default::default(),
581 decaying_fn: Default::default(),
582 }
583 }
584
585 fn simple_sampling_rules(vals: Vec<(RuleCondition, SamplingValue)>) -> Vec<SamplingRule> {
588 let mut vec = vec![];
589
590 for (i, val) in vals.into_iter().enumerate() {
591 let (condition, sampling_value) = val;
592 vec.push(SamplingRule {
593 condition,
594 sampling_value,
595 ty: RuleType::Trace,
596 id: RuleId(i as u32),
597 time_range: Default::default(),
598 decaying_fn: Default::default(),
599 });
600 }
601 vec
602 }
603
604 #[tokio::test]
612 async fn test_reservoir_override() {
613 let dsc = mocked_dsc_with_getter_values(vec![]);
614 let rules = simple_sampling_rules(vec![
615 (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }),
616 (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }),
619 (
620 RuleCondition::all(),
621 SamplingValue::SampleRate { value: 0.5 },
622 ),
623 ]);
624
625 let reservoir = mock_reservoir_evaluator(vec![]);
628
629 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
630 let matched_rules = get_matched_rules(
631 &evaluator
632 .match_rules(Uuid::default(), &dsc, rules.iter())
633 .await,
634 );
635 assert_eq!(&matched_rules, &[1]);
637
638 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
639 let matched_rules = get_matched_rules(
640 &evaluator
641 .match_rules(Uuid::default(), &dsc, rules.iter())
642 .await,
643 );
644 assert_eq!(&matched_rules, &[1]);
646
647 let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
648 let matched_rules = get_matched_rules(
649 &evaluator
650 .match_rules(Uuid::default(), &dsc, rules.iter())
651 .await,
652 );
653 assert_eq!(&matched_rules, &[0, 2]);
655 }
656
657 #[tokio::test]
659 async fn test_expired_rules() {
660 let rule = SamplingRule {
661 condition: RuleCondition::all(),
662 sampling_value: SamplingValue::SampleRate { value: 1.0 },
663 ty: RuleType::Trace,
664 id: RuleId(0),
665 time_range: TimeRange {
666 start: Some(Utc.with_ymd_and_hms(1970, 10, 10, 0, 0, 0).unwrap()),
667 end: Some(Utc.with_ymd_and_hms(1970, 10, 12, 0, 0, 0).unwrap()),
668 },
669 decaying_fn: Default::default(),
670 };
671
672 let dsc = mocked_dsc_with_getter_values(vec![]);
673
674 let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();
676 let res = SamplingEvaluator::new(within_timerange)
677 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
678 .await;
679 assert!(evaluation_is_match(res));
680
681 let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
682 let res = SamplingEvaluator::new(before_timerange)
683 .match_rules(Uuid::default(), &dsc, [rule.clone()].iter())
684 .await;
685 assert!(!evaluation_is_match(res));
686
687 let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap();
688 let res = SamplingEvaluator::new(after_timerange)
689 .match_rules(Uuid::default(), &dsc, [rule].iter())
690 .await;
691 assert!(!evaluation_is_match(res));
692 }
693
694 #[tokio::test]
696 async fn test_condition_matching() {
697 let rules = simple_sampling_rules(vec![
698 (
699 RuleCondition::glob("trace.transaction", "*healthcheck*"),
700 SamplingValue::SampleRate { value: 1.0 },
701 ),
702 (
703 RuleCondition::glob("trace.environment", "*dev*"),
704 SamplingValue::SampleRate { value: 1.0 },
705 ),
706 (
707 RuleCondition::eq_ignore_case("trace.transaction", "raboof"),
708 SamplingValue::Factor { value: 1.0 },
709 ),
710 (
711 RuleCondition::glob("trace.release", "1.1.1")
712 & RuleCondition::eq_ignore_case("trace.user.segment", "vip"),
713 SamplingValue::SampleRate { value: 1.0 },
714 ),
715 (
716 RuleCondition::eq_ignore_case("trace.release", "1.1.1")
717 & RuleCondition::eq_ignore_case("trace.environment", "prod"),
718 SamplingValue::Factor { value: 1.0 },
719 ),
720 (
721 RuleCondition::all(),
722 SamplingValue::SampleRate { value: 1.0 },
723 ),
724 ]);
725
726 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]);
728 assert!(matches_rule_ids(&[0], &rules, &dsc).await);
729
730 let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]);
732 assert!(matches_rule_ids(&[1], &rules, &dsc).await);
733
734 let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]);
736 assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await);
737
738 let dsc = mocked_dsc_with_getter_values(vec![
740 ("trace.transaction", "raboof"),
741 ("trace.release", "1.1.1"),
742 ("trace.user.segment", "vip"),
743 ]);
744 assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await);
745
746 let dsc = mocked_dsc_with_getter_values(vec![
748 ("trace.transaction", "raboof"),
749 ("trace.release", "1.1.1"),
750 ("trace.environment", "prod"),
751 ]);
752 assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await);
753
754 let dsc = mocked_dsc_with_getter_values(vec![
756 ("trace.release", "1.1.1"),
757 ("trace.environment", "prod"),
758 ]);
759 assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await);
760 }
761
762 #[test]
763 fn test_repeatable_seed() {
765 let val1 = pseudo_random_from_seed(Uuid::default());
766 let val2 = pseudo_random_from_seed(Uuid::default());
767 assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
768 }
769
770 #[test]
771 fn matched_rule_ids_display() {
773 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
774 assert_eq!(matched_rule_ids.to_string(), "123,456");
775
776 let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
777 assert_eq!(matched_rule_ids.to_string(), "123");
778
779 let matched_rule_ids = MatchedRuleIds(vec![]);
780 assert_eq!(matched_rule_ids.to_string(), "")
781 }
782
783 #[test]
784 fn matched_rule_ids_parse() {
786 assert_eq!(
787 MatchedRuleIds::parse("123,456"),
788 Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
789 );
790
791 assert_eq!(
792 MatchedRuleIds::parse("123"),
793 Ok(MatchedRuleIds(vec![RuleId(123)]))
794 );
795
796 assert!(MatchedRuleIds::parse("").is_err());
797
798 assert!(MatchedRuleIds::parse(",").is_err());
799
800 assert!(MatchedRuleIds::parse("123.456").is_err());
801
802 assert!(MatchedRuleIds::parse("a,b").is_err());
803 }
804
805 #[tokio::test]
806 async fn test_get_sampling_match_result_with_no_match() {
808 let dsc = mocked_dsc_with_getter_values(vec![]);
809
810 let res = SamplingEvaluator::new(Utc::now())
811 .match_rules(Uuid::default(), &dsc, [].iter())
812 .await;
813
814 assert!(!evaluation_is_match(res));
815 }
816
817 #[tokio::test]
822 async fn test_sample_rate_valid_time_range() {
823 let dsc = mocked_dsc_with_getter_values(vec![]);
824 let time_range = TimeRange {
825 start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()),
826 end: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
827 };
828
829 let before_time_range = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap();
830 let during_time_range = Utc.with_ymd_and_hms(1975, 1, 1, 0, 0, 0).unwrap();
831 let after_time_range = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap();
832
833 let rule = SamplingRule {
834 condition: RuleCondition::all(),
835 sampling_value: SamplingValue::SampleRate { value: 1.0 },
836 ty: RuleType::Trace,
837 id: RuleId(0),
838 time_range,
839 decaying_fn: DecayingFunction::Constant,
840 };
841
842 assert!(!is_match(before_time_range, &rule, &dsc).await);
844 assert!(is_match(during_time_range, &rule, &dsc).await);
845 assert!(!is_match(after_time_range, &rule, &dsc).await);
846
847 let mut rule_without_end = rule.clone();
849 rule_without_end.time_range.end = None;
850 assert!(!is_match(before_time_range, &rule_without_end, &dsc).await);
851 assert!(is_match(during_time_range, &rule_without_end, &dsc).await);
852 assert!(is_match(after_time_range, &rule_without_end, &dsc).await);
853
854 let mut rule_without_start = rule.clone();
856 rule_without_start.time_range.start = None;
857 assert!(is_match(before_time_range, &rule_without_start, &dsc).await);
858 assert!(is_match(during_time_range, &rule_without_start, &dsc).await);
859 assert!(!is_match(after_time_range, &rule_without_start, &dsc).await);
860
861 let mut rule_without_range = rule.clone();
863 rule_without_range.time_range = TimeRange::default();
864 assert!(is_match(before_time_range, &rule_without_range, &dsc).await);
865 assert!(is_match(during_time_range, &rule_without_range, &dsc).await);
866 assert!(is_match(after_time_range, &rule_without_range, &dsc).await);
867 }
868
869 #[tokio::test]
871 async fn test_validate_match() {
872 let mut rule = mocked_sampling_rule();
873
874 let reservoir = ReservoirEvaluator::new(ReservoirCounters::default());
875 let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir);
876
877 rule.sampling_value = SamplingValue::SampleRate { value: 1.0 };
878 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
879
880 rule.sampling_value = SamplingValue::Factor { value: 1.0 };
881 assert_eq!(eval.try_compute_sample_rate(&rule).await, None);
882
883 rule.sampling_value = SamplingValue::Reservoir { limit: 1 };
884 assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0));
885 }
886}