1use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use chrono::Utc;
6use relay_config::Config;
7use relay_dynamic_config::ErrorBoundary;
8use relay_event_schema::protocol::{Contexts, Event, TraceContext};
9use relay_protocol::{Annotated, Empty};
10use relay_quotas::DataCategory;
11use relay_sampling::config::RuleType;
12use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
13use relay_sampling::{DynamicSamplingContext, SamplingConfig};
14
15use crate::envelope::ItemType;
16use crate::managed::TypedEnvelope;
17use crate::services::outcome::Outcome;
18use crate::services::processor::{
19 EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
20};
21use crate::services::projects::project::ProjectInfo;
22use crate::utils::{self, SamplingResult};
23
24pub fn validate_and_set_dsc<T>(
49 managed_envelope: &mut TypedEnvelope<T>,
50 event: &mut Annotated<Event>,
51 project_info: Arc<ProjectInfo>,
52 sampling_project_info: Option<Arc<ProjectInfo>>,
53) -> Option<Arc<ProjectInfo>> {
54 let original_dsc = managed_envelope.envelope().dsc();
55 if original_dsc.is_some() && sampling_project_info.is_some() {
56 return sampling_project_info;
57 }
58
59 if let Some(event) = event.value()
62 && let Some(key_config) = project_info.get_public_key_config()
63 && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
64 {
65 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
68 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
69
70 managed_envelope.envelope_mut().set_dsc(dsc);
71 return Some(project_info.clone());
72 }
73
74 managed_envelope.envelope_mut().remove_dsc();
76 None
77}
78
79pub async fn run<Group>(
81 managed_envelope: &mut TypedEnvelope<Group>,
82 event: &mut Annotated<Event>,
83 config: Arc<Config>,
84 project_info: Arc<ProjectInfo>,
85 sampling_project_info: Option<Arc<ProjectInfo>>,
86 reservoir: &ReservoirEvaluator<'_>,
87) -> SamplingResult
88where
89 Group: Sampling,
90{
91 if !Group::supports_sampling(&project_info) {
92 return SamplingResult::Pending;
93 }
94
95 let sampling_config = match project_info.config.sampling {
96 Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
97 _ => None,
98 };
99
100 let root_state = sampling_project_info.as_ref();
101 let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
102 Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
103 _ => None,
104 };
105
106 let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
107
108 compute_sampling_decision(
109 config.processing_enabled(),
110 reservoir,
111 sampling_config,
112 event.value(),
113 root_config,
114 managed_envelope.envelope().dsc(),
115 )
116 .await
117}
118
119pub fn drop_unsampled_items(
121 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
122 event: Annotated<Event>,
123 outcome: Outcome,
124 spans_extracted: SpansExtracted,
125) {
126 let dropped_items = managed_envelope
128 .envelope_mut()
129 .take_items_by(|item| *item.ty() != ItemType::Profile);
132
133 for item in dropped_items {
134 for (category, quantity) in item.quantities() {
135 let category = category.index_category().unwrap_or(category);
139
140 managed_envelope.track_outcome(outcome.clone(), category, quantity);
141 }
142 }
143
144 for item in managed_envelope.envelope_mut().items_mut() {
146 item.set_sampled(false);
147 }
148
149 if !spans_extracted.0 {
158 let spans = event.value().and_then(|e| e.spans.value());
159 let span_count = spans.map_or(0, |s| s.len());
160
161 managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
164 }
165
166 if let Some(category) = event_category(&event) {
168 let category = category.index_category().unwrap_or(category);
169 managed_envelope.track_outcome(outcome, category, 1)
170 }
171}
172
173async fn compute_sampling_decision(
175 processing_enabled: bool,
176 reservoir: Option<&ReservoirEvaluator<'_>>,
177 sampling_config: Option<&SamplingConfig>,
178 event: Option<&Event>,
179 root_sampling_config: Option<&SamplingConfig>,
180 dsc: Option<&DynamicSamplingContext>,
181) -> SamplingResult {
182 if (sampling_config.is_none() || event.is_none())
183 && (root_sampling_config.is_none() || dsc.is_none())
184 {
185 return SamplingResult::NoMatch;
186 }
187
188 if sampling_config.is_some_and(|config| config.unsupported())
189 || root_sampling_config.is_some_and(|config| config.unsupported())
190 {
191 if processing_enabled {
192 relay_log::error!("found unsupported rules even as processing relay");
193 } else {
194 return SamplingResult::NoMatch;
195 }
196 }
197
198 let mut evaluator = match reservoir {
199 Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
200 None => SamplingEvaluator::new(Utc::now()),
201 };
202
203 if let (Some(event), Some(sampling_state)) = (event, sampling_config)
204 && let Some(seed) = event.id.value().map(|id| id.0)
205 {
206 let rules = sampling_state.filter_rules(RuleType::Transaction);
207 evaluator = match evaluator.match_rules(seed, event, rules).await {
208 ControlFlow::Continue(evaluator) => evaluator,
209 ControlFlow::Break(sampling_match) => {
210 return SamplingResult::Match(sampling_match);
211 }
212 }
213 };
214
215 if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) {
216 let rules = sampling_state.filter_rules(RuleType::Project);
217 evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await {
218 ControlFlow::Continue(evaluator) => evaluator,
219 ControlFlow::Break(sampling_match) => {
220 return SamplingResult::Match(sampling_match);
221 }
222 }
223 };
224
225 if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
226 let rules = sampling_state.filter_rules(RuleType::Trace);
227 return evaluator
228 .match_rules(*dsc.trace_id, dsc, rules)
229 .await
230 .into();
231 }
232
233 SamplingResult::NoMatch
234}
235
236pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
242 managed_envelope: &mut TypedEnvelope<Group>,
243 event: &mut Annotated<Event>,
244 sampling_project_info: Option<Arc<ProjectInfo>>,
245 config: &Config,
246) {
247 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
248 return;
249 };
250
251 let root_state = sampling_project_info.as_ref();
252 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
253 Some(ErrorBoundary::Ok(config)) => config,
254 _ => return,
255 };
256
257 if sampling_config.unsupported() {
258 if config.processing_enabled() {
259 relay_log::error!("found unsupported rules even as processing relay");
260 }
261
262 return;
263 }
264
265 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
266 return;
267 };
268
269 let context = event
271 .contexts
272 .get_or_insert_with(Contexts::new)
273 .get_or_default::<TraceContext>();
274
275 if context.sampled.is_empty() {
279 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
280 context.sampled = Annotated::new(sampled);
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use std::collections::BTreeMap;
287 use std::sync::Arc;
288
289 use bytes::Bytes;
290 use relay_base_schema::events::EventType;
291 use relay_base_schema::project::ProjectKey;
292 use relay_cogs::Token;
293 use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
294 use relay_event_schema::protocol::{EventId, LenientString};
295 use relay_protocol::RuleCondition;
296 use relay_sampling::config::{
297 DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
298 };
299 use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
300 use relay_system::Addr;
301
302 use crate::envelope::{ContentType, Envelope, Item};
303 use crate::extractors::RequestMeta;
304 use crate::managed::ManagedEnvelope;
305 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
306 use crate::services::projects::project::ProjectInfo;
307 use crate::testutils::{create_test_processor, new_envelope, state_with_rule_and_condition};
308
309 use super::*;
310
311 fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
312 Event {
313 id: Annotated::new(EventId::new()),
314 ty: Annotated::new(event_type),
315 transaction: Annotated::new(transaction.to_owned()),
316 release: Annotated::new(LenientString(release.to_owned())),
317 ..Event::default()
318 }
319 }
320
321 fn dummy_reservoir() -> ReservoirEvaluator<'static> {
322 ReservoirEvaluator::new(ReservoirCounters::default())
323 }
324
325 fn test_dsc() -> DynamicSamplingContext {
326 DynamicSamplingContext {
327 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
328 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
329 release: Some("1.1.1".to_owned()),
330 user: Default::default(),
331 replay_id: None,
332 environment: None,
333 transaction: Some("transaction1".into()),
334 sample_rate: Some(0.5),
335 sampled: Some(true),
336 other: BTreeMap::new(),
337 }
338 }
339
340 fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
342 if let SamplingResult::Match(sampling_match) = sampling_result {
343 sampling_match
344 } else {
345 panic!()
346 }
347 }
348
349 async fn process_envelope_with_root_project_state(
351 envelope: Box<Envelope>,
352 sampling_project_info: Option<Arc<ProjectInfo>>,
353 ) -> Envelope {
354 let processor = create_test_processor(Default::default()).await;
355 let outcome_aggregator = Addr::dummy();
356
357 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
358 assert_eq!(envelopes.len(), 1);
359 let (group, envelope) = envelopes.pop().unwrap();
360
361 let message = ProcessEnvelopeGrouped {
362 group,
363 envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
364 project_info: Arc::new(ProjectInfo::default()),
365 rate_limits: Default::default(),
366 sampling_project_info,
367 reservoir_counters: ReservoirCounters::default(),
368 };
369
370 let Ok(Some(Submit::Envelope(envelope))) =
371 processor.process(&mut Token::noop(), message).await
372 else {
373 panic!();
374 };
375
376 envelope.envelope().clone()
377 }
378
379 fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
380 let item = envelope.items().next().unwrap();
381 let annotated_event: Annotated<Event> =
382 Annotated::from_json_bytes(&item.payload()).unwrap();
383 annotated_event.into_value().unwrap()
384 }
385
386 fn mocked_error_item() -> Item {
387 let mut item = Item::new(ItemType::Event);
388 item.set_payload(
389 ContentType::Json,
390 r#"{
391 "event_id": "52df9022835246eeb317dbd739ccd059",
392 "exception": {
393 "values": [
394 {
395 "type": "mytype",
396 "value": "myvalue",
397 "module": "mymodule",
398 "thread_id": 42,
399 "other": "value"
400 }
401 ]
402 }
403 }"#,
404 );
405 item
406 }
407
408 #[tokio::test]
409 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
410 let event_id = EventId::new();
411 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
412 .parse()
413 .unwrap();
414 let request_meta = RequestMeta::new(dsn);
415
416 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
418 envelope.add_item(mocked_error_item());
419 let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
420 let event = extract_first_event_from_envelope(new_envelope);
421
422 assert!(event.contexts.value().is_none());
423 }
424
425 #[tokio::test]
426 async fn test_it_keeps_or_drops_transactions() {
427 let event = Event {
428 id: Annotated::new(EventId::new()),
429 ty: Annotated::new(EventType::Transaction),
430 transaction: Annotated::new("testing".to_owned()),
431 ..Event::default()
432 };
433
434 for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
435 let sampling_config = SamplingConfig {
436 rules: vec![SamplingRule {
437 condition: RuleCondition::all(),
438 sampling_value: SamplingValue::SampleRate { value: sample_rate },
439 ty: RuleType::Transaction,
440 id: RuleId(1),
441 time_range: Default::default(),
442 decaying_fn: DecayingFunction::Constant,
443 }],
444 ..SamplingConfig::new()
445 };
446
447 let res = compute_sampling_decision(
451 false,
452 None,
453 Some(&sampling_config),
454 Some(&event),
455 None,
456 None,
457 )
458 .await;
459 assert_eq!(res.decision().is_keep(), should_keep);
460 }
461 }
462
463 #[tokio::test]
464 async fn test_dsc_respects_metrics_extracted() {
465 relay_test::setup();
466 let outcome_aggregator = Addr::dummy();
467
468 let config = Arc::new(
469 Config::from_json_value(serde_json::json!({
470 "processing": {
471 "enabled": true,
472 "kafka_config": [],
473 }
474 }))
475 .unwrap(),
476 );
477
478 let get_test_params = |version: Option<u16>| {
479 let event = Event {
480 id: Annotated::new(EventId::new()),
481 ty: Annotated::new(EventType::Transaction),
482 transaction: Annotated::new("testing".to_owned()),
483 ..Event::default()
484 };
485
486 let mut project_info = state_with_rule_and_condition(
487 Some(0.0),
488 RuleType::Transaction,
489 RuleCondition::all(),
490 );
491
492 if let Some(version) = version {
493 project_info.config.transaction_metrics =
494 ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
495 version,
496 ..Default::default()
497 })
498 .into();
499 }
500
501 let envelope = new_envelope(false, "foo");
502 let managed_envelope: TypedEnvelope<TransactionGroup> = (
503 ManagedEnvelope::new(envelope, outcome_aggregator.clone()),
504 ProcessingGroup::Transaction,
505 )
506 .try_into()
507 .unwrap();
508
509 let event = Annotated::from(event);
510
511 let project_info = Arc::new(project_info);
512
513 (managed_envelope, event, project_info)
514 };
515
516 let reservoir = dummy_reservoir();
517
518 let (mut managed_envelope, mut event, project_info) = get_test_params(None);
520 let sampling_result = run(
521 &mut managed_envelope,
522 &mut event,
523 config.clone(),
524 project_info,
525 None,
526 &reservoir,
527 )
528 .await;
529 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
530
531 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2));
533 let sampling_result = run(
534 &mut managed_envelope,
535 &mut event,
536 config.clone(),
537 project_info,
538 None,
539 &reservoir,
540 )
541 .await;
542 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
543
544 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3));
546 let sampling_result = run(
547 &mut managed_envelope,
548 &mut event,
549 config.clone(),
550 project_info,
551 None,
552 &reservoir,
553 )
554 .await;
555 assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
556 }
557
558 fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
559 let sampling_config = SamplingConfig {
560 rules: vec![SamplingRule {
561 condition: RuleCondition::all(),
562 sampling_value: SamplingValue::SampleRate { value: sample_rate },
563 ty: RuleType::Trace,
564 id: RuleId(1),
565 time_range: Default::default(),
566 decaying_fn: Default::default(),
567 }],
568 ..SamplingConfig::new()
569 };
570
571 let mut sampling_project_state = ProjectInfo::default();
572 sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
573 sampling_project_state
574 }
575
576 #[tokio::test]
577 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
578 let event_id = EventId::new();
579 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
580 .parse()
581 .unwrap();
582 let request_meta = RequestMeta::new(dsn);
583 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
584 envelope.set_dsc(test_dsc());
585 envelope.add_item(mocked_error_item());
586
587 let sampling_project_state = project_state_with_single_rule(1.0);
589 let new_envelope = process_envelope_with_root_project_state(
590 envelope.clone(),
591 Some(Arc::new(sampling_project_state)),
592 )
593 .await;
594 let event = extract_first_event_from_envelope(new_envelope);
595 let trace_context = event.context::<TraceContext>().unwrap();
596 assert!(trace_context.sampled.value().unwrap());
597
598 let sampling_project_state = project_state_with_single_rule(0.0);
600 let new_envelope = process_envelope_with_root_project_state(
601 envelope,
602 Some(Arc::new(sampling_project_state)),
603 )
604 .await;
605 let event = extract_first_event_from_envelope(new_envelope);
606 let trace_context = event.context::<TraceContext>().unwrap();
607 assert!(!trace_context.sampled.value().unwrap());
608 }
609
610 #[tokio::test]
611 async fn test_error_is_not_tagged_if_already_tagged() {
612 let event_id = EventId::new();
613 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
614 .parse()
615 .unwrap();
616 let request_meta = RequestMeta::new(dsn);
617
618 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
620 let mut item = Item::new(ItemType::Event);
621 item.set_payload(
622 ContentType::Json,
623 r#"{
624 "event_id": "52df9022835246eeb317dbd739ccd059",
625 "exception": {
626 "values": [
627 {
628 "type": "mytype",
629 "value": "myvalue",
630 "module": "mymodule",
631 "thread_id": 42,
632 "other": "value"
633 }
634 ]
635 },
636 "contexts": {
637 "trace": {
638 "sampled": true
639 }
640 }
641 }"#,
642 );
643 envelope.add_item(item);
644 let sampling_project_state = project_state_with_single_rule(0.0);
645 let new_envelope = process_envelope_with_root_project_state(
646 envelope,
647 Some(Arc::new(sampling_project_state)),
648 )
649 .await;
650 let event = extract_first_event_from_envelope(new_envelope);
651 let trace_context = event.context::<TraceContext>().unwrap();
652 assert!(trace_context.sampled.value().unwrap());
653 }
654
655 #[tokio::test]
657 async fn test_compute_sampling_decision_matching() {
658 for rule_type in [RuleType::Transaction, RuleType::Project] {
659 let event = mocked_event(EventType::Transaction, "foo", "bar");
660 let rule = SamplingRule {
661 condition: RuleCondition::all(),
662 sampling_value: SamplingValue::SampleRate { value: 1.0 },
663 ty: rule_type,
664 id: RuleId(0),
665 time_range: TimeRange::default(),
666 decaying_fn: Default::default(),
667 };
668
669 let sampling_config = SamplingConfig {
670 rules: vec![rule],
671 ..SamplingConfig::new()
672 };
673
674 let res = compute_sampling_decision(
675 false,
676 None,
677 Some(&sampling_config),
678 Some(&event),
679 None,
680 Some(&test_dsc()),
681 )
682 .await;
683 assert!(res.is_match());
684 }
685 }
686
687 #[tokio::test]
688 async fn test_matching_with_unsupported_rule() {
689 let event = mocked_event(EventType::Transaction, "foo", "bar");
690 let rule = SamplingRule {
691 condition: RuleCondition::all(),
692 sampling_value: SamplingValue::SampleRate { value: 1.0 },
693 ty: RuleType::Transaction,
694 id: RuleId(0),
695 time_range: TimeRange::default(),
696 decaying_fn: Default::default(),
697 };
698
699 let unsupported_rule = SamplingRule {
700 condition: RuleCondition::all(),
701 sampling_value: SamplingValue::SampleRate { value: 1.0 },
702 ty: RuleType::Unsupported,
703 id: RuleId(0),
704 time_range: TimeRange::default(),
705 decaying_fn: Default::default(),
706 };
707
708 let sampling_config = SamplingConfig {
709 rules: vec![rule, unsupported_rule],
710 ..SamplingConfig::new()
711 };
712
713 let res = compute_sampling_decision(
715 false,
716 None,
717 Some(&sampling_config),
718 Some(&event),
719 None,
720 None,
721 )
722 .await;
723 assert!(res.is_no_match());
724
725 let res =
727 compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
728 .await;
729 assert!(res.is_match());
730 }
731
732 #[tokio::test]
733 async fn test_client_sample_rate() {
734 let dsc = test_dsc();
735
736 let rule = SamplingRule {
737 condition: RuleCondition::all(),
738 sampling_value: SamplingValue::SampleRate { value: 0.2 },
739 ty: RuleType::Trace,
740 id: RuleId(0),
741 time_range: TimeRange::default(),
742 decaying_fn: Default::default(),
743 };
744
745 let sampling_config = SamplingConfig {
746 rules: vec![rule],
747 ..SamplingConfig::new()
748 };
749
750 let res =
751 compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
752 .await;
753
754 assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
755 }
756
757 async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
758 where
759 Group: Sampling + TryFrom<ProcessingGroup>,
760 {
761 let project_info = {
762 let mut info = ProjectInfo::default();
763 info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
764 version: 1,
765 ..Default::default()
766 }));
767 Arc::new(info)
768 };
769
770 let bytes = Bytes::from(
771 r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
772 );
773 let envelope = Envelope::parse_bytes(bytes).unwrap();
774 let config = Arc::new(Config::default());
775
776 let mut managed_envelope: TypedEnvelope<Group> = (
777 ManagedEnvelope::new(envelope, Addr::dummy()),
778 processing_group,
779 )
780 .try_into()
781 .unwrap();
782
783 let mut event = Annotated::new(Event::default());
784
785 let sampling_project_info = {
786 let mut state = ProjectInfo::default();
787 state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
788 state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
789 version: 2,
790 rules: vec![
791 SamplingRule {
793 condition: RuleCondition::all(),
794 sampling_value: SamplingValue::Reservoir { limit: 100 },
795 ty: RuleType::Trace,
796 id: RuleId(1),
797 time_range: Default::default(),
798 decaying_fn: Default::default(),
799 },
800 SamplingRule {
802 condition: RuleCondition::all(),
803 sampling_value: SamplingValue::SampleRate { value: 0.0 },
804 ty: RuleType::Trace,
805 id: RuleId(2),
806 time_range: Default::default(),
807 decaying_fn: Default::default(),
808 },
809 ],
810 rules_v2: vec![],
811 }));
812 Some(Arc::new(state))
813 };
814
815 let reservoir = dummy_reservoir();
816 run::<Group>(
817 &mut managed_envelope,
818 &mut event,
819 config,
820 project_info,
821 sampling_project_info,
822 &reservoir,
823 )
824 .await
825 }
826
827 #[tokio::test]
828 async fn test_reservoir_applied_for_transactions() {
829 let result =
830 run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
831 assert_eq!(result.decision(), SamplingDecision::Keep);
833 }
834
835 #[tokio::test]
836 async fn test_reservoir_not_applied_for_spans() {
837 let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
838 assert_eq!(result.decision(), SamplingDecision::Drop);
840 }
841}