relay_server/services/processor/
dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use chrono::Utc;
6use relay_config::Config;
7use relay_dynamic_config::ErrorBoundary;
8use relay_event_schema::protocol::{Contexts, Event, TraceContext};
9use relay_protocol::{Annotated, Empty};
10use relay_quotas::DataCategory;
11use relay_sampling::config::RuleType;
12use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
13use relay_sampling::{DynamicSamplingContext, SamplingConfig};
14
15use crate::envelope::ItemType;
16use crate::managed::TypedEnvelope;
17use crate::services::outcome::Outcome;
18use crate::services::processor::{
19    EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
20};
21use crate::services::projects::project::ProjectInfo;
22use crate::utils::{self, SamplingResult};
23
24/// Ensures there is a valid dynamic sampling context and corresponding project state.
25///
26/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
27/// the trace. That project state should have been loaded previously by the project cache and is
28/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
29///
30///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
31///  - The project key does not exist. This can happen if the project key was disabled, the
32///    project removed, or in rare cases when a project from another Sentry instance is referred
33///    to.
34///  - The project key refers to a project from another organization. In this case the project
35///    cache does not resolve the state and instead leaves it blank.
36///  - The project state could not be fetched. This is a runtime error, but in this case Relay
37///    should fall back to the next-best sampling rule set.
38///
39/// In all of the above cases, this function will compute a new DSC using information from the
40/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
41/// the main project state.
42///
43/// If there is no transaction event in the envelope, this function will do nothing.
44///
45/// The function will return the sampling project information of the root project for the event. If
46/// no sampling project information is specified, the project information of the event’s project
47/// will be returned.
48pub fn validate_and_set_dsc<T>(
49    managed_envelope: &mut TypedEnvelope<T>,
50    event: &mut Annotated<Event>,
51    project_info: Arc<ProjectInfo>,
52    sampling_project_info: Option<Arc<ProjectInfo>>,
53) -> Option<Arc<ProjectInfo>> {
54    let original_dsc = managed_envelope.envelope().dsc();
55    if original_dsc.is_some() && sampling_project_info.is_some() {
56        return sampling_project_info;
57    }
58
59    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
60    // below already checks for the event type.
61    if let Some(event) = event.value()
62        && let Some(key_config) = project_info.get_public_key_config()
63        && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
64    {
65        // All other information in the DSC must be discarded, but the sample rate was
66        // actually applied by the client and is therefore correct.
67        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
68        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
69
70        managed_envelope.envelope_mut().set_dsc(dsc);
71        return Some(project_info.clone());
72    }
73
74    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
75    managed_envelope.envelope_mut().remove_dsc();
76    None
77}
78
79/// Computes the sampling decision on the incoming event
80pub async fn run<Group>(
81    managed_envelope: &mut TypedEnvelope<Group>,
82    event: &mut Annotated<Event>,
83    config: Arc<Config>,
84    project_info: Arc<ProjectInfo>,
85    sampling_project_info: Option<Arc<ProjectInfo>>,
86    reservoir: &ReservoirEvaluator<'_>,
87) -> SamplingResult
88where
89    Group: Sampling,
90{
91    if !Group::supports_sampling(&project_info) {
92        return SamplingResult::Pending;
93    }
94
95    let sampling_config = match project_info.config.sampling {
96        Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
97        _ => None,
98    };
99
100    let root_state = sampling_project_info.as_ref();
101    let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
102        Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
103        _ => None,
104    };
105
106    let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
107
108    compute_sampling_decision(
109        config.processing_enabled(),
110        reservoir,
111        sampling_config,
112        event.value(),
113        root_config,
114        managed_envelope.envelope().dsc(),
115    )
116    .await
117}
118
119/// Apply the dynamic sampling decision from `compute_sampling_decision`.
120pub fn drop_unsampled_items(
121    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
122    event: Annotated<Event>,
123    outcome: Outcome,
124    spans_extracted: SpansExtracted,
125) {
126    // Remove all items from the envelope which need to be dropped due to dynamic sampling.
127    let dropped_items = managed_envelope
128        .envelope_mut()
129        // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
130        // later processed in Sentry and potentially dropped there.
131        .take_items_by(|item| *item.ty() != ItemType::Profile);
132
133    for item in dropped_items {
134        for (category, quantity) in item.quantities() {
135            // Dynamic sampling only drops indexed items. Upgrade the category to the index
136            // category if one exists for this category, for example profiles will be upgraded to profiles indexed,
137            // but attachments are still emitted as attachments.
138            let category = category.index_category().unwrap_or(category);
139
140            managed_envelope.track_outcome(outcome.clone(), category, quantity);
141        }
142    }
143
144    // Mark all remaining items in the envelope as un-sampled.
145    for item in managed_envelope.envelope_mut().items_mut() {
146        item.set_sampled(false);
147    }
148
149    // Another 'hack' to emit outcomes from the container item for the contained items (spans).
150    //
151    // The entire tracking outcomes for contained elements is not handled in a systematic way
152    // and whenever an event/transaction is discarded, contained elements are tracked in a 'best
153    // effort' basis (basically in all the cases where someone figured out this is a problem).
154    //
155    // This is yet another case, when the spans have not yet been separated from the transaction
156    // also emit dynamic sampling outcomes for the contained spans.
157    if !spans_extracted.0 {
158        let spans = event.value().and_then(|e| e.spans.value());
159        let span_count = spans.map_or(0, |s| s.len());
160
161        // Track the amount of contained spans + 1 segment span (the transaction itself which would
162        // be converted to a span).
163        managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
164    }
165
166    // All items have been dropped, now make sure the event is also handled and dropped.
167    if let Some(category) = event_category(&event) {
168        let category = category.index_category().unwrap_or(category);
169        managed_envelope.track_outcome(outcome, category, 1)
170    }
171}
172
173/// Computes the sampling decision on the incoming envelope.
174async fn compute_sampling_decision(
175    processing_enabled: bool,
176    reservoir: Option<&ReservoirEvaluator<'_>>,
177    sampling_config: Option<&SamplingConfig>,
178    event: Option<&Event>,
179    root_sampling_config: Option<&SamplingConfig>,
180    dsc: Option<&DynamicSamplingContext>,
181) -> SamplingResult {
182    if (sampling_config.is_none() || event.is_none())
183        && (root_sampling_config.is_none() || dsc.is_none())
184    {
185        return SamplingResult::NoMatch;
186    }
187
188    if sampling_config.is_some_and(|config| config.unsupported())
189        || root_sampling_config.is_some_and(|config| config.unsupported())
190    {
191        if processing_enabled {
192            relay_log::error!("found unsupported rules even as processing relay");
193        } else {
194            return SamplingResult::NoMatch;
195        }
196    }
197
198    let mut evaluator = match reservoir {
199        Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
200        None => SamplingEvaluator::new(Utc::now()),
201    };
202
203    if let (Some(event), Some(sampling_state)) = (event, sampling_config)
204        && let Some(seed) = event.id.value().map(|id| id.0)
205    {
206        let rules = sampling_state.filter_rules(RuleType::Transaction);
207        evaluator = match evaluator.match_rules(seed, event, rules).await {
208            ControlFlow::Continue(evaluator) => evaluator,
209            ControlFlow::Break(sampling_match) => {
210                return SamplingResult::Match(sampling_match);
211            }
212        }
213    };
214
215    if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) {
216        let rules = sampling_state.filter_rules(RuleType::Project);
217        evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await {
218            ControlFlow::Continue(evaluator) => evaluator,
219            ControlFlow::Break(sampling_match) => {
220                return SamplingResult::Match(sampling_match);
221            }
222        }
223    };
224
225    if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
226        let rules = sampling_state.filter_rules(RuleType::Trace);
227        return evaluator
228            .match_rules(*dsc.trace_id, dsc, rules)
229            .await
230            .into();
231    }
232
233    SamplingResult::NoMatch
234}
235
236/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
237/// decision.
238///
239/// This execution of dynamic sampling is technically a "simulation" since we will use the result
240/// only for tagging errors and not for actually sampling incoming events.
241pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
242    managed_envelope: &mut TypedEnvelope<Group>,
243    event: &mut Annotated<Event>,
244    sampling_project_info: Option<Arc<ProjectInfo>>,
245    config: &Config,
246) {
247    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
248        return;
249    };
250
251    let root_state = sampling_project_info.as_ref();
252    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
253        Some(ErrorBoundary::Ok(config)) => config,
254        _ => return,
255    };
256
257    if sampling_config.unsupported() {
258        if config.processing_enabled() {
259            relay_log::error!("found unsupported rules even as processing relay");
260        }
261
262        return;
263    }
264
265    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
266        return;
267    };
268
269    // We want to get the trace context, in which we will inject the `sampled` field.
270    let context = event
271        .contexts
272        .get_or_insert_with(Contexts::new)
273        .get_or_default::<TraceContext>();
274
275    // We want to update `sampled` only if it was not set, since if we don't check this
276    // we will end up overriding the value set by downstream Relays and this will lead
277    // to more complex debugging in case of problems.
278    if context.sampled.is_empty() {
279        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
280        context.sampled = Annotated::new(sampled);
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::collections::BTreeMap;
287    use std::sync::Arc;
288
289    use bytes::Bytes;
290    use relay_base_schema::events::EventType;
291    use relay_base_schema::project::ProjectKey;
292    use relay_cogs::Token;
293    use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
294    use relay_event_schema::protocol::{EventId, LenientString};
295    use relay_protocol::RuleCondition;
296    use relay_sampling::config::{
297        DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
298    };
299    use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
300    use relay_system::Addr;
301
302    use crate::envelope::{ContentType, Envelope, Item};
303    use crate::extractors::RequestMeta;
304    use crate::managed::ManagedEnvelope;
305    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
306    use crate::services::projects::project::ProjectInfo;
307    use crate::testutils::{create_test_processor, new_envelope, state_with_rule_and_condition};
308
309    use super::*;
310
311    fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
312        Event {
313            id: Annotated::new(EventId::new()),
314            ty: Annotated::new(event_type),
315            transaction: Annotated::new(transaction.to_owned()),
316            release: Annotated::new(LenientString(release.to_owned())),
317            ..Event::default()
318        }
319    }
320
321    fn dummy_reservoir() -> ReservoirEvaluator<'static> {
322        ReservoirEvaluator::new(ReservoirCounters::default())
323    }
324
325    fn test_dsc() -> DynamicSamplingContext {
326        DynamicSamplingContext {
327            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
328            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
329            release: Some("1.1.1".to_owned()),
330            user: Default::default(),
331            replay_id: None,
332            environment: None,
333            transaction: Some("transaction1".into()),
334            sample_rate: Some(0.5),
335            sampled: Some(true),
336            other: BTreeMap::new(),
337        }
338    }
339
340    // Helper to extract the sampling match from SamplingResult if thats the variant.
341    fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
342        if let SamplingResult::Match(sampling_match) = sampling_result {
343            sampling_match
344        } else {
345            panic!()
346        }
347    }
348
349    /// Always sets the processing item type to event.
350    async fn process_envelope_with_root_project_state(
351        envelope: Box<Envelope>,
352        sampling_project_info: Option<Arc<ProjectInfo>>,
353    ) -> Envelope {
354        let processor = create_test_processor(Default::default()).await;
355        let outcome_aggregator = Addr::dummy();
356
357        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
358        assert_eq!(envelopes.len(), 1);
359        let (group, envelope) = envelopes.pop().unwrap();
360
361        let message = ProcessEnvelopeGrouped {
362            group,
363            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
364            project_info: Arc::new(ProjectInfo::default()),
365            rate_limits: Default::default(),
366            sampling_project_info,
367            reservoir_counters: ReservoirCounters::default(),
368        };
369
370        let Ok(Some(Submit::Envelope(envelope))) =
371            processor.process(&mut Token::noop(), message).await
372        else {
373            panic!();
374        };
375
376        envelope.envelope().clone()
377    }
378
379    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
380        let item = envelope.items().next().unwrap();
381        let annotated_event: Annotated<Event> =
382            Annotated::from_json_bytes(&item.payload()).unwrap();
383        annotated_event.into_value().unwrap()
384    }
385
386    fn mocked_error_item() -> Item {
387        let mut item = Item::new(ItemType::Event);
388        item.set_payload(
389            ContentType::Json,
390            r#"{
391              "event_id": "52df9022835246eeb317dbd739ccd059",
392              "exception": {
393                "values": [
394                    {
395                      "type": "mytype",
396                      "value": "myvalue",
397                      "module": "mymodule",
398                      "thread_id": 42,
399                      "other": "value"
400                    }
401                ]
402              }
403            }"#,
404        );
405        item
406    }
407
408    #[tokio::test]
409    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
410        let event_id = EventId::new();
411        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
412            .parse()
413            .unwrap();
414        let request_meta = RequestMeta::new(dsn);
415
416        // We test tagging when root project state and dsc are none.
417        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
418        envelope.add_item(mocked_error_item());
419        let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
420        let event = extract_first_event_from_envelope(new_envelope);
421
422        assert!(event.contexts.value().is_none());
423    }
424
425    #[tokio::test]
426    async fn test_it_keeps_or_drops_transactions() {
427        let event = Event {
428            id: Annotated::new(EventId::new()),
429            ty: Annotated::new(EventType::Transaction),
430            transaction: Annotated::new("testing".to_owned()),
431            ..Event::default()
432        };
433
434        for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
435            let sampling_config = SamplingConfig {
436                rules: vec![SamplingRule {
437                    condition: RuleCondition::all(),
438                    sampling_value: SamplingValue::SampleRate { value: sample_rate },
439                    ty: RuleType::Transaction,
440                    id: RuleId(1),
441                    time_range: Default::default(),
442                    decaying_fn: DecayingFunction::Constant,
443                }],
444                ..SamplingConfig::new()
445            };
446
447            // TODO: This does not test if the sampling decision is actually applied. This should be
448            // refactored to send a proper Envelope in and call process_state to cover the full
449            // pipeline.
450            let res = compute_sampling_decision(
451                false,
452                None,
453                Some(&sampling_config),
454                Some(&event),
455                None,
456                None,
457            )
458            .await;
459            assert_eq!(res.decision().is_keep(), should_keep);
460        }
461    }
462
463    #[tokio::test]
464    async fn test_dsc_respects_metrics_extracted() {
465        relay_test::setup();
466        let outcome_aggregator = Addr::dummy();
467
468        let config = Arc::new(
469            Config::from_json_value(serde_json::json!({
470                "processing": {
471                    "enabled": true,
472                    "kafka_config": [],
473                }
474            }))
475            .unwrap(),
476        );
477
478        let get_test_params = |version: Option<u16>| {
479            let event = Event {
480                id: Annotated::new(EventId::new()),
481                ty: Annotated::new(EventType::Transaction),
482                transaction: Annotated::new("testing".to_owned()),
483                ..Event::default()
484            };
485
486            let mut project_info = state_with_rule_and_condition(
487                Some(0.0),
488                RuleType::Transaction,
489                RuleCondition::all(),
490            );
491
492            if let Some(version) = version {
493                project_info.config.transaction_metrics =
494                    ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
495                        version,
496                        ..Default::default()
497                    })
498                    .into();
499            }
500
501            let envelope = new_envelope(false, "foo");
502            let managed_envelope: TypedEnvelope<TransactionGroup> = (
503                ManagedEnvelope::new(envelope, outcome_aggregator.clone()),
504                ProcessingGroup::Transaction,
505            )
506                .try_into()
507                .unwrap();
508
509            let event = Annotated::from(event);
510
511            let project_info = Arc::new(project_info);
512
513            (managed_envelope, event, project_info)
514        };
515
516        let reservoir = dummy_reservoir();
517
518        // None represents no TransactionMetricsConfig, DS will not be run
519        let (mut managed_envelope, mut event, project_info) = get_test_params(None);
520        let sampling_result = run(
521            &mut managed_envelope,
522            &mut event,
523            config.clone(),
524            project_info,
525            None,
526            &reservoir,
527        )
528        .await;
529        assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
530
531        // Current version is 3, so it won't run DS if it's outdated
532        let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2));
533        let sampling_result = run(
534            &mut managed_envelope,
535            &mut event,
536            config.clone(),
537            project_info,
538            None,
539            &reservoir,
540        )
541        .await;
542        assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
543
544        // Dynamic sampling is run, as the transaction metrics version is up to date.
545        let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3));
546        let sampling_result = run(
547            &mut managed_envelope,
548            &mut event,
549            config.clone(),
550            project_info,
551            None,
552            &reservoir,
553        )
554        .await;
555        assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
556    }
557
558    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
559        let sampling_config = SamplingConfig {
560            rules: vec![SamplingRule {
561                condition: RuleCondition::all(),
562                sampling_value: SamplingValue::SampleRate { value: sample_rate },
563                ty: RuleType::Trace,
564                id: RuleId(1),
565                time_range: Default::default(),
566                decaying_fn: Default::default(),
567            }],
568            ..SamplingConfig::new()
569        };
570
571        let mut sampling_project_state = ProjectInfo::default();
572        sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
573        sampling_project_state
574    }
575
576    #[tokio::test]
577    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
578        let event_id = EventId::new();
579        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
580            .parse()
581            .unwrap();
582        let request_meta = RequestMeta::new(dsn);
583        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
584        envelope.set_dsc(test_dsc());
585        envelope.add_item(mocked_error_item());
586
587        // We test with sample rate equal to 100%.
588        let sampling_project_state = project_state_with_single_rule(1.0);
589        let new_envelope = process_envelope_with_root_project_state(
590            envelope.clone(),
591            Some(Arc::new(sampling_project_state)),
592        )
593        .await;
594        let event = extract_first_event_from_envelope(new_envelope);
595        let trace_context = event.context::<TraceContext>().unwrap();
596        assert!(trace_context.sampled.value().unwrap());
597
598        // We test with sample rate equal to 0%.
599        let sampling_project_state = project_state_with_single_rule(0.0);
600        let new_envelope = process_envelope_with_root_project_state(
601            envelope,
602            Some(Arc::new(sampling_project_state)),
603        )
604        .await;
605        let event = extract_first_event_from_envelope(new_envelope);
606        let trace_context = event.context::<TraceContext>().unwrap();
607        assert!(!trace_context.sampled.value().unwrap());
608    }
609
610    #[tokio::test]
611    async fn test_error_is_not_tagged_if_already_tagged() {
612        let event_id = EventId::new();
613        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
614            .parse()
615            .unwrap();
616        let request_meta = RequestMeta::new(dsn);
617
618        // We test tagging with an incoming event that has already been tagged by downstream Relay.
619        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
620        let mut item = Item::new(ItemType::Event);
621        item.set_payload(
622            ContentType::Json,
623            r#"{
624              "event_id": "52df9022835246eeb317dbd739ccd059",
625              "exception": {
626                "values": [
627                    {
628                      "type": "mytype",
629                      "value": "myvalue",
630                      "module": "mymodule",
631                      "thread_id": 42,
632                      "other": "value"
633                    }
634                ]
635              },
636              "contexts": {
637                "trace": {
638                    "sampled": true
639                }
640              }
641            }"#,
642        );
643        envelope.add_item(item);
644        let sampling_project_state = project_state_with_single_rule(0.0);
645        let new_envelope = process_envelope_with_root_project_state(
646            envelope,
647            Some(Arc::new(sampling_project_state)),
648        )
649        .await;
650        let event = extract_first_event_from_envelope(new_envelope);
651        let trace_context = event.context::<TraceContext>().unwrap();
652        assert!(trace_context.sampled.value().unwrap());
653    }
654
655    /// Happy path test for compute_sampling_decision.
656    #[tokio::test]
657    async fn test_compute_sampling_decision_matching() {
658        for rule_type in [RuleType::Transaction, RuleType::Project] {
659            let event = mocked_event(EventType::Transaction, "foo", "bar");
660            let rule = SamplingRule {
661                condition: RuleCondition::all(),
662                sampling_value: SamplingValue::SampleRate { value: 1.0 },
663                ty: rule_type,
664                id: RuleId(0),
665                time_range: TimeRange::default(),
666                decaying_fn: Default::default(),
667            };
668
669            let sampling_config = SamplingConfig {
670                rules: vec![rule],
671                ..SamplingConfig::new()
672            };
673
674            let res = compute_sampling_decision(
675                false,
676                None,
677                Some(&sampling_config),
678                Some(&event),
679                None,
680                Some(&test_dsc()),
681            )
682            .await;
683            assert!(res.is_match());
684        }
685    }
686
687    #[tokio::test]
688    async fn test_matching_with_unsupported_rule() {
689        let event = mocked_event(EventType::Transaction, "foo", "bar");
690        let rule = SamplingRule {
691            condition: RuleCondition::all(),
692            sampling_value: SamplingValue::SampleRate { value: 1.0 },
693            ty: RuleType::Transaction,
694            id: RuleId(0),
695            time_range: TimeRange::default(),
696            decaying_fn: Default::default(),
697        };
698
699        let unsupported_rule = SamplingRule {
700            condition: RuleCondition::all(),
701            sampling_value: SamplingValue::SampleRate { value: 1.0 },
702            ty: RuleType::Unsupported,
703            id: RuleId(0),
704            time_range: TimeRange::default(),
705            decaying_fn: Default::default(),
706        };
707
708        let sampling_config = SamplingConfig {
709            rules: vec![rule, unsupported_rule],
710            ..SamplingConfig::new()
711        };
712
713        // Unsupported rule should result in no match if processing is not enabled.
714        let res = compute_sampling_decision(
715            false,
716            None,
717            Some(&sampling_config),
718            Some(&event),
719            None,
720            None,
721        )
722        .await;
723        assert!(res.is_no_match());
724
725        // Match if processing is enabled.
726        let res =
727            compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
728                .await;
729        assert!(res.is_match());
730    }
731
732    #[tokio::test]
733    async fn test_client_sample_rate() {
734        let dsc = test_dsc();
735
736        let rule = SamplingRule {
737            condition: RuleCondition::all(),
738            sampling_value: SamplingValue::SampleRate { value: 0.2 },
739            ty: RuleType::Trace,
740            id: RuleId(0),
741            time_range: TimeRange::default(),
742            decaying_fn: Default::default(),
743        };
744
745        let sampling_config = SamplingConfig {
746            rules: vec![rule],
747            ..SamplingConfig::new()
748        };
749
750        let res =
751            compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
752                .await;
753
754        assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
755    }
756
757    async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
758    where
759        Group: Sampling + TryFrom<ProcessingGroup>,
760    {
761        let project_info = {
762            let mut info = ProjectInfo::default();
763            info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
764                version: 1,
765                ..Default::default()
766            }));
767            Arc::new(info)
768        };
769
770        let bytes = Bytes::from(
771            r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
772        );
773        let envelope = Envelope::parse_bytes(bytes).unwrap();
774        let config = Arc::new(Config::default());
775
776        let mut managed_envelope: TypedEnvelope<Group> = (
777            ManagedEnvelope::new(envelope, Addr::dummy()),
778            processing_group,
779        )
780            .try_into()
781            .unwrap();
782
783        let mut event = Annotated::new(Event::default());
784
785        let sampling_project_info = {
786            let mut state = ProjectInfo::default();
787            state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
788            state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
789                version: 2,
790                rules: vec![
791                    // Set up a reservoir (only used for transactions):
792                    SamplingRule {
793                        condition: RuleCondition::all(),
794                        sampling_value: SamplingValue::Reservoir { limit: 100 },
795                        ty: RuleType::Trace,
796                        id: RuleId(1),
797                        time_range: Default::default(),
798                        decaying_fn: Default::default(),
799                    },
800                    // Reject everything that does not go into the reservoir:
801                    SamplingRule {
802                        condition: RuleCondition::all(),
803                        sampling_value: SamplingValue::SampleRate { value: 0.0 },
804                        ty: RuleType::Trace,
805                        id: RuleId(2),
806                        time_range: Default::default(),
807                        decaying_fn: Default::default(),
808                    },
809                ],
810                rules_v2: vec![],
811            }));
812            Some(Arc::new(state))
813        };
814
815        let reservoir = dummy_reservoir();
816        run::<Group>(
817            &mut managed_envelope,
818            &mut event,
819            config,
820            project_info,
821            sampling_project_info,
822            &reservoir,
823        )
824        .await
825    }
826
827    #[tokio::test]
828    async fn test_reservoir_applied_for_transactions() {
829        let result =
830            run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
831        // Default sampling rate is 0.0, but transaction is retained because of reservoir:
832        assert_eq!(result.decision(), SamplingDecision::Keep);
833    }
834
835    #[tokio::test]
836    async fn test_reservoir_not_applied_for_spans() {
837        let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
838        // Default sampling rate is 0.0, and the reservoir does not apply to spans:
839        assert_eq!(result.decision(), SamplingDecision::Drop);
840    }
841}