relay_server/services/processor/
dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use chrono::Utc;
6use relay_config::Config;
7use relay_dynamic_config::ErrorBoundary;
8use relay_event_schema::protocol::{Contexts, Event, TraceContext};
9use relay_protocol::{Annotated, Empty};
10use relay_quotas::DataCategory;
11use relay_sampling::config::RuleType;
12use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
13use relay_sampling::{DynamicSamplingContext, SamplingConfig};
14
15use crate::envelope::ItemType;
16use crate::managed::TypedEnvelope;
17use crate::services::outcome::Outcome;
18use crate::services::processor::{
19    EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
20};
21use crate::services::projects::project::ProjectInfo;
22use crate::utils::{self, SamplingResult};
23
24/// Ensures there is a valid dynamic sampling context and corresponding project state.
25///
26/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
27/// the trace. That project state should have been loaded previously by the project cache and is
28/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
29///
30///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
31///  - The project key does not exist. This can happen if the project key was disabled, the
32///    project removed, or in rare cases when a project from another Sentry instance is referred
33///    to.
34///  - The project key refers to a project from another organization. In this case the project
35///    cache does not resolve the state and instead leaves it blank.
36///  - The project state could not be fetched. This is a runtime error, but in this case Relay
37///    should fall back to the next-best sampling rule set.
38///
39/// In all of the above cases, this function will compute a new DSC using information from the
40/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
41/// the main project state.
42///
43/// If there is no transaction event in the envelope, this function will do nothing.
44///
45/// The function will return the sampling project information of the root project for the event. If
46/// no sampling project information is specified, the project information of the event’s project
47/// will be returned.
48pub fn validate_and_set_dsc<T>(
49    managed_envelope: &mut TypedEnvelope<T>,
50    event: &mut Annotated<Event>,
51    project_info: Arc<ProjectInfo>,
52    sampling_project_info: Option<Arc<ProjectInfo>>,
53) -> Option<Arc<ProjectInfo>> {
54    let original_dsc = managed_envelope.envelope().dsc();
55    if original_dsc.is_some() && sampling_project_info.is_some() {
56        return sampling_project_info;
57    }
58
59    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
60    // below already checks for the event type.
61    if let Some(event) = event.value() {
62        if let Some(key_config) = project_info.get_public_key_config() {
63            if let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event) {
64                // All other information in the DSC must be discarded, but the sample rate was
65                // actually applied by the client and is therefore correct.
66                let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
67                dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
68
69                managed_envelope.envelope_mut().set_dsc(dsc);
70                return Some(project_info.clone());
71            }
72        }
73    }
74
75    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
76    managed_envelope.envelope_mut().remove_dsc();
77    None
78}
79
80/// Computes the sampling decision on the incoming event
81pub async fn run<Group>(
82    managed_envelope: &mut TypedEnvelope<Group>,
83    event: &mut Annotated<Event>,
84    config: Arc<Config>,
85    project_info: Arc<ProjectInfo>,
86    sampling_project_info: Option<Arc<ProjectInfo>>,
87    reservoir: &ReservoirEvaluator<'_>,
88) -> SamplingResult
89where
90    Group: Sampling,
91{
92    if !Group::supports_sampling(&project_info) {
93        return SamplingResult::Pending;
94    }
95
96    let sampling_config = match project_info.config.sampling {
97        Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
98        _ => None,
99    };
100
101    let root_state = sampling_project_info.as_ref();
102    let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
103        Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
104        _ => None,
105    };
106
107    let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
108
109    compute_sampling_decision(
110        config.processing_enabled(),
111        reservoir,
112        sampling_config,
113        event.value(),
114        root_config,
115        managed_envelope.envelope().dsc(),
116    )
117    .await
118}
119
120/// Apply the dynamic sampling decision from `compute_sampling_decision`.
121pub fn drop_unsampled_items(
122    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
123    event: Annotated<Event>,
124    outcome: Outcome,
125    spans_extracted: SpansExtracted,
126) {
127    // Remove all items from the envelope which need to be dropped due to dynamic sampling.
128    let dropped_items = managed_envelope
129        .envelope_mut()
130        // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
131        // later processed in Sentry and potentially dropped there.
132        .take_items_by(|item| *item.ty() != ItemType::Profile);
133
134    for item in dropped_items {
135        for (category, quantity) in item.quantities() {
136            // Dynamic sampling only drops indexed items. Upgrade the category to the index
137            // category if one exists for this category, for example profiles will be upgraded to profiles indexed,
138            // but attachments are still emitted as attachments.
139            let category = category.index_category().unwrap_or(category);
140
141            managed_envelope.track_outcome(outcome.clone(), category, quantity);
142        }
143    }
144
145    // Mark all remaining items in the envelope as un-sampled.
146    for item in managed_envelope.envelope_mut().items_mut() {
147        item.set_sampled(false);
148    }
149
150    // Another 'hack' to emit outcomes from the container item for the contained items (spans).
151    //
152    // The entire tracking outcomes for contained elements is not handled in a systematic way
153    // and whenever an event/transaction is discarded, contained elements are tracked in a 'best
154    // effort' basis (basically in all the cases where someone figured out this is a problem).
155    //
156    // This is yet another case, when the spans have not yet been separated from the transaction
157    // also emit dynamic sampling outcomes for the contained spans.
158    if !spans_extracted.0 {
159        let spans = event.value().and_then(|e| e.spans.value());
160        let span_count = spans.map_or(0, |s| s.len());
161
162        // Track the amount of contained spans + 1 segment span (the transaction itself which would
163        // be converted to a span).
164        managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
165    }
166
167    // All items have been dropped, now make sure the event is also handled and dropped.
168    if let Some(category) = event_category(&event) {
169        let category = category.index_category().unwrap_or(category);
170        managed_envelope.track_outcome(outcome, category, 1)
171    }
172}
173
174/// Computes the sampling decision on the incoming envelope.
175async fn compute_sampling_decision(
176    processing_enabled: bool,
177    reservoir: Option<&ReservoirEvaluator<'_>>,
178    sampling_config: Option<&SamplingConfig>,
179    event: Option<&Event>,
180    root_sampling_config: Option<&SamplingConfig>,
181    dsc: Option<&DynamicSamplingContext>,
182) -> SamplingResult {
183    if (sampling_config.is_none() || event.is_none())
184        && (root_sampling_config.is_none() || dsc.is_none())
185    {
186        return SamplingResult::NoMatch;
187    }
188
189    if sampling_config.is_some_and(|config| config.unsupported())
190        || root_sampling_config.is_some_and(|config| config.unsupported())
191    {
192        if processing_enabled {
193            relay_log::error!("found unsupported rules even as processing relay");
194        } else {
195            return SamplingResult::NoMatch;
196        }
197    }
198
199    let mut evaluator = match reservoir {
200        Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
201        None => SamplingEvaluator::new(Utc::now()),
202    };
203
204    if let (Some(event), Some(sampling_state)) = (event, sampling_config) {
205        if let Some(seed) = event.id.value().map(|id| id.0) {
206            let rules = sampling_state.filter_rules(RuleType::Transaction);
207            evaluator = match evaluator.match_rules(seed, event, rules).await {
208                ControlFlow::Continue(evaluator) => evaluator,
209                ControlFlow::Break(sampling_match) => {
210                    return SamplingResult::Match(sampling_match);
211                }
212            }
213        };
214    }
215
216    if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
217        let rules = sampling_state.filter_rules(RuleType::Trace);
218        return evaluator
219            .match_rules(*dsc.trace_id, dsc, rules)
220            .await
221            .into();
222    }
223
224    SamplingResult::NoMatch
225}
226
227/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
228/// decision.
229///
230/// This execution of dynamic sampling is technically a "simulation" since we will use the result
231/// only for tagging errors and not for actually sampling incoming events.
232pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
233    managed_envelope: &mut TypedEnvelope<Group>,
234    event: &mut Annotated<Event>,
235    sampling_project_info: Option<Arc<ProjectInfo>>,
236    config: &Config,
237) {
238    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
239        return;
240    };
241
242    let root_state = sampling_project_info.as_ref();
243    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
244        Some(ErrorBoundary::Ok(config)) => config,
245        _ => return,
246    };
247
248    if sampling_config.unsupported() {
249        if config.processing_enabled() {
250            relay_log::error!("found unsupported rules even as processing relay");
251        }
252
253        return;
254    }
255
256    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
257        return;
258    };
259
260    // We want to get the trace context, in which we will inject the `sampled` field.
261    let context = event
262        .contexts
263        .get_or_insert_with(Contexts::new)
264        .get_or_default::<TraceContext>();
265
266    // We want to update `sampled` only if it was not set, since if we don't check this
267    // we will end up overriding the value set by downstream Relays and this will lead
268    // to more complex debugging in case of problems.
269    if context.sampled.is_empty() {
270        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
271        context.sampled = Annotated::new(sampled);
272    }
273}
274
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use bytes::Bytes;
    use relay_base_schema::events::EventType;
    use relay_base_schema::project::ProjectKey;
    use relay_cogs::Token;
    use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
    use relay_event_schema::protocol::{EventId, LenientString};
    use relay_protocol::RuleCondition;
    use relay_sampling::config::{
        DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
    };
    use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
    use relay_system::Addr;

    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::{
        self, create_test_processor, new_envelope, state_with_rule_and_condition,
    };

    use super::*;

    /// DSN used for every envelope that is built from request metadata in these tests.
    const TEST_DSN: &str = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42";

    /// Builds an event of the given type with a fresh id, a transaction name and a release.
    fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
        Event {
            id: Annotated::new(EventId::new()),
            ty: Annotated::new(event_type),
            transaction: Annotated::new(transaction.to_owned()),
            release: Annotated::new(LenientString(release.to_owned())),
            ..Event::default()
        }
    }

    /// Builds an empty envelope with a fresh event id, addressed to [`TEST_DSN`].
    fn mocked_envelope() -> Box<Envelope> {
        let dsn = TEST_DSN.parse().unwrap();
        Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn))
    }

    /// Builds the DSC shared by the trace sampling tests with the given client sample rate.
    fn mocked_dsc(sample_rate: Option<f64>) -> DynamicSamplingContext {
        DynamicSamplingContext {
            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
            release: Some("1.1.1".to_owned()),
            user: Default::default(),
            replay_id: None,
            environment: None,
            transaction: Some("transaction1".into()),
            sample_rate,
            sampled: Some(true),
            other: BTreeMap::new(),
        }
    }

    /// Builds a rule that matches everything and samples at a fixed rate.
    fn sample_rate_rule(ty: RuleType, id: RuleId, sample_rate: f64) -> SamplingRule {
        SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: sample_rate },
            ty,
            id,
            time_range: TimeRange::default(),
            decaying_fn: Default::default(),
        }
    }

    /// Builds a sampling config that consists of the given rules only.
    fn sampling_config_with(rules: Vec<SamplingRule>) -> SamplingConfig {
        SamplingConfig {
            rules,
            ..SamplingConfig::new()
        }
    }

    /// Creates a reservoir evaluator backed by empty counters.
    fn dummy_reservoir() -> ReservoirEvaluator<'static> {
        ReservoirEvaluator::new(ReservoirCounters::default())
    }

    /// Unwraps the sampling match of a [`SamplingResult::Match`] and panics on any other
    /// variant.
    fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
        match sampling_result {
            SamplingResult::Match(sampling_match) => sampling_match,
            _ => panic!(),
        }
    }

    /// Sends the envelope through a test processor and returns the resulting envelope.
    ///
    /// The envelope must split into exactly one processing group, and processing must submit an
    /// envelope; the helper panics otherwise.
    async fn process_envelope_with_root_project_state(
        envelope: Box<Envelope>,
        sampling_project_info: Option<Arc<ProjectInfo>>,
    ) -> Envelope {
        let processor = create_test_processor(Default::default()).await;
        let (outcome_aggregator, test_store) = testutils::processor_services();

        let mut groups = ProcessingGroup::split_envelope(*envelope);
        assert_eq!(groups.len(), 1);
        let (group, envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store),
            project_info: Arc::new(ProjectInfo::default()),
            rate_limits: Default::default(),
            sampling_project_info,
            reservoir_counters: ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(processed))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };

        processed.envelope().clone()
    }

    /// Parses the payload of the first envelope item as an event.
    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
        let first_item = envelope.items().next().unwrap();
        Annotated::<Event>::from_json_bytes(&first_item.payload())
            .unwrap()
            .into_value()
            .unwrap()
    }

    /// Builds an event item that contains a single exception and no contexts.
    fn mocked_error_item() -> Item {
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              }
            }"#,
        );
        item
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
        // Neither a root project state nor a DSC is available, so nothing must be tagged.
        let mut envelope = mocked_envelope();
        envelope.add_item(mocked_error_item());

        let processed = process_envelope_with_root_project_state(envelope, None).await;
        let event = extract_first_event_from_envelope(processed);

        assert!(event.contexts.value().is_none());
    }

    #[tokio::test]
    async fn test_it_keeps_or_drops_transactions() {
        let event = Event {
            id: Annotated::new(EventId::new()),
            ty: Annotated::new(EventType::Transaction),
            transaction: Annotated::new("testing".to_owned()),
            ..Event::default()
        };

        for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
            let rule = SamplingRule {
                condition: RuleCondition::all(),
                sampling_value: SamplingValue::SampleRate { value: sample_rate },
                ty: RuleType::Transaction,
                id: RuleId(1),
                time_range: Default::default(),
                decaying_fn: DecayingFunction::Constant,
            };
            let sampling_config = sampling_config_with(vec![rule]);

            // TODO: This does not test if the sampling decision is actually applied. This should
            // be refactored to send a proper Envelope in and call process_state to cover the
            // full pipeline.
            let result = compute_sampling_decision(
                false,
                None,
                Some(&sampling_config),
                Some(&event),
                None,
                None,
            )
            .await;

            assert_eq!(result.decision().is_keep(), should_keep);
        }
    }

    #[tokio::test]
    async fn test_dsc_respects_metrics_extracted() {
        relay_test::setup();
        let (outcome_aggregator, test_store) = testutils::processor_services();

        let config = Arc::new(
            Config::from_json_value(serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                }
            }))
            .unwrap(),
        );

        // Builds the inputs for `run`: a transaction envelope and event, plus a project with a
        // drop-everything transaction rule and, optionally, a transaction metrics config of the
        // given version.
        let test_params = |version: Option<u16>| {
            let event = Event {
                id: Annotated::new(EventId::new()),
                ty: Annotated::new(EventType::Transaction),
                transaction: Annotated::new("testing".to_owned()),
                ..Event::default()
            };

            let mut project_info = state_with_rule_and_condition(
                Some(0.0),
                RuleType::Transaction,
                RuleCondition::all(),
            );

            if let Some(version) = version {
                project_info.config.transaction_metrics =
                    Some(ErrorBoundary::Ok(TransactionMetricsConfig {
                        version,
                        ..Default::default()
                    }));
            }

            let envelope = new_envelope(false, "foo");
            let managed_envelope: TypedEnvelope<TransactionGroup> = (
                ManagedEnvelope::new(envelope, outcome_aggregator.clone(), test_store.clone()),
                ProcessingGroup::Transaction,
            )
                .try_into()
                .unwrap();

            (managed_envelope, Annotated::from(event), Arc::new(project_info))
        };

        let reservoir = dummy_reservoir();

        let cases = [
            // None represents no TransactionMetricsConfig, DS will not be run.
            (None, SamplingDecision::Keep),
            // Current version is 3, so it won't run DS if it's outdated.
            (Some(2), SamplingDecision::Keep),
            // Dynamic sampling is run, as the transaction metrics version is up to date.
            (Some(3), SamplingDecision::Drop),
        ];

        for (version, expected_decision) in cases {
            let (mut managed_envelope, mut event, project_info) = test_params(version);
            let sampling_result = run(
                &mut managed_envelope,
                &mut event,
                Arc::clone(&config),
                project_info,
                None,
                &reservoir,
            )
            .await;

            assert_eq!(sampling_result.decision(), expected_decision);
        }
    }

    /// Builds a project state whose only sampling rule is a catch-all trace rule with the given
    /// sample rate.
    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
        let rule = sample_rate_rule(RuleType::Trace, RuleId(1), sample_rate);

        let mut sampling_project_state = ProjectInfo::default();
        sampling_project_state.config.sampling =
            Some(ErrorBoundary::Ok(sampling_config_with(vec![rule])));
        sampling_project_state
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
        let mut envelope = mocked_envelope();
        envelope.set_dsc(mocked_dsc(None));
        envelope.add_item(mocked_error_item());

        // A sample rate of 100% must tag the error as sampled, a sample rate of 0% as not
        // sampled.
        for (sample_rate, expected_sampled) in [(1.0, true), (0.0, false)] {
            let sampling_project_state = project_state_with_single_rule(sample_rate);
            let processed = process_envelope_with_root_project_state(
                envelope.clone(),
                Some(Arc::new(sampling_project_state)),
            )
            .await;

            let event = extract_first_event_from_envelope(processed);
            let trace_context = event.context::<TraceContext>().unwrap();
            assert_eq!(trace_context.sampled.value(), Some(&expected_sampled));
        }
    }

    #[tokio::test]
    async fn test_error_is_not_tagged_if_already_tagged() {
        // The incoming event has already been tagged by a downstream Relay.
        let mut envelope = mocked_envelope();
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              },
              "contexts": {
                "trace": {
                    "sampled": true
                }
              }
            }"#,
        );
        envelope.add_item(item);

        // A 0% rule would tag the error as not sampled if the existing value were overwritten.
        let sampling_project_state = project_state_with_single_rule(0.0);
        let processed = process_envelope_with_root_project_state(
            envelope,
            Some(Arc::new(sampling_project_state)),
        )
        .await;

        let event = extract_first_event_from_envelope(processed);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(trace_context.sampled.value().unwrap());
    }

    /// Happy path test for compute_sampling_decision.
    #[tokio::test]
    async fn test_compute_sampling_decision_matching() {
        let event = mocked_event(EventType::Transaction, "foo", "bar");
        let sampling_config = sampling_config_with(vec![sample_rate_rule(
            RuleType::Transaction,
            RuleId(0),
            1.0,
        )]);

        let result = compute_sampling_decision(
            false,
            None,
            Some(&sampling_config),
            Some(&event),
            None,
            None,
        )
        .await;

        assert!(result.is_match());
    }

    #[tokio::test]
    async fn test_matching_with_unsupported_rule() {
        let event = mocked_event(EventType::Transaction, "foo", "bar");
        let sampling_config = sampling_config_with(vec![
            sample_rate_rule(RuleType::Transaction, RuleId(0), 1.0),
            sample_rate_rule(RuleType::Unsupported, RuleId(0), 1.0),
        ]);

        // Unsupported rule should result in no match if processing is not enabled.
        let without_processing = compute_sampling_decision(
            false,
            None,
            Some(&sampling_config),
            Some(&event),
            None,
            None,
        )
        .await;
        assert!(without_processing.is_no_match());

        // Match if processing is enabled.
        let with_processing =
            compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
                .await;
        assert!(with_processing.is_match());
    }

    #[tokio::test]
    async fn test_client_sample_rate() {
        let dsc = mocked_dsc(Some(0.5));
        let sampling_config =
            sampling_config_with(vec![sample_rate_rule(RuleType::Trace, RuleId(0), 0.2)]);

        let result =
            compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
                .await;

        assert_eq!(get_sampling_match(result).sample_rate(), 0.2);
    }

    /// Runs dynamic sampling for `Group` against a root project that has a reservoir rule
    /// followed by a rule rejecting everything else.
    async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
    where
        Group: Sampling + TryFrom<ProcessingGroup>,
    {
        let mut own_project = ProjectInfo::default();
        own_project.config.transaction_metrics =
            Some(ErrorBoundary::Ok(TransactionMetricsConfig {
                version: 1,
                ..Default::default()
            }));
        let project_info = Arc::new(own_project);

        let bytes = Bytes::from(
            r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
        );
        let envelope = Envelope::parse_bytes(bytes).unwrap();
        let config = Arc::new(Config::default());

        let mut managed_envelope: TypedEnvelope<Group> = (
            ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy()),
            processing_group,
        )
            .try_into()
            .unwrap();

        let mut event = Annotated::new(Event::default());

        let mut root_project = ProjectInfo::default();
        root_project.config.metric_extraction =
            ErrorBoundary::Ok(MetricExtractionConfig::default());
        root_project.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
            version: 2,
            rules: vec![
                // Set up a reservoir (only used for transactions):
                SamplingRule {
                    condition: RuleCondition::all(),
                    sampling_value: SamplingValue::Reservoir { limit: 100 },
                    ty: RuleType::Trace,
                    id: RuleId(1),
                    time_range: Default::default(),
                    decaying_fn: Default::default(),
                },
                // Reject everything that does not go into the reservoir:
                SamplingRule {
                    condition: RuleCondition::all(),
                    sampling_value: SamplingValue::SampleRate { value: 0.0 },
                    ty: RuleType::Trace,
                    id: RuleId(2),
                    time_range: Default::default(),
                    decaying_fn: Default::default(),
                },
            ],
            rules_v2: vec![],
        }));
        let sampling_project_info = Some(Arc::new(root_project));

        let reservoir = dummy_reservoir();
        run::<Group>(
            &mut managed_envelope,
            &mut event,
            config,
            project_info,
            sampling_project_info,
            &reservoir,
        )
        .await
    }

    #[tokio::test]
    async fn test_reservoir_applied_for_transactions() {
        let result =
            run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
        // Default sampling rate is 0.0, but transaction is retained because of reservoir:
        assert_eq!(result.decision(), SamplingDecision::Keep);
    }

    #[tokio::test]
    async fn test_reservoir_not_applied_for_spans() {
        let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
        // Default sampling rate is 0.0, and the reservoir does not apply to spans:
        assert_eq!(result.decision(), SamplingDecision::Drop);
    }
}