1use std::ops::ControlFlow;
3
4use chrono::Utc;
5use relay_config::Config;
6use relay_dynamic_config::ErrorBoundary;
7use relay_event_schema::protocol::{Contexts, Event, TraceContext};
8use relay_protocol::{Annotated, Empty};
9use relay_quotas::DataCategory;
10use relay_sampling::config::RuleType;
11use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
12use relay_sampling::{DynamicSamplingContext, SamplingConfig};
13
14use crate::envelope::ItemType;
15use crate::managed::TypedEnvelope;
16use crate::services::outcome::Outcome;
17use crate::services::processor::{
18 EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
19};
20use crate::services::projects::project::ProjectInfo;
21use crate::utils::{self, SamplingResult};
22
23pub fn validate_and_set_dsc<'a, T>(
48 managed_envelope: &mut TypedEnvelope<T>,
49 event: &mut Annotated<Event>,
50 project_info: &'a ProjectInfo,
51 sampling_project_info: Option<&'a ProjectInfo>,
52) -> Option<&'a ProjectInfo> {
53 let original_dsc = managed_envelope.envelope().dsc();
54 if original_dsc.is_some() && sampling_project_info.is_some() {
55 return sampling_project_info;
56 }
57
58 if let Some(event) = event.value()
61 && let Some(key_config) = project_info.get_public_key_config()
62 && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
63 {
64 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
67 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
68
69 managed_envelope.envelope_mut().set_dsc(dsc);
70 return Some(project_info);
71 }
72
73 managed_envelope.envelope_mut().remove_dsc();
75 None
76}
77
78pub async fn run<Group>(
80 managed_envelope: &mut TypedEnvelope<Group>,
81 event: &mut Annotated<Event>,
82 config: &Config,
83 project_info: &ProjectInfo,
84 sampling_project_info: Option<&ProjectInfo>,
85 reservoir: &ReservoirEvaluator<'_>,
86) -> SamplingResult
87where
88 Group: Sampling,
89{
90 if !Group::supports_sampling(project_info) {
91 return SamplingResult::Pending;
92 }
93
94 let sampling_config = match project_info.config.sampling {
95 Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
96 _ => None,
97 };
98
99 let root_state = sampling_project_info.as_ref();
100 let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
101 Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
102 _ => None,
103 };
104
105 let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
106
107 compute_sampling_decision(
108 config.processing_enabled(),
109 reservoir,
110 sampling_config,
111 event.value(),
112 root_config,
113 managed_envelope.envelope().dsc(),
114 )
115 .await
116}
117
118pub fn drop_unsampled_items(
120 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
121 event: Annotated<Event>,
122 outcome: Outcome,
123 spans_extracted: SpansExtracted,
124) {
125 let dropped_items = managed_envelope
127 .envelope_mut()
128 .take_items_by(|item| *item.ty() != ItemType::Profile);
131
132 for item in dropped_items {
133 for (category, quantity) in item.quantities() {
134 if category.index_category().is_none() {
138 managed_envelope.track_outcome(outcome.clone(), category, quantity);
139 }
140 }
141 }
142
143 for item in managed_envelope.envelope_mut().items_mut() {
145 item.set_sampled(false);
146 }
147
148 if !spans_extracted.0 {
157 let spans = event.value().and_then(|e| e.spans.value());
158 let span_count = spans.map_or(0, |s| s.len());
159
160 managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
163 }
164
165 if let Some(category) = event_category(&event) {
167 let category = category.index_category().unwrap_or(category);
168 managed_envelope.track_outcome(outcome, category, 1)
169 }
170}
171
172async fn compute_sampling_decision(
174 processing_enabled: bool,
175 reservoir: Option<&ReservoirEvaluator<'_>>,
176 sampling_config: Option<&SamplingConfig>,
177 event: Option<&Event>,
178 root_sampling_config: Option<&SamplingConfig>,
179 dsc: Option<&DynamicSamplingContext>,
180) -> SamplingResult {
181 if (sampling_config.is_none() || event.is_none())
182 && (root_sampling_config.is_none() || dsc.is_none())
183 {
184 return SamplingResult::NoMatch;
185 }
186
187 if sampling_config.is_some_and(|config| config.unsupported())
188 || root_sampling_config.is_some_and(|config| config.unsupported())
189 {
190 if processing_enabled {
191 relay_log::error!("found unsupported rules even as processing relay");
192 } else {
193 return SamplingResult::NoMatch;
194 }
195 }
196
197 let mut evaluator = match reservoir {
198 Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
199 None => SamplingEvaluator::new(Utc::now()),
200 };
201
202 if let (Some(event), Some(sampling_state)) = (event, sampling_config)
203 && let Some(seed) = event.id.value().map(|id| id.0)
204 {
205 let rules = sampling_state.filter_rules(RuleType::Transaction);
206 evaluator = match evaluator.match_rules(seed, event, rules).await {
207 ControlFlow::Continue(evaluator) => evaluator,
208 ControlFlow::Break(sampling_match) => {
209 return SamplingResult::Match(sampling_match);
210 }
211 }
212 };
213
214 if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) {
215 let rules = sampling_state.filter_rules(RuleType::Project);
216 evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await {
217 ControlFlow::Continue(evaluator) => evaluator,
218 ControlFlow::Break(sampling_match) => {
219 return SamplingResult::Match(sampling_match);
220 }
221 }
222 };
223
224 if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
225 let rules = sampling_state.filter_rules(RuleType::Trace);
226 return evaluator
227 .match_rules(*dsc.trace_id, dsc, rules)
228 .await
229 .into();
230 }
231
232 SamplingResult::NoMatch
233}
234
235pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
241 managed_envelope: &mut TypedEnvelope<Group>,
242 event: &mut Annotated<Event>,
243 sampling_project_info: Option<&ProjectInfo>,
244 config: &Config,
245) {
246 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
247 return;
248 };
249
250 let root_state = sampling_project_info.as_ref();
251 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
252 Some(ErrorBoundary::Ok(config)) => config,
253 _ => return,
254 };
255
256 if sampling_config.unsupported() {
257 if config.processing_enabled() {
258 relay_log::error!("found unsupported rules even as processing relay");
259 }
260
261 return;
262 }
263
264 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
265 return;
266 };
267
268 let context = event
270 .contexts
271 .get_or_insert_with(Contexts::new)
272 .get_or_default::<TraceContext>();
273
274 if context.sampled.is_empty() {
278 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
279 context.sampled = Annotated::new(sampled);
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use std::collections::BTreeMap;
286 use std::sync::Arc;
287
288 use bytes::Bytes;
289 use relay_base_schema::events::EventType;
290 use relay_base_schema::project::ProjectKey;
291 use relay_cogs::Token;
292 use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
293 use relay_event_schema::protocol::{EventId, LenientString};
294 use relay_protocol::RuleCondition;
295 use relay_sampling::config::{
296 DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
297 };
298 use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
299 use relay_system::Addr;
300
301 use crate::envelope::{ContentType, Envelope, Item};
302 use crate::extractors::RequestMeta;
303 use crate::managed::ManagedEnvelope;
304 use crate::processing;
305 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
306 use crate::services::projects::project::ProjectInfo;
307 use crate::testutils::{create_test_processor, new_envelope, state_with_rule_and_condition};
308
309 use super::*;
310
311 fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
312 Event {
313 id: Annotated::new(EventId::new()),
314 ty: Annotated::new(event_type),
315 transaction: Annotated::new(transaction.to_owned()),
316 release: Annotated::new(LenientString(release.to_owned())),
317 ..Event::default()
318 }
319 }
320
321 fn dummy_reservoir() -> ReservoirEvaluator<'static> {
322 ReservoirEvaluator::new(ReservoirCounters::default())
323 }
324
325 fn test_dsc() -> DynamicSamplingContext {
326 DynamicSamplingContext {
327 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
328 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
329 release: Some("1.1.1".to_owned()),
330 user: Default::default(),
331 replay_id: None,
332 environment: None,
333 transaction: Some("transaction1".into()),
334 sample_rate: Some(0.5),
335 sampled: Some(true),
336 other: BTreeMap::new(),
337 }
338 }
339
340 fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
342 if let SamplingResult::Match(sampling_match) = sampling_result {
343 sampling_match
344 } else {
345 panic!()
346 }
347 }
348
349 async fn process_envelope_with_root_project_state(
351 envelope: Box<Envelope>,
352 sampling_project_info: Option<&ProjectInfo>,
353 ) -> Envelope {
354 let processor = create_test_processor(Default::default()).await;
355 let outcome_aggregator = Addr::dummy();
356
357 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
358 assert_eq!(envelopes.len(), 1);
359 let (group, envelope) = envelopes.pop().unwrap();
360
361 let message = ProcessEnvelopeGrouped {
362 group,
363 envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
364 ctx: processing::Context {
365 sampling_project_info,
366 ..processing::Context::for_test()
367 },
368 reservoir_counters: &ReservoirCounters::default(),
369 };
370
371 let Ok(Some(Submit::Envelope(envelope))) =
372 processor.process(&mut Token::noop(), message).await
373 else {
374 panic!();
375 };
376
377 envelope.envelope().clone()
378 }
379
380 fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
381 let item = envelope.items().next().unwrap();
382 let annotated_event: Annotated<Event> =
383 Annotated::from_json_bytes(&item.payload()).unwrap();
384 annotated_event.into_value().unwrap()
385 }
386
387 fn mocked_error_item() -> Item {
388 let mut item = Item::new(ItemType::Event);
389 item.set_payload(
390 ContentType::Json,
391 r#"{
392 "event_id": "52df9022835246eeb317dbd739ccd059",
393 "exception": {
394 "values": [
395 {
396 "type": "mytype",
397 "value": "myvalue",
398 "module": "mymodule",
399 "thread_id": 42,
400 "other": "value"
401 }
402 ]
403 }
404 }"#,
405 );
406 item
407 }
408
409 #[tokio::test]
410 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
411 let event_id = EventId::new();
412 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
413 .parse()
414 .unwrap();
415 let request_meta = RequestMeta::new(dsn);
416
417 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
419 envelope.add_item(mocked_error_item());
420 let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
421 let event = extract_first_event_from_envelope(new_envelope);
422
423 assert!(event.contexts.value().is_none());
424 }
425
426 #[tokio::test]
427 async fn test_it_keeps_or_drops_transactions() {
428 let event = Event {
429 id: Annotated::new(EventId::new()),
430 ty: Annotated::new(EventType::Transaction),
431 transaction: Annotated::new("testing".to_owned()),
432 ..Event::default()
433 };
434
435 for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
436 let sampling_config = SamplingConfig {
437 rules: vec![SamplingRule {
438 condition: RuleCondition::all(),
439 sampling_value: SamplingValue::SampleRate { value: sample_rate },
440 ty: RuleType::Transaction,
441 id: RuleId(1),
442 time_range: Default::default(),
443 decaying_fn: DecayingFunction::Constant,
444 }],
445 ..SamplingConfig::new()
446 };
447
448 let res = compute_sampling_decision(
452 false,
453 None,
454 Some(&sampling_config),
455 Some(&event),
456 None,
457 None,
458 )
459 .await;
460 assert_eq!(res.decision().is_keep(), should_keep);
461 }
462 }
463
464 #[tokio::test]
465 async fn test_dsc_respects_metrics_extracted() {
466 relay_test::setup();
467 let outcome_aggregator = Addr::dummy();
468
469 let config = Arc::new(
470 Config::from_json_value(serde_json::json!({
471 "processing": {
472 "enabled": true,
473 "kafka_config": [],
474 }
475 }))
476 .unwrap(),
477 );
478
479 let get_test_params = |version: Option<u16>| {
480 let event = Event {
481 id: Annotated::new(EventId::new()),
482 ty: Annotated::new(EventType::Transaction),
483 transaction: Annotated::new("testing".to_owned()),
484 ..Event::default()
485 };
486
487 let mut project_info = state_with_rule_and_condition(
488 Some(0.0),
489 RuleType::Transaction,
490 RuleCondition::all(),
491 );
492
493 if let Some(version) = version {
494 project_info.config.transaction_metrics =
495 ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
496 version,
497 ..Default::default()
498 })
499 .into();
500 }
501
502 let envelope = new_envelope(false, "foo");
503 let managed_envelope: TypedEnvelope<TransactionGroup> = (
504 ManagedEnvelope::new(envelope, outcome_aggregator.clone()),
505 ProcessingGroup::Transaction,
506 )
507 .try_into()
508 .unwrap();
509
510 let event = Annotated::from(event);
511 (managed_envelope, event, project_info)
512 };
513
514 let reservoir = dummy_reservoir();
515
516 let (mut managed_envelope, mut event, project_info) = get_test_params(None);
518 let sampling_result = run(
519 &mut managed_envelope,
520 &mut event,
521 &config,
522 &project_info,
523 None,
524 &reservoir,
525 )
526 .await;
527 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
528
529 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2));
531 let sampling_result = run(
532 &mut managed_envelope,
533 &mut event,
534 &config,
535 &project_info,
536 None,
537 &reservoir,
538 )
539 .await;
540 assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
541
542 let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3));
544 let sampling_result = run(
545 &mut managed_envelope,
546 &mut event,
547 &config,
548 &project_info,
549 None,
550 &reservoir,
551 )
552 .await;
553 assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
554 }
555
556 fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
557 let sampling_config = SamplingConfig {
558 rules: vec![SamplingRule {
559 condition: RuleCondition::all(),
560 sampling_value: SamplingValue::SampleRate { value: sample_rate },
561 ty: RuleType::Trace,
562 id: RuleId(1),
563 time_range: Default::default(),
564 decaying_fn: Default::default(),
565 }],
566 ..SamplingConfig::new()
567 };
568
569 let mut sampling_project_state = ProjectInfo::default();
570 sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
571 sampling_project_state
572 }
573
574 #[tokio::test]
575 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
576 let event_id = EventId::new();
577 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
578 .parse()
579 .unwrap();
580 let request_meta = RequestMeta::new(dsn);
581 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
582 envelope.set_dsc(test_dsc());
583 envelope.add_item(mocked_error_item());
584
585 let sampling_project_state = project_state_with_single_rule(1.0);
587 let new_envelope = process_envelope_with_root_project_state(
588 envelope.clone(),
589 Some(&sampling_project_state),
590 )
591 .await;
592 let event = extract_first_event_from_envelope(new_envelope);
593 let trace_context = event.context::<TraceContext>().unwrap();
594 assert!(trace_context.sampled.value().unwrap());
595
596 let sampling_project_state = project_state_with_single_rule(0.0);
598 let new_envelope =
599 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
600 let event = extract_first_event_from_envelope(new_envelope);
601 let trace_context = event.context::<TraceContext>().unwrap();
602 assert!(!trace_context.sampled.value().unwrap());
603 }
604
605 #[tokio::test]
606 async fn test_error_is_not_tagged_if_already_tagged() {
607 let event_id = EventId::new();
608 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
609 .parse()
610 .unwrap();
611 let request_meta = RequestMeta::new(dsn);
612
613 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
615 let mut item = Item::new(ItemType::Event);
616 item.set_payload(
617 ContentType::Json,
618 r#"{
619 "event_id": "52df9022835246eeb317dbd739ccd059",
620 "exception": {
621 "values": [
622 {
623 "type": "mytype",
624 "value": "myvalue",
625 "module": "mymodule",
626 "thread_id": 42,
627 "other": "value"
628 }
629 ]
630 },
631 "contexts": {
632 "trace": {
633 "sampled": true
634 }
635 }
636 }"#,
637 );
638 envelope.add_item(item);
639 let sampling_project_state = project_state_with_single_rule(0.0);
640 let new_envelope =
641 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
642 let event = extract_first_event_from_envelope(new_envelope);
643 let trace_context = event.context::<TraceContext>().unwrap();
644 assert!(trace_context.sampled.value().unwrap());
645 }
646
647 #[tokio::test]
649 async fn test_compute_sampling_decision_matching() {
650 for rule_type in [RuleType::Transaction, RuleType::Project] {
651 let event = mocked_event(EventType::Transaction, "foo", "bar");
652 let rule = SamplingRule {
653 condition: RuleCondition::all(),
654 sampling_value: SamplingValue::SampleRate { value: 1.0 },
655 ty: rule_type,
656 id: RuleId(0),
657 time_range: TimeRange::default(),
658 decaying_fn: Default::default(),
659 };
660
661 let sampling_config = SamplingConfig {
662 rules: vec![rule],
663 ..SamplingConfig::new()
664 };
665
666 let res = compute_sampling_decision(
667 false,
668 None,
669 Some(&sampling_config),
670 Some(&event),
671 None,
672 Some(&test_dsc()),
673 )
674 .await;
675 assert!(res.is_match());
676 }
677 }
678
679 #[tokio::test]
680 async fn test_matching_with_unsupported_rule() {
681 let event = mocked_event(EventType::Transaction, "foo", "bar");
682 let rule = SamplingRule {
683 condition: RuleCondition::all(),
684 sampling_value: SamplingValue::SampleRate { value: 1.0 },
685 ty: RuleType::Transaction,
686 id: RuleId(0),
687 time_range: TimeRange::default(),
688 decaying_fn: Default::default(),
689 };
690
691 let unsupported_rule = SamplingRule {
692 condition: RuleCondition::all(),
693 sampling_value: SamplingValue::SampleRate { value: 1.0 },
694 ty: RuleType::Unsupported,
695 id: RuleId(0),
696 time_range: TimeRange::default(),
697 decaying_fn: Default::default(),
698 };
699
700 let sampling_config = SamplingConfig {
701 rules: vec![rule, unsupported_rule],
702 ..SamplingConfig::new()
703 };
704
705 let res = compute_sampling_decision(
707 false,
708 None,
709 Some(&sampling_config),
710 Some(&event),
711 None,
712 None,
713 )
714 .await;
715 assert!(res.is_no_match());
716
717 let res =
719 compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
720 .await;
721 assert!(res.is_match());
722 }
723
724 #[tokio::test]
725 async fn test_client_sample_rate() {
726 let dsc = test_dsc();
727
728 let rule = SamplingRule {
729 condition: RuleCondition::all(),
730 sampling_value: SamplingValue::SampleRate { value: 0.2 },
731 ty: RuleType::Trace,
732 id: RuleId(0),
733 time_range: TimeRange::default(),
734 decaying_fn: Default::default(),
735 };
736
737 let sampling_config = SamplingConfig {
738 rules: vec![rule],
739 ..SamplingConfig::new()
740 };
741
742 let res =
743 compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
744 .await;
745
746 assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
747 }
748
749 async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
750 where
751 Group: Sampling + TryFrom<ProcessingGroup>,
752 {
753 let project_info = {
754 let mut info = ProjectInfo::default();
755 info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
756 version: 1,
757 ..Default::default()
758 }));
759 info
760 };
761
762 let bytes = Bytes::from(
763 r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
764 );
765 let envelope = Envelope::parse_bytes(bytes).unwrap();
766 let config = Config::default();
767
768 let mut managed_envelope: TypedEnvelope<Group> = (
769 ManagedEnvelope::new(envelope, Addr::dummy()),
770 processing_group,
771 )
772 .try_into()
773 .unwrap();
774
775 let mut event = Annotated::new(Event::default());
776
777 let sampling_project_info = {
778 let mut state = ProjectInfo::default();
779 state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
780 state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
781 version: 2,
782 rules: vec![
783 SamplingRule {
785 condition: RuleCondition::all(),
786 sampling_value: SamplingValue::Reservoir { limit: 100 },
787 ty: RuleType::Trace,
788 id: RuleId(1),
789 time_range: Default::default(),
790 decaying_fn: Default::default(),
791 },
792 SamplingRule {
794 condition: RuleCondition::all(),
795 sampling_value: SamplingValue::SampleRate { value: 0.0 },
796 ty: RuleType::Trace,
797 id: RuleId(2),
798 time_range: Default::default(),
799 decaying_fn: Default::default(),
800 },
801 ],
802 rules_v2: vec![],
803 }));
804 Some(state)
805 };
806
807 let reservoir = dummy_reservoir();
808 run::<Group>(
809 &mut managed_envelope,
810 &mut event,
811 &config,
812 &project_info,
813 sampling_project_info.as_ref(),
814 &reservoir,
815 )
816 .await
817 }
818
819 #[tokio::test]
820 async fn test_reservoir_applied_for_transactions() {
821 let result =
822 run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
823 assert_eq!(result.decision(), SamplingDecision::Keep);
825 }
826
827 #[tokio::test]
828 async fn test_reservoir_not_applied_for_spans() {
829 let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
830 assert_eq!(result.decision(), SamplingDecision::Drop);
832 }
833}