relay_server/services/processor/
dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2use std::ops::ControlFlow;
3
4use chrono::Utc;
5use relay_config::Config;
6use relay_dynamic_config::ErrorBoundary;
7use relay_event_schema::protocol::{Contexts, Event, TraceContext};
8use relay_protocol::{Annotated, Empty};
9use relay_quotas::DataCategory;
10use relay_sampling::config::RuleType;
11use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
12use relay_sampling::{DynamicSamplingContext, SamplingConfig};
13
14use crate::envelope::ItemType;
15use crate::managed::TypedEnvelope;
16use crate::services::outcome::Outcome;
17use crate::services::processor::{
18    EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category,
19};
20use crate::services::projects::project::ProjectInfo;
21use crate::utils::{self, SamplingResult};
22
23/// Ensures there is a valid dynamic sampling context and corresponding project state.
24///
25/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
26/// the trace. That project state should have been loaded previously by the project cache and is
27/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
28///
29///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
30///  - The project key does not exist. This can happen if the project key was disabled, the
31///    project removed, or in rare cases when a project from another Sentry instance is referred
32///    to.
33///  - The project key refers to a project from another organization. In this case the project
34///    cache does not resolve the state and instead leaves it blank.
35///  - The project state could not be fetched. This is a runtime error, but in this case Relay
36///    should fall back to the next-best sampling rule set.
37///
38/// In all of the above cases, this function will compute a new DSC using information from the
39/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
40/// the main project state.
41///
42/// If there is no transaction event in the envelope, this function will do nothing.
43///
44/// The function will return the sampling project information of the root project for the event. If
45/// no sampling project information is specified, the project information of the event’s project
46/// will be returned.
47pub fn validate_and_set_dsc<'a, T>(
48    managed_envelope: &mut TypedEnvelope<T>,
49    event: &mut Annotated<Event>,
50    project_info: &'a ProjectInfo,
51    sampling_project_info: Option<&'a ProjectInfo>,
52) -> Option<&'a ProjectInfo> {
53    let original_dsc = managed_envelope.envelope().dsc();
54    if original_dsc.is_some() && sampling_project_info.is_some() {
55        return sampling_project_info;
56    }
57
58    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
59    // below already checks for the event type.
60    if let Some(event) = event.value()
61        && let Some(key_config) = project_info.get_public_key_config()
62        && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
63    {
64        // All other information in the DSC must be discarded, but the sample rate was
65        // actually applied by the client and is therefore correct.
66        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
67        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
68
69        managed_envelope.envelope_mut().set_dsc(dsc);
70        return Some(project_info);
71    }
72
73    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
74    managed_envelope.envelope_mut().remove_dsc();
75    None
76}
77
78/// Computes the sampling decision on the incoming event
79pub async fn run<Group>(
80    managed_envelope: &mut TypedEnvelope<Group>,
81    event: &mut Annotated<Event>,
82    config: &Config,
83    project_info: &ProjectInfo,
84    sampling_project_info: Option<&ProjectInfo>,
85    reservoir: &ReservoirEvaluator<'_>,
86) -> SamplingResult
87where
88    Group: Sampling,
89{
90    if !Group::supports_sampling(project_info) {
91        return SamplingResult::Pending;
92    }
93
94    let sampling_config = match project_info.config.sampling {
95        Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
96        _ => None,
97    };
98
99    let root_state = sampling_project_info.as_ref();
100    let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
101        Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
102        _ => None,
103    };
104
105    let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);
106
107    compute_sampling_decision(
108        config.processing_enabled(),
109        reservoir,
110        sampling_config,
111        event.value(),
112        root_config,
113        managed_envelope.envelope().dsc(),
114    )
115    .await
116}
117
118/// Apply the dynamic sampling decision from `compute_sampling_decision`.
119pub fn drop_unsampled_items(
120    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
121    event: Annotated<Event>,
122    outcome: Outcome,
123    spans_extracted: SpansExtracted,
124) {
125    // Remove all items from the envelope which need to be dropped due to dynamic sampling.
126    let dropped_items = managed_envelope
127        .envelope_mut()
128        // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
129        // later processed in Sentry and potentially dropped there.
130        .take_items_by(|item| *item.ty() != ItemType::Profile);
131
132    for item in dropped_items {
133        for (category, quantity) in item.quantities() {
134            // Dynamic sampling only drops indexed items.
135            //
136            // Only emit the base category, if the item does not have an indexed category.
137            if category.index_category().is_none() {
138                managed_envelope.track_outcome(outcome.clone(), category, quantity);
139            }
140        }
141    }
142
143    // Mark all remaining items in the envelope as un-sampled.
144    for item in managed_envelope.envelope_mut().items_mut() {
145        item.set_sampled(false);
146    }
147
148    // Another 'hack' to emit outcomes from the container item for the contained items (spans).
149    //
150    // The entire tracking outcomes for contained elements is not handled in a systematic way
151    // and whenever an event/transaction is discarded, contained elements are tracked in a 'best
152    // effort' basis (basically in all the cases where someone figured out this is a problem).
153    //
154    // This is yet another case, when the spans have not yet been separated from the transaction
155    // also emit dynamic sampling outcomes for the contained spans.
156    if !spans_extracted.0 {
157        let spans = event.value().and_then(|e| e.spans.value());
158        let span_count = spans.map_or(0, |s| s.len());
159
160        // Track the amount of contained spans + 1 segment span (the transaction itself which would
161        // be converted to a span).
162        managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
163    }
164
165    // All items have been dropped, now make sure the event is also handled and dropped.
166    if let Some(category) = event_category(&event) {
167        let category = category.index_category().unwrap_or(category);
168        managed_envelope.track_outcome(outcome, category, 1)
169    }
170}
171
172/// Computes the sampling decision on the incoming envelope.
173async fn compute_sampling_decision(
174    processing_enabled: bool,
175    reservoir: Option<&ReservoirEvaluator<'_>>,
176    sampling_config: Option<&SamplingConfig>,
177    event: Option<&Event>,
178    root_sampling_config: Option<&SamplingConfig>,
179    dsc: Option<&DynamicSamplingContext>,
180) -> SamplingResult {
181    if (sampling_config.is_none() || event.is_none())
182        && (root_sampling_config.is_none() || dsc.is_none())
183    {
184        return SamplingResult::NoMatch;
185    }
186
187    if sampling_config.is_some_and(|config| config.unsupported())
188        || root_sampling_config.is_some_and(|config| config.unsupported())
189    {
190        if processing_enabled {
191            relay_log::error!("found unsupported rules even as processing relay");
192        } else {
193            return SamplingResult::NoMatch;
194        }
195    }
196
197    let mut evaluator = match reservoir {
198        Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
199        None => SamplingEvaluator::new(Utc::now()),
200    };
201
202    if let (Some(event), Some(sampling_state)) = (event, sampling_config)
203        && let Some(seed) = event.id.value().map(|id| id.0)
204    {
205        let rules = sampling_state.filter_rules(RuleType::Transaction);
206        evaluator = match evaluator.match_rules(seed, event, rules).await {
207            ControlFlow::Continue(evaluator) => evaluator,
208            ControlFlow::Break(sampling_match) => {
209                return SamplingResult::Match(sampling_match);
210            }
211        }
212    };
213
214    if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) {
215        let rules = sampling_state.filter_rules(RuleType::Project);
216        evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await {
217            ControlFlow::Continue(evaluator) => evaluator,
218            ControlFlow::Break(sampling_match) => {
219                return SamplingResult::Match(sampling_match);
220            }
221        }
222    };
223
224    if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
225        let rules = sampling_state.filter_rules(RuleType::Trace);
226        return evaluator
227            .match_rules(*dsc.trace_id, dsc, rules)
228            .await
229            .into();
230    }
231
232    SamplingResult::NoMatch
233}
234
235/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
236/// decision.
237///
238/// This execution of dynamic sampling is technically a "simulation" since we will use the result
239/// only for tagging errors and not for actually sampling incoming events.
240pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
241    managed_envelope: &mut TypedEnvelope<Group>,
242    event: &mut Annotated<Event>,
243    sampling_project_info: Option<&ProjectInfo>,
244    config: &Config,
245) {
246    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
247        return;
248    };
249
250    let root_state = sampling_project_info.as_ref();
251    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
252        Some(ErrorBoundary::Ok(config)) => config,
253        _ => return,
254    };
255
256    if sampling_config.unsupported() {
257        if config.processing_enabled() {
258            relay_log::error!("found unsupported rules even as processing relay");
259        }
260
261        return;
262    }
263
264    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
265        return;
266    };
267
268    // We want to get the trace context, in which we will inject the `sampled` field.
269    let context = event
270        .contexts
271        .get_or_insert_with(Contexts::new)
272        .get_or_default::<TraceContext>();
273
274    // We want to update `sampled` only if it was not set, since if we don't check this
275    // we will end up overriding the value set by downstream Relays and this will lead
276    // to more complex debugging in case of problems.
277    if context.sampled.is_empty() {
278        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
279        context.sampled = Annotated::new(sampled);
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use std::collections::BTreeMap;
286    use std::sync::Arc;
287
288    use bytes::Bytes;
289    use relay_base_schema::events::EventType;
290    use relay_base_schema::project::ProjectKey;
291    use relay_cogs::Token;
292    use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
293    use relay_event_schema::protocol::{EventId, LenientString};
294    use relay_protocol::RuleCondition;
295    use relay_sampling::config::{
296        DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
297    };
298    use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch};
299    use relay_system::Addr;
300
301    use crate::envelope::{ContentType, Envelope, Item};
302    use crate::extractors::RequestMeta;
303    use crate::managed::ManagedEnvelope;
304    use crate::processing;
305    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit};
306    use crate::services::projects::project::ProjectInfo;
307    use crate::testutils::{create_test_processor, new_envelope, state_with_rule_and_condition};
308
309    use super::*;
310
311    fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
312        Event {
313            id: Annotated::new(EventId::new()),
314            ty: Annotated::new(event_type),
315            transaction: Annotated::new(transaction.to_owned()),
316            release: Annotated::new(LenientString(release.to_owned())),
317            ..Event::default()
318        }
319    }
320
321    fn dummy_reservoir() -> ReservoirEvaluator<'static> {
322        ReservoirEvaluator::new(ReservoirCounters::default())
323    }
324
325    fn test_dsc() -> DynamicSamplingContext {
326        DynamicSamplingContext {
327            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
328            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
329            release: Some("1.1.1".to_owned()),
330            user: Default::default(),
331            replay_id: None,
332            environment: None,
333            transaction: Some("transaction1".into()),
334            sample_rate: Some(0.5),
335            sampled: Some(true),
336            other: BTreeMap::new(),
337        }
338    }
339
340    // Helper to extract the sampling match from SamplingResult if thats the variant.
341    fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
342        if let SamplingResult::Match(sampling_match) = sampling_result {
343            sampling_match
344        } else {
345            panic!()
346        }
347    }
348
349    /// Always sets the processing item type to event.
350    async fn process_envelope_with_root_project_state(
351        envelope: Box<Envelope>,
352        sampling_project_info: Option<&ProjectInfo>,
353    ) -> Envelope {
354        let processor = create_test_processor(Default::default()).await;
355        let outcome_aggregator = Addr::dummy();
356
357        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
358        assert_eq!(envelopes.len(), 1);
359        let (group, envelope) = envelopes.pop().unwrap();
360
361        let message = ProcessEnvelopeGrouped {
362            group,
363            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
364            ctx: processing::Context {
365                sampling_project_info,
366                ..processing::Context::for_test()
367            },
368            reservoir_counters: &ReservoirCounters::default(),
369        };
370
371        let Ok(Some(Submit::Envelope(envelope))) =
372            processor.process(&mut Token::noop(), message).await
373        else {
374            panic!();
375        };
376
377        envelope.envelope().clone()
378    }
379
380    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
381        let item = envelope.items().next().unwrap();
382        let annotated_event: Annotated<Event> =
383            Annotated::from_json_bytes(&item.payload()).unwrap();
384        annotated_event.into_value().unwrap()
385    }
386
387    fn mocked_error_item() -> Item {
388        let mut item = Item::new(ItemType::Event);
389        item.set_payload(
390            ContentType::Json,
391            r#"{
392              "event_id": "52df9022835246eeb317dbd739ccd059",
393              "exception": {
394                "values": [
395                    {
396                      "type": "mytype",
397                      "value": "myvalue",
398                      "module": "mymodule",
399                      "thread_id": 42,
400                      "other": "value"
401                    }
402                ]
403              }
404            }"#,
405        );
406        item
407    }
408
409    #[tokio::test]
410    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
411        let event_id = EventId::new();
412        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
413            .parse()
414            .unwrap();
415        let request_meta = RequestMeta::new(dsn);
416
417        // We test tagging when root project state and dsc are none.
418        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
419        envelope.add_item(mocked_error_item());
420        let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
421        let event = extract_first_event_from_envelope(new_envelope);
422
423        assert!(event.contexts.value().is_none());
424    }
425
426    #[tokio::test]
427    async fn test_it_keeps_or_drops_transactions() {
428        let event = Event {
429            id: Annotated::new(EventId::new()),
430            ty: Annotated::new(EventType::Transaction),
431            transaction: Annotated::new("testing".to_owned()),
432            ..Event::default()
433        };
434
435        for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
436            let sampling_config = SamplingConfig {
437                rules: vec![SamplingRule {
438                    condition: RuleCondition::all(),
439                    sampling_value: SamplingValue::SampleRate { value: sample_rate },
440                    ty: RuleType::Transaction,
441                    id: RuleId(1),
442                    time_range: Default::default(),
443                    decaying_fn: DecayingFunction::Constant,
444                }],
445                ..SamplingConfig::new()
446            };
447
448            // TODO: This does not test if the sampling decision is actually applied. This should be
449            // refactored to send a proper Envelope in and call process_state to cover the full
450            // pipeline.
451            let res = compute_sampling_decision(
452                false,
453                None,
454                Some(&sampling_config),
455                Some(&event),
456                None,
457                None,
458            )
459            .await;
460            assert_eq!(res.decision().is_keep(), should_keep);
461        }
462    }
463
464    #[tokio::test]
465    async fn test_dsc_respects_metrics_extracted() {
466        relay_test::setup();
467        let outcome_aggregator = Addr::dummy();
468
469        let config = Arc::new(
470            Config::from_json_value(serde_json::json!({
471                "processing": {
472                    "enabled": true,
473                    "kafka_config": [],
474                }
475            }))
476            .unwrap(),
477        );
478
479        let get_test_params = |version: Option<u16>| {
480            let event = Event {
481                id: Annotated::new(EventId::new()),
482                ty: Annotated::new(EventType::Transaction),
483                transaction: Annotated::new("testing".to_owned()),
484                ..Event::default()
485            };
486
487            let mut project_info = state_with_rule_and_condition(
488                Some(0.0),
489                RuleType::Transaction,
490                RuleCondition::all(),
491            );
492
493            if let Some(version) = version {
494                project_info.config.transaction_metrics =
495                    ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
496                        version,
497                        ..Default::default()
498                    })
499                    .into();
500            }
501
502            let envelope = new_envelope(false, "foo");
503            let managed_envelope: TypedEnvelope<TransactionGroup> = (
504                ManagedEnvelope::new(envelope, outcome_aggregator.clone()),
505                ProcessingGroup::Transaction,
506            )
507                .try_into()
508                .unwrap();
509
510            let event = Annotated::from(event);
511            (managed_envelope, event, project_info)
512        };
513
514        let reservoir = dummy_reservoir();
515
516        // None represents no TransactionMetricsConfig, DS will not be run
517        let (mut managed_envelope, mut event, project_info) = get_test_params(None);
518        let sampling_result = run(
519            &mut managed_envelope,
520            &mut event,
521            &config,
522            &project_info,
523            None,
524            &reservoir,
525        )
526        .await;
527        assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
528
529        // Current version is 3, so it won't run DS if it's outdated
530        let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2));
531        let sampling_result = run(
532            &mut managed_envelope,
533            &mut event,
534            &config,
535            &project_info,
536            None,
537            &reservoir,
538        )
539        .await;
540        assert_eq!(sampling_result.decision(), SamplingDecision::Keep);
541
542        // Dynamic sampling is run, as the transaction metrics version is up to date.
543        let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3));
544        let sampling_result = run(
545            &mut managed_envelope,
546            &mut event,
547            &config,
548            &project_info,
549            None,
550            &reservoir,
551        )
552        .await;
553        assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
554    }
555
556    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
557        let sampling_config = SamplingConfig {
558            rules: vec![SamplingRule {
559                condition: RuleCondition::all(),
560                sampling_value: SamplingValue::SampleRate { value: sample_rate },
561                ty: RuleType::Trace,
562                id: RuleId(1),
563                time_range: Default::default(),
564                decaying_fn: Default::default(),
565            }],
566            ..SamplingConfig::new()
567        };
568
569        let mut sampling_project_state = ProjectInfo::default();
570        sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
571        sampling_project_state
572    }
573
574    #[tokio::test]
575    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
576        let event_id = EventId::new();
577        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
578            .parse()
579            .unwrap();
580        let request_meta = RequestMeta::new(dsn);
581        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
582        envelope.set_dsc(test_dsc());
583        envelope.add_item(mocked_error_item());
584
585        // We test with sample rate equal to 100%.
586        let sampling_project_state = project_state_with_single_rule(1.0);
587        let new_envelope = process_envelope_with_root_project_state(
588            envelope.clone(),
589            Some(&sampling_project_state),
590        )
591        .await;
592        let event = extract_first_event_from_envelope(new_envelope);
593        let trace_context = event.context::<TraceContext>().unwrap();
594        assert!(trace_context.sampled.value().unwrap());
595
596        // We test with sample rate equal to 0%.
597        let sampling_project_state = project_state_with_single_rule(0.0);
598        let new_envelope =
599            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
600        let event = extract_first_event_from_envelope(new_envelope);
601        let trace_context = event.context::<TraceContext>().unwrap();
602        assert!(!trace_context.sampled.value().unwrap());
603    }
604
605    #[tokio::test]
606    async fn test_error_is_not_tagged_if_already_tagged() {
607        let event_id = EventId::new();
608        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
609            .parse()
610            .unwrap();
611        let request_meta = RequestMeta::new(dsn);
612
613        // We test tagging with an incoming event that has already been tagged by downstream Relay.
614        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
615        let mut item = Item::new(ItemType::Event);
616        item.set_payload(
617            ContentType::Json,
618            r#"{
619              "event_id": "52df9022835246eeb317dbd739ccd059",
620              "exception": {
621                "values": [
622                    {
623                      "type": "mytype",
624                      "value": "myvalue",
625                      "module": "mymodule",
626                      "thread_id": 42,
627                      "other": "value"
628                    }
629                ]
630              },
631              "contexts": {
632                "trace": {
633                    "sampled": true
634                }
635              }
636            }"#,
637        );
638        envelope.add_item(item);
639        let sampling_project_state = project_state_with_single_rule(0.0);
640        let new_envelope =
641            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
642        let event = extract_first_event_from_envelope(new_envelope);
643        let trace_context = event.context::<TraceContext>().unwrap();
644        assert!(trace_context.sampled.value().unwrap());
645    }
646
647    /// Happy path test for compute_sampling_decision.
648    #[tokio::test]
649    async fn test_compute_sampling_decision_matching() {
650        for rule_type in [RuleType::Transaction, RuleType::Project] {
651            let event = mocked_event(EventType::Transaction, "foo", "bar");
652            let rule = SamplingRule {
653                condition: RuleCondition::all(),
654                sampling_value: SamplingValue::SampleRate { value: 1.0 },
655                ty: rule_type,
656                id: RuleId(0),
657                time_range: TimeRange::default(),
658                decaying_fn: Default::default(),
659            };
660
661            let sampling_config = SamplingConfig {
662                rules: vec![rule],
663                ..SamplingConfig::new()
664            };
665
666            let res = compute_sampling_decision(
667                false,
668                None,
669                Some(&sampling_config),
670                Some(&event),
671                None,
672                Some(&test_dsc()),
673            )
674            .await;
675            assert!(res.is_match());
676        }
677    }
678
679    #[tokio::test]
680    async fn test_matching_with_unsupported_rule() {
681        let event = mocked_event(EventType::Transaction, "foo", "bar");
682        let rule = SamplingRule {
683            condition: RuleCondition::all(),
684            sampling_value: SamplingValue::SampleRate { value: 1.0 },
685            ty: RuleType::Transaction,
686            id: RuleId(0),
687            time_range: TimeRange::default(),
688            decaying_fn: Default::default(),
689        };
690
691        let unsupported_rule = SamplingRule {
692            condition: RuleCondition::all(),
693            sampling_value: SamplingValue::SampleRate { value: 1.0 },
694            ty: RuleType::Unsupported,
695            id: RuleId(0),
696            time_range: TimeRange::default(),
697            decaying_fn: Default::default(),
698        };
699
700        let sampling_config = SamplingConfig {
701            rules: vec![rule, unsupported_rule],
702            ..SamplingConfig::new()
703        };
704
705        // Unsupported rule should result in no match if processing is not enabled.
706        let res = compute_sampling_decision(
707            false,
708            None,
709            Some(&sampling_config),
710            Some(&event),
711            None,
712            None,
713        )
714        .await;
715        assert!(res.is_no_match());
716
717        // Match if processing is enabled.
718        let res =
719            compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
720                .await;
721        assert!(res.is_match());
722    }
723
724    #[tokio::test]
725    async fn test_client_sample_rate() {
726        let dsc = test_dsc();
727
728        let rule = SamplingRule {
729            condition: RuleCondition::all(),
730            sampling_value: SamplingValue::SampleRate { value: 0.2 },
731            ty: RuleType::Trace,
732            id: RuleId(0),
733            time_range: TimeRange::default(),
734            decaying_fn: Default::default(),
735        };
736
737        let sampling_config = SamplingConfig {
738            rules: vec![rule],
739            ..SamplingConfig::new()
740        };
741
742        let res =
743            compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
744                .await;
745
746        assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
747    }
748
749    async fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
750    where
751        Group: Sampling + TryFrom<ProcessingGroup>,
752    {
753        let project_info = {
754            let mut info = ProjectInfo::default();
755            info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
756                version: 1,
757                ..Default::default()
758            }));
759            info
760        };
761
762        let bytes = Bytes::from(
763            r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
764        );
765        let envelope = Envelope::parse_bytes(bytes).unwrap();
766        let config = Config::default();
767
768        let mut managed_envelope: TypedEnvelope<Group> = (
769            ManagedEnvelope::new(envelope, Addr::dummy()),
770            processing_group,
771        )
772            .try_into()
773            .unwrap();
774
775        let mut event = Annotated::new(Event::default());
776
777        let sampling_project_info = {
778            let mut state = ProjectInfo::default();
779            state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
780            state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
781                version: 2,
782                rules: vec![
783                    // Set up a reservoir (only used for transactions):
784                    SamplingRule {
785                        condition: RuleCondition::all(),
786                        sampling_value: SamplingValue::Reservoir { limit: 100 },
787                        ty: RuleType::Trace,
788                        id: RuleId(1),
789                        time_range: Default::default(),
790                        decaying_fn: Default::default(),
791                    },
792                    // Reject everything that does not go into the reservoir:
793                    SamplingRule {
794                        condition: RuleCondition::all(),
795                        sampling_value: SamplingValue::SampleRate { value: 0.0 },
796                        ty: RuleType::Trace,
797                        id: RuleId(2),
798                        time_range: Default::default(),
799                        decaying_fn: Default::default(),
800                    },
801                ],
802                rules_v2: vec![],
803            }));
804            Some(state)
805        };
806
807        let reservoir = dummy_reservoir();
808        run::<Group>(
809            &mut managed_envelope,
810            &mut event,
811            &config,
812            &project_info,
813            sampling_project_info.as_ref(),
814            &reservoir,
815        )
816        .await
817    }
818
819    #[tokio::test]
820    async fn test_reservoir_applied_for_transactions() {
821        let result =
822            run_with_reservoir_rule::<TransactionGroup>(ProcessingGroup::Transaction).await;
823        // Default sampling rate is 0.0, but transaction is retained because of reservoir:
824        assert_eq!(result.decision(), SamplingDecision::Keep);
825    }
826
827    #[tokio::test]
828    async fn test_reservoir_not_applied_for_spans() {
829        let result = run_with_reservoir_rule::<SpanGroup>(ProcessingGroup::Span).await;
830        // Default sampling rate is 0.0, and the reservoir does not apply to spans:
831        assert_eq!(result.decision(), SamplingDecision::Drop);
832    }
833}