1use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use chrono::Utc;
6use relay_config::Config;
7use relay_dynamic_config::ErrorBoundary;
8use relay_event_schema::protocol::{Contexts, Event, TraceContext};
9use relay_protocol::{Annotated, Empty};
10use relay_quotas::DataCategory;
11use relay_sampling::config::RuleType;
12use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
13use relay_sampling::{DynamicSamplingContext, SamplingConfig};
14
15use crate::envelope::ItemType;
16use crate::managed::TypedEnvelope;
17use crate::services::outcome::Outcome;
18use crate::services::processor::{
19 EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
20};
21use crate::services::projects::project::ProjectInfo;
22use crate::utils::{self, SamplingResult};
23
24pub fn validate_and_set_dsc<T>(
49 managed_envelope: &mut TypedEnvelope<T>,
50 event: &mut Annotated<Event>,
51 project_info: Arc<ProjectInfo>,
52 sampling_project_info: Option<Arc<ProjectInfo>>,
53) -> Option<Arc<ProjectInfo>> {
54 let original_dsc = managed_envelope.envelope().dsc();
55 if original_dsc.is_some() && sampling_project_info.is_some() {
56 return sampling_project_info;
57 }
58
59 if let Some(event) = event.value() {
62 if let Some(key_config) = project_info.get_public_key_config() {
63 if let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event) {
64 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
67 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
68
69 managed_envelope.envelope_mut().set_dsc(dsc);
70 return Some(project_info.clone());
71 }
72 }
73 }
74
75 managed_envelope.envelope_mut().remove_dsc();
77 None
78}
79
80pub async fn run<Group>(
82 managed_envelope: &mut TypedEnvelope<Group>,
83 event: &mut Annotated<Event>,
84 config: Arc<Config>,
85 project_info: Arc<ProjectInfo>,
86 sampling_project_info: Option<Arc<ProjectInfo>>,
87 reservoir: &ReservoirEvaluator<'_>,
88) -> SamplingResult
89where
90 Group: Sampling,
91{
92 if !Group::supports_sampling(&project_info) {
93 return SamplingResult::Pending;
94 }
95
96 let sampling_config = match project_info.config.sampling {
97 Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
98 _ => None,
99 };
100
101 let root_state = sampling_project_info.as_ref();
102 let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
103 Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
104 _ => None,
105 };
106
107 let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
108
109 compute_sampling_decision(
110 config.processing_enabled(),
111 reservoir,
112 sampling_config,
113 event.value(),
114 root_config,
115 managed_envelope.envelope().dsc(),
116 )
117 .await
118}
119
120pub fn drop_unsampled_items(
122 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
123 event: Annotated<Event>,
124 outcome: Outcome,
125 spans_extracted: SpansExtracted,
126) {
127 let dropped_items = managed_envelope
129 .envelope_mut()
130 .take_items_by(|item| *item.ty() != ItemType::Profile);
133
134 for item in dropped_items {
135 for (category, quantity) in item.quantities() {
136 let category = category.index_category().unwrap_or(category);
140
141 managed_envelope.track_outcome(outcome.clone(), category, quantity);
142 }
143 }
144
145 for item in managed_envelope.envelope_mut().items_mut() {
147 item.set_sampled(false);
148 }
149
150 if !spans_extracted.0 {
159 let spans = event.value().and_then(|e| e.spans.value());
160 let span_count = spans.map_or(0, |s| s.len());
161
162 managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
165 }
166
167 if let Some(category) = event_category(&event) {
169 let category = category.index_category().unwrap_or(category);
170 managed_envelope.track_outcome(outcome, category, 1)
171 }
172}
173
174async fn compute_sampling_decision(
176 processing_enabled: bool,
177 reservoir: Option<&ReservoirEvaluator<'_>>,
178 sampling_config: Option<&SamplingConfig>,
179 event: Option<&Event>,
180 root_sampling_config: Option<&SamplingConfig>,
181 dsc: Option<&DynamicSamplingContext>,
182) -> SamplingResult {
183 if (sampling_config.is_none() || event.is_none())
184 && (root_sampling_config.is_none() || dsc.is_none())
185 {
186 return SamplingResult::NoMatch;
187 }
188
189 if sampling_config.is_some_and(|config| config.unsupported())
190 || root_sampling_config.is_some_and(|config| config.unsupported())
191 {
192 if processing_enabled {
193 relay_log::error!("found unsupported rules even as processing relay");
194 } else {
195 return SamplingResult::NoMatch;
196 }
197 }
198
199 let mut evaluator = match reservoir {
200 Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
201 None => SamplingEvaluator::new(Utc::now()),
202 };
203
204 if let (Some(event), Some(sampling_state)) = (event, sampling_config) {
205 if let Some(seed) = event.id.value().map(|id| id.0) {
206 let rules = sampling_state.filter_rules(RuleType::Transaction);
207 evaluator = match evaluator.match_rules(seed, event, rules).await {
208 ControlFlow::Continue(evaluator) => evaluator,
209 ControlFlow::Break(sampling_match) => {
210 return SamplingResult::Match(sampling_match);
211 }
212 }
213 };
214 }
215
216 if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
217 let rules = sampling_state.filter_rules(RuleType::Trace);
218 return evaluator
219 .match_rules(*dsc.trace_id, dsc, rules)
220 .await
221 .into();
222 }
223
224 SamplingResult::NoMatch
225}
226
227pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
233 managed_envelope: &mut TypedEnvelope<Group>,
234 event: &mut Annotated<Event>,
235 sampling_project_info: Option<Arc<ProjectInfo>>,
236 config: &Config,
237) {
238 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
239 return;
240 };
241
242 let root_state = sampling_project_info.as_ref();
243 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
244 Some(ErrorBoundary::Ok(config)) => config,
245 _ => return,
246 };
247
248 if sampling_config.unsupported() {
249 if config.processing_enabled() {
250 relay_log::error!("found unsupported rules even as processing relay");
251 }
252
253 return;
254 }
255
256 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
257 return;
258 };
259
260 let context = event
262 .contexts
263 .get_or_insert_with(Contexts::new)
264 .get_or_default::<TraceContext>();
265
266 if context.sampled.is_empty() {
270 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
271 context.sampled = Annotated::new(sampled);
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use std::collections::BTreeMap;
278 use std::sync::Arc;
279
280 use bytes::Bytes;
281 use relay_base_schema::events::EventType;
282 use relay_base_schema::project::ProjectKey;
283 use relay_cogs::Token;
284 use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
285 use relay_event_schema::protocol::{EventId, LenientString};
286 use relay_protocol::RuleCondition;
287 use relay_sampling::config::{
288 DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
289 };
290 use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
291 use relay_system::Addr;
292
293 use crate::envelope::{ContentType, Envelope, Item};
294 use crate::extractors::RequestMeta;
295 use crate::managed::ManagedEnvelope;
296 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
297 use crate::services::projects::project::ProjectInfo;
298 use crate::testutils::{
299 self, create_test_processor, new_envelope, state_with_rule_and_condition,
300 };
301
302 use super::*;
303
304 fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
305 Event {
306 id: Annotated::new(EventId::new()),
307 ty: Annotated::new(event_type),
308 transaction: Annotated::new(transaction.to_owned()),
309 release: Annotated::new(LenientString(release.to_owned())),
310 ..Event::default()
311 }
312 }
313
314 fn dummy_reservoir() -> ReservoirEvaluator<'static> {
315 ReservoirEvaluator::new(ReservoirCounters::default())
316 }
317
318 fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
320 if let SamplingResult::Match(sampling_match) = sampling_result {
321 sampling_match
322 } else {
323 panic!()
324 }
325 }
326
327 async fn process_envelope_with_root_project_state(
329 envelope: Box<Envelope>,
330 sampling_project_info: Option<Arc<ProjectInfo>>,
331 ) -> Envelope {
332 let processor = create_test_processor(Default::default()).await;
333 let (outcome_aggregator, test_store) = testutils::processor_services();
334
335 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
336 assert_eq!(envelopes.len(), 1);
337 let (group, envelope) = envelopes.pop().unwrap();
338
339 let message = ProcessEnvelopeGrouped {
340 group,
341 envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store),
342 project_info: Arc::new(ProjectInfo::default()),
343 rate_limits: Default::default(),
344 sampling_project_info,
345 reservoir_counters: ReservoirCounters::default(),
346 };
347
348 let Ok(Some(Submit::Envelope(envelope))) =
349 processor.process(&mut Token::noop(), message).await
350 else {
351 panic!();
352 };
353
354 envelope.envelope().clone()
355 }
356
357 fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
358 let item = envelope.items().next().unwrap();
359 let annotated_event: Annotated<Event> =
360 Annotated::from_json_bytes(&item.payload()).unwrap();
361 annotated_event.into_value().unwrap()
362 }
363
364 fn mocked_error_item() -> Item {
365 let mut item = Item::new(ItemType::Event);
366 item.set_payload(
367 ContentType::Json,
368 r#"{
369 "event_id": "52df9022835246eeb317dbd739ccd059",
370 "exception": {
371 "values": [
372 {
373 "type": "mytype",
374 "value": "myvalue",
375 "module": "mymodule",
376 "thread_id": 42,
377 "other": "value"
378 }
379 ]
380 }
381 }"#,
382 );
383 item
384 }
385
386 #[tokio::test]
387 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
388 let event_id = EventId::new();
389 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
390 .parse()
391 .unwrap();
392 let request_meta = RequestMeta::new(dsn);
393
394 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
396 envelope.add_item(mocked_error_item());
397 let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
398 let event = extract_first_event_from_envelope(new_envelope);
399
400 assert!(event.contexts.value().is_none());
401 }
402
403 #[tokio::test]
404 async fn test_it_keeps_or_drops_transactions() {
405 let event = Event {
406 id: Annotated::new(EventId::new()),
407 ty: Annotated::new(EventType::Transaction),
408 transaction: Annotated::new("testing".to_owned()),
409 ..Event::default()
410 };
411
412 for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
413 let sampling_config = SamplingConfig {
414 rules: vec![SamplingRule {
415 condition: RuleCondition::all(),
416 sampling_value: SamplingValue::SampleRate { value: sample_rate },
417 ty: RuleType::Transaction,
418 id: RuleId(1),
419 time_range: Default::default(),
420 decaying_fn: DecayingFunction::Constant,
421 }],
422 ..SamplingConfig::new()
423 };
424
425 let res = compute_sampling_decision(
429 false,
430 None,
431 Some(&sampling_config),
432 Some(&event),
433 None,
434 None,
435 )
436 .await;
437 assert_eq!(res.decision().is_keep(), should_keep);
438 }
439 }
440
441 #[tokio::test]
442 async fn test_dsc_respects_metrics_extracted() {
443 relay_test::setup();
444 let (outcome_aggregator, test_store) = testutils::processor_services();
445
446 let config = Arc::new(
447 Config::from_json_value(serde_json::json!({
448 "processing": {
449 "enabled": true,
450 "kafka_config": [],
451 }
452 }))
453 .unwrap(),
454 );
455
456 let get_test_params = |version: Option<u16>| {
457 let event = Event {
458 id: Annotated::new(EventId::new()),
459 ty: Annotated::new(EventType::Transaction),
460 transaction: Annotated::new("testing".to_owned()),
461 ..Event::default()
462 };
463
464 let mut project_info = state_with_rule_and_condition(
465 Some(0.0),
466 RuleType::Transaction,
467 RuleCondition::all(),
468 );
469
470 if let Some(version) = version {
471 project_info.config.transaction_metrics =
472 ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
473 version,
474 ..Default::default()
475 })
476 .into();
477 }
478
479 let envelope = new_envelope(false, "foo");
480 let managed_envelope: TypedEnvelope<TransactionGroup> = (
481 ManagedEnvelope::new(envelope, outcome_aggregator.clone(), test_store.clone()),
482 ProcessingGroup::Transaction,
483 )
484 .try_into()
485 .unwrap();
486
487 let event = Annotated::from(event);
488
489 let project_info = Arc::new(project_info);
490
491 (managed_envelope, event, project_info)
492 };
493
494 let reservoir = dummy_reservoir();
495
496 let (mut managed_envelope, mut event, project_info) = get_test_params(None);
498 let sampling_result = run(
499 &mut managed_envelope,
500 &mut event,
501 config.clone(),
502 project_info,
503 None,
504 &reservoir,
505 )
506 .await;
507 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
508
509 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2));
511 let sampling_result = run(
512 &mut managed_envelope,
513 &mut event,
514 config.clone(),
515 project_info,
516 None,
517 &reservoir,
518 )
519 .await;
520 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
521
522 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3));
524 let sampling_result = run(
525 &mut managed_envelope,
526 &mut event,
527 config.clone(),
528 project_info,
529 None,
530 &reservoir,
531 )
532 .await;
533 assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
534 }
535
536 fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
537 let sampling_config = SamplingConfig {
538 rules: vec![SamplingRule {
539 condition: RuleCondition::all(),
540 sampling_value: SamplingValue::SampleRate { value: sample_rate },
541 ty: RuleType::Trace,
542 id: RuleId(1),
543 time_range: Default::default(),
544 decaying_fn: Default::default(),
545 }],
546 ..SamplingConfig::new()
547 };
548
549 let mut sampling_project_state = ProjectInfo::default();
550 sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
551 sampling_project_state
552 }
553
554 #[tokio::test]
555 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
556 let event_id = EventId::new();
557 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
558 .parse()
559 .unwrap();
560 let request_meta = RequestMeta::new(dsn);
561 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
562 let dsc = DynamicSamplingContext {
563 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
564 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
565 release: Some("1.1.1".to_owned()),
566 user: Default::default(),
567 replay_id: None,
568 environment: None,
569 transaction: Some("transaction1".into()),
570 sample_rate: None,
571 sampled: Some(true),
572 other: BTreeMap::new(),
573 };
574 envelope.set_dsc(dsc);
575 envelope.add_item(mocked_error_item());
576
577 let sampling_project_state = project_state_with_single_rule(1.0);
579 let new_envelope = process_envelope_with_root_project_state(
580 envelope.clone(),
581 Some(Arc::new(sampling_project_state)),
582 )
583 .await;
584 let event = extract_first_event_from_envelope(new_envelope);
585 let trace_context = event.context::<TraceContext>().unwrap();
586 assert!(trace_context.sampled.value().unwrap());
587
588 let sampling_project_state = project_state_with_single_rule(0.0);
590 let new_envelope = process_envelope_with_root_project_state(
591 envelope,
592 Some(Arc::new(sampling_project_state)),
593 )
594 .await;
595 let event = extract_first_event_from_envelope(new_envelope);
596 let trace_context = event.context::<TraceContext>().unwrap();
597 assert!(!trace_context.sampled.value().unwrap());
598 }
599
600 #[tokio::test]
601 async fn test_error_is_not_tagged_if_already_tagged() {
602 let event_id = EventId::new();
603 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
604 .parse()
605 .unwrap();
606 let request_meta = RequestMeta::new(dsn);
607
608 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
610 let mut item = Item::new(ItemType::Event);
611 item.set_payload(
612 ContentType::Json,
613 r#"{
614 "event_id": "52df9022835246eeb317dbd739ccd059",
615 "exception": {
616 "values": [
617 {
618 "type": "mytype",
619 "value": "myvalue",
620 "module": "mymodule",
621 "thread_id": 42,
622 "other": "value"
623 }
624 ]
625 },
626 "contexts": {
627 "trace": {
628 "sampled": true
629 }
630 }
631 }"#,
632 );
633 envelope.add_item(item);
634 let sampling_project_state = project_state_with_single_rule(0.0);
635 let new_envelope = process_envelope_with_root_project_state(
636 envelope,
637 Some(Arc::new(sampling_project_state)),
638 )
639 .await;
640 let event = extract_first_event_from_envelope(new_envelope);
641 let trace_context = event.context::<TraceContext>().unwrap();
642 assert!(trace_context.sampled.value().unwrap());
643 }
644
645 #[tokio::test]
647 async fn test_compute_sampling_decision_matching() {
648 let event = mocked_event(EventType::Transaction, "foo", "bar");
649 let rule = SamplingRule {
650 condition: RuleCondition::all(),
651 sampling_value: SamplingValue::SampleRate { value: 1.0 },
652 ty: RuleType::Transaction,
653 id: RuleId(0),
654 time_range: TimeRange::default(),
655 decaying_fn: Default::default(),
656 };
657
658 let sampling_config = SamplingConfig {
659 rules: vec![rule],
660 ..SamplingConfig::new()
661 };
662
663 let res = compute_sampling_decision(
664 false,
665 None,
666 Some(&sampling_config),
667 Some(&event),
668 None,
669 None,
670 )
671 .await;
672 assert!(res.is_match());
673 }
674
675 #[tokio::test]
676 async fn test_matching_with_unsupported_rule() {
677 let event = mocked_event(EventType::Transaction, "foo", "bar");
678 let rule = SamplingRule {
679 condition: RuleCondition::all(),
680 sampling_value: SamplingValue::SampleRate { value: 1.0 },
681 ty: RuleType::Transaction,
682 id: RuleId(0),
683 time_range: TimeRange::default(),
684 decaying_fn: Default::default(),
685 };
686
687 let unsupported_rule = SamplingRule {
688 condition: RuleCondition::all(),
689 sampling_value: SamplingValue::SampleRate { value: 1.0 },
690 ty: RuleType::Unsupported,
691 id: RuleId(0),
692 time_range: TimeRange::default(),
693 decaying_fn: Default::default(),
694 };
695
696 let sampling_config = SamplingConfig {
697 rules: vec![rule, unsupported_rule],
698 ..SamplingConfig::new()
699 };
700
701 let res = compute_sampling_decision(
703 false,
704 None,
705 Some(&sampling_config),
706 Some(&event),
707 None,
708 None,
709 )
710 .await;
711 assert!(res.is_no_match());
712
713 let res =
715 compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
716 .await;
717 assert!(res.is_match());
718 }
719
720 #[tokio::test]
721 async fn test_client_sample_rate() {
722 let dsc = DynamicSamplingContext {
723 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
724 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
725 release: Some("1.1.1".to_owned()),
726 user: Default::default(),
727 replay_id: None,
728 environment: None,
729 transaction: Some("transaction1".into()),
730 sample_rate: Some(0.5),
731 sampled: Some(true),
732 other: BTreeMap::new(),
733 };
734
735 let rule = SamplingRule {
736 condition: RuleCondition::all(),
737 sampling_value: SamplingValue::SampleRate { value: 0.2 },
738 ty: RuleType::Trace,
739 id: RuleId(0),
740 time_range: TimeRange::default(),
741 decaying_fn: Default::default(),
742 };
743
744 let sampling_config = SamplingConfig {
745 rules: vec![rule],
746 ..SamplingConfig::new()
747 };
748
749 let res =
750 compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
751 .await;
752
753 assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
754 }
755
756 async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
757 where
758 Group: Sampling + TryFrom<ProcessingGroup>,
759 {
760 let project_info = {
761 let mut info = ProjectInfo::default();
762 info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
763 version: 1,
764 ..Default::default()
765 }));
766 Arc::new(info)
767 };
768
769 let bytes = Bytes::from(
770 r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
771 );
772 let envelope = Envelope::parse_bytes(bytes).unwrap();
773 let config = Arc::new(Config::default());
774
775 let mut managed_envelope: TypedEnvelope<Group> = (
776 ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy()),
777 processing_group,
778 )
779 .try_into()
780 .unwrap();
781
782 let mut event = Annotated::new(Event::default());
783
784 let sampling_project_info = {
785 let mut state = ProjectInfo::default();
786 state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
787 state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
788 version: 2,
789 rules: vec![
790 SamplingRule {
792 condition: RuleCondition::all(),
793 sampling_value: SamplingValue::Reservoir { limit: 100 },
794 ty: RuleType::Trace,
795 id: RuleId(1),
796 time_range: Default::default(),
797 decaying_fn: Default::default(),
798 },
799 SamplingRule {
801 condition: RuleCondition::all(),
802 sampling_value: SamplingValue::SampleRate { value: 0.0 },
803 ty: RuleType::Trace,
804 id: RuleId(2),
805 time_range: Default::default(),
806 decaying_fn: Default::default(),
807 },
808 ],
809 rules_v2: vec![],
810 }));
811 Some(Arc::new(state))
812 };
813
814 let reservoir = dummy_reservoir();
815 run::<Group>(
816 &mut managed_envelope,
817 &mut event,
818 config,
819 project_info,
820 sampling_project_info,
821 &reservoir,
822 )
823 .await
824 }
825
826 #[tokio::test]
827 async fn test_reservoir_applied_for_transactions() {
828 let result =
829 run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
830 assert_eq!(result.decision(), SamplingDecision::Keep);
832 }
833
834 #[tokio::test]
835 async fn test_reservoir_not_applied_for_spans() {
836 let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
837 assert_eq!(result.decision(), SamplingDecision::Drop);
839 }
840}