relay_server/services/processor/
dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2
3use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7use relay_quotas::DataCategory;
8
9use crate::envelope::ItemType;
10use crate::managed::TypedEnvelope;
11use crate::services::outcome::Outcome;
12use crate::services::processor::{
13    EventProcessing, SpansExtracted, TransactionGroup, event_category,
14};
15use crate::services::projects::project::ProjectInfo;
16use crate::utils::{self};
17
18/// Ensures there is a valid dynamic sampling context and corresponding project state.
19///
20/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
21/// the trace. That project state should have been loaded previously by the project cache and is
22/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
23///
24///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
25///  - The project key does not exist. This can happen if the project key was disabled, the
26///    project removed, or in rare cases when a project from another Sentry instance is referred
27///    to.
28///  - The project key refers to a project from another organization. In this case the project
29///    cache does not resolve the state and instead leaves it blank.
30///  - The project state could not be fetched. This is a runtime error, but in this case Relay
31///    should fall back to the next-best sampling rule set.
32///
33/// In all of the above cases, this function will compute a new DSC using information from the
34/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
35/// the main project state.
36///
37/// If there is no transaction event in the envelope, this function will do nothing.
38///
39/// The function will return the sampling project information of the root project for the event. If
40/// no sampling project information is specified, the project information of the event’s project
41/// will be returned.
42pub fn validate_and_set_dsc<'a, T>(
43    managed_envelope: &mut TypedEnvelope<T>,
44    event: &mut Annotated<Event>,
45    project_info: &'a ProjectInfo,
46    sampling_project_info: Option<&'a ProjectInfo>,
47) -> Option<&'a ProjectInfo> {
48    let original_dsc = managed_envelope.envelope().dsc();
49    if original_dsc.is_some() && sampling_project_info.is_some() {
50        return sampling_project_info;
51    }
52
53    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
54    // below already checks for the event type.
55    if let Some(event) = event.value()
56        && let Some(key_config) = project_info.get_public_key_config()
57        && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
58    {
59        // All other information in the DSC must be discarded, but the sample rate was
60        // actually applied by the client and is therefore correct.
61        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
62        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
63
64        managed_envelope.envelope_mut().set_dsc(dsc);
65        return Some(project_info);
66    }
67
68    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
69    managed_envelope.envelope_mut().remove_dsc();
70    None
71}
72
73/// Apply the dynamic sampling decision from `compute_sampling_decision`.
74pub fn drop_unsampled_items(
75    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
76    event: Annotated<Event>,
77    outcome: Outcome,
78    spans_extracted: SpansExtracted,
79) {
80    // Remove all items from the envelope which need to be dropped due to dynamic sampling.
81    let dropped_items = managed_envelope
82        .envelope_mut()
83        // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
84        // later processed in Sentry and potentially dropped there.
85        .take_items_by(|item| *item.ty() != ItemType::Profile);
86
87    for item in dropped_items {
88        for (category, quantity) in item.quantities() {
89            // Dynamic sampling only drops indexed items.
90            //
91            // Only emit the base category, if the item does not have an indexed category.
92            if category.index_category().is_none() {
93                managed_envelope.track_outcome(outcome.clone(), category, quantity);
94            }
95        }
96    }
97
98    // Mark all remaining items in the envelope as un-sampled.
99    for item in managed_envelope.envelope_mut().items_mut() {
100        item.set_sampled(false);
101    }
102
103    // Another 'hack' to emit outcomes from the container item for the contained items (spans).
104    //
105    // The entire tracking outcomes for contained elements is not handled in a systematic way
106    // and whenever an event/transaction is discarded, contained elements are tracked in a 'best
107    // effort' basis (basically in all the cases where someone figured out this is a problem).
108    //
109    // This is yet another case, when the spans have not yet been separated from the transaction
110    // also emit dynamic sampling outcomes for the contained spans.
111    if !spans_extracted.0 {
112        let spans = event.value().and_then(|e| e.spans.value());
113        let span_count = spans.map_or(0, |s| s.len());
114
115        // Track the amount of contained spans + 1 segment span (the transaction itself which would
116        // be converted to a span).
117        managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
118    }
119
120    // All items have been dropped, now make sure the event is also handled and dropped.
121    if let Some(category) = event_category(&event) {
122        let category = category.index_category().unwrap_or(category);
123        managed_envelope.track_outcome(outcome, category, 1)
124    }
125}
126
127/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
128/// decision.
129///
130/// This execution of dynamic sampling is technically a "simulation" since we will use the result
131/// only for tagging errors and not for actually sampling incoming events.
132pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
133    managed_envelope: &mut TypedEnvelope<Group>,
134    event: &mut Annotated<Event>,
135    sampling_project_info: Option<&ProjectInfo>,
136    config: &Config,
137) {
138    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
139        return;
140    };
141
142    let root_state = sampling_project_info.as_ref();
143    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
144        Some(ErrorBoundary::Ok(config)) => config,
145        _ => return,
146    };
147
148    if sampling_config.unsupported() {
149        if config.processing_enabled() {
150            relay_log::error!("found unsupported rules even as processing relay");
151        }
152
153        return;
154    }
155
156    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
157        return;
158    };
159
160    // We want to get the trace context, in which we will inject the `sampled` field.
161    let context = event
162        .contexts
163        .get_or_insert_with(Contexts::new)
164        .get_or_default::<TraceContext>();
165
166    // We want to update `sampled` only if it was not set, since if we don't check this
167    // we will end up overriding the value set by downstream Relays and this will lead
168    // to more complex debugging in case of problems.
169    if context.sampled.is_empty() {
170        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
171        context.sampled = Annotated::new(sampled);
172    }
173}
174
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use relay_base_schema::project::ProjectKey;
    use relay_cogs::Token;
    use relay_event_schema::protocol::EventId;
    use relay_protocol::RuleCondition;
    use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
    use relay_sampling::evaluation::ReservoirCounters;
    use relay_sampling::{DynamicSamplingContext, SamplingConfig};
    use relay_system::Addr;

    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;

    use super::*;

    /// Runs `envelope` through a test processor with `sampling_project_info` as the root
    /// (sampling) project and returns the envelope the processor submits.
    ///
    /// # Panics
    ///
    /// Panics if the envelope does not split into exactly one processing group, or if
    /// processing does not submit an envelope.
    async fn process_envelope_with_root_project_state(
        envelope: Box<Envelope>,
        sampling_project_info: Option<&ProjectInfo>,
    ) -> Envelope {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(
            envelopes.len(),
            1,
            "expected the envelope to form exactly one processing group"
        );
        let (group, envelope) = envelopes.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
            ctx: processing::Context {
                sampling_project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit a processed envelope");
        };

        envelope.envelope().clone()
    }

    /// Parses the payload of the first item in `envelope` as an event.
    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
        let item = envelope
            .items()
            .next()
            .expect("processed envelope should contain an item");
        let annotated_event: Annotated<Event> = Annotated::from_json_bytes(&item.payload())
            .expect("first item payload should be a valid event");
        annotated_event
            .into_value()
            .expect("first item should contain an event value")
    }

    /// Creates an event item with a minimal error payload and no trace context.
    fn mocked_error_item() -> Item {
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              }
            }"#,
        );
        item
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);

        // We test tagging when root project state and dsc are none.
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        envelope.add_item(mocked_error_item());
        let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
        let event = extract_first_event_from_envelope(new_envelope);

        assert!(event.contexts.value().is_none());
    }

    /// Creates a project whose sampling config has a single trace rule matching everything.
    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
        let sampling_config = SamplingConfig {
            rules: vec![SamplingRule {
                condition: RuleCondition::all(),
                sampling_value: SamplingValue::SampleRate { value: sample_rate },
                ty: RuleType::Trace,
                id: RuleId(1),
                time_range: Default::default(),
                decaying_fn: Default::default(),
            }],
            ..SamplingConfig::new()
        };

        let mut sampling_project_state = ProjectInfo::default();
        sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
        sampling_project_state
    }

    fn mock_dsc() -> DynamicSamplingContext {
        DynamicSamplingContext {
            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
            release: Some("1.1.1".to_owned()),
            user: Default::default(),
            replay_id: None,
            environment: None,
            transaction: Some("transaction1".into()),
            sample_rate: Some(0.5),
            sampled: Some(true),
            other: BTreeMap::new(),
        }
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        envelope.set_dsc(mock_dsc());
        envelope.add_item(mocked_error_item());

        // We test with sample rate equal to 100%.
        let sampling_project_state = project_state_with_single_rule(1.0);
        let new_envelope = process_envelope_with_root_project_state(
            envelope.clone(),
            Some(&sampling_project_state),
        )
        .await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(trace_context.sampled.value().unwrap());

        // We test with sample rate equal to 0%.
        let sampling_project_state = project_state_with_single_rule(0.0);
        let new_envelope =
            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(!trace_context.sampled.value().unwrap());
    }

    #[tokio::test]
    async fn test_error_is_not_tagged_if_already_tagged() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);

        // We test tagging with an incoming event that has already been tagged by downstream Relay.
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              },
              "contexts": {
                "trace": {
                    "sampled": true
                }
              }
            }"#,
        );
        envelope.add_item(item);
        let sampling_project_state = project_state_with_single_rule(0.0);
        let new_envelope =
            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(trace_context.sampled.value().unwrap());
    }
}