relay_server/services/processor/
dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2
3use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7
8use crate::managed::TypedEnvelope;
9use crate::services::processor::EventProcessing;
10use crate::services::projects::project::ProjectInfo;
11use crate::utils::{self};
12
13/// Ensures there is a valid dynamic sampling context and corresponding project state.
14///
15/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
16/// the trace. That project state should have been loaded previously by the project cache and is
17/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
18///
19///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
20///  - The project key does not exist. This can happen if the project key was disabled, the
21///    project removed, or in rare cases when a project from another Sentry instance is referred
22///    to.
23///  - The project key refers to a project from another organization. In this case the project
24///    cache does not resolve the state and instead leaves it blank.
25///  - The project state could not be fetched. This is a runtime error, but in this case Relay
26///    should fall back to the next-best sampling rule set.
27///
28/// In all of the above cases, this function will compute a new DSC using information from the
29/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
30/// the main project state.
31///
32/// If there is no transaction event in the envelope, this function will do nothing.
33///
34/// The function will return the sampling project information of the root project for the event. If
35/// no sampling project information is specified, the project information of the event’s project
36/// will be returned.
37pub fn validate_and_set_dsc<'a, T>(
38    managed_envelope: &mut TypedEnvelope<T>,
39    event: &mut Annotated<Event>,
40    project_info: &'a ProjectInfo,
41    sampling_project_info: Option<&'a ProjectInfo>,
42) -> Option<&'a ProjectInfo> {
43    let original_dsc = managed_envelope.envelope().dsc();
44    if original_dsc.is_some() && sampling_project_info.is_some() {
45        return sampling_project_info;
46    }
47
48    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
49    // below already checks for the event type.
50    if let Some(event) = event.value()
51        && let Some(key_config) = project_info.get_public_key_config()
52        && let Some(mut dsc) =
53            crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event)
54    {
55        // All other information in the DSC must be discarded, but the sample rate was
56        // actually applied by the client and is therefore correct.
57        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
58        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
59
60        managed_envelope.envelope_mut().set_dsc(dsc);
61        return Some(project_info);
62    }
63
64    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
65    managed_envelope.envelope_mut().remove_dsc();
66    None
67}
68
69/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
70/// decision.
71///
72/// This execution of dynamic sampling is technically a "simulation" since we will use the result
73/// only for tagging errors and not for actually sampling incoming events.
74pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
75    managed_envelope: &mut TypedEnvelope<Group>,
76    event: &mut Annotated<Event>,
77    sampling_project_info: Option<&ProjectInfo>,
78    config: &Config,
79) {
80    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
81        return;
82    };
83
84    let root_state = sampling_project_info.as_ref();
85    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
86        Some(ErrorBoundary::Ok(config)) => config,
87        _ => return,
88    };
89
90    if sampling_config.unsupported() {
91        if config.processing_enabled() {
92            relay_log::error!("found unsupported rules even as processing relay");
93        }
94
95        return;
96    }
97
98    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
99        return;
100    };
101
102    // We want to get the trace context, in which we will inject the `sampled` field.
103    let context = event
104        .contexts
105        .get_or_insert_with(Contexts::new)
106        .get_or_default::<TraceContext>();
107
108    // We want to update `sampled` only if it was not set, since if we don't check this
109    // we will end up overriding the value set by downstream Relays and this will lead
110    // to more complex debugging in case of problems.
111    if context.sampled.is_empty() {
112        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
113        context.sampled = Annotated::new(sampled);
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use std::collections::BTreeMap;
120
121    use relay_base_schema::project::ProjectKey;
122    use relay_event_schema::protocol::EventId;
123    use relay_protocol::RuleCondition;
124    use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
125    use relay_sampling::{DynamicSamplingContext, SamplingConfig};
126    use relay_system::Addr;
127
128    use crate::envelope::{ContentType, Envelope, Item, ItemType};
129    use crate::extractors::RequestMeta;
130    use crate::managed::ManagedEnvelope;
131    use crate::processing;
132    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
133    use crate::services::projects::project::ProjectInfo;
134    use crate::testutils::create_test_processor;
135
136    use super::*;
137
138    /// Always sets the processing item type to event.
139    async fn process_envelope_with_root_project_state(
140        envelope: Box<Envelope>,
141        sampling_project_info: Option<&ProjectInfo>,
142    ) -> Envelope {
143        let processor = create_test_processor(Default::default()).await;
144        let outcome_aggregator = Addr::dummy();
145
146        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
147        assert_eq!(envelopes.len(), 1);
148        let (group, envelope) = envelopes.pop().unwrap();
149
150        let message = ProcessEnvelopeGrouped {
151            group,
152            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
153            ctx: processing::Context {
154                sampling_project_info,
155                ..processing::Context::for_test()
156            },
157        };
158
159        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
160            panic!();
161        };
162
163        envelope.envelope().clone()
164    }
165
166    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
167        let item = envelope.items().next().unwrap();
168        let annotated_event: Annotated<Event> =
169            Annotated::from_json_bytes(&item.payload()).unwrap();
170        annotated_event.into_value().unwrap()
171    }
172
173    fn mocked_error_item() -> Item {
174        let mut item = Item::new(ItemType::Event);
175        item.set_payload(
176            ContentType::Json,
177            r#"{
178              "event_id": "52df9022835246eeb317dbd739ccd059",
179              "exception": {
180                "values": [
181                    {
182                      "type": "mytype",
183                      "value": "myvalue",
184                      "module": "mymodule",
185                      "thread_id": 42,
186                      "other": "value"
187                    }
188                ]
189              }
190            }"#,
191        );
192        item
193    }
194
195    #[tokio::test]
196    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
197        let event_id = EventId::new();
198        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
199            .parse()
200            .unwrap();
201        let request_meta = RequestMeta::new(dsn);
202
203        // We test tagging when root project state and dsc are none.
204        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
205        envelope.add_item(mocked_error_item());
206        let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
207        let event = extract_first_event_from_envelope(new_envelope);
208
209        assert!(event.contexts.value().is_none());
210    }
211
212    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
213        let sampling_config = SamplingConfig {
214            rules: vec![SamplingRule {
215                condition: RuleCondition::all(),
216                sampling_value: SamplingValue::SampleRate { value: sample_rate },
217                ty: RuleType::Trace,
218                id: RuleId(1),
219                time_range: Default::default(),
220                decaying_fn: Default::default(),
221            }],
222            ..SamplingConfig::new()
223        };
224
225        let mut sampling_project_state = ProjectInfo::default();
226        sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
227        sampling_project_state
228    }
229
230    fn mock_dsc() -> DynamicSamplingContext {
231        DynamicSamplingContext {
232            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
233            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
234            release: Some("1.1.1".to_owned()),
235            user: Default::default(),
236            replay_id: None,
237            environment: None,
238            transaction: Some("transaction1".into()),
239            sample_rate: Some(0.5),
240            sampled: Some(true),
241            other: BTreeMap::new(),
242        }
243    }
244
245    #[tokio::test]
246    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
247        let event_id = EventId::new();
248        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
249            .parse()
250            .unwrap();
251        let request_meta = RequestMeta::new(dsn);
252        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
253        envelope.set_dsc(mock_dsc());
254        envelope.add_item(mocked_error_item());
255
256        // We test with sample rate equal to 100%.
257        let sampling_project_state = project_state_with_single_rule(1.0);
258        let new_envelope = process_envelope_with_root_project_state(
259            envelope.clone(),
260            Some(&sampling_project_state),
261        )
262        .await;
263        let event = extract_first_event_from_envelope(new_envelope);
264        let trace_context = event.context::<TraceContext>().unwrap();
265        assert!(trace_context.sampled.value().unwrap());
266
267        // We test with sample rate equal to 0%.
268        let sampling_project_state = project_state_with_single_rule(0.0);
269        let new_envelope =
270            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
271        let event = extract_first_event_from_envelope(new_envelope);
272        let trace_context = event.context::<TraceContext>().unwrap();
273        assert!(!trace_context.sampled.value().unwrap());
274    }
275
276    #[tokio::test]
277    async fn test_error_is_not_tagged_if_already_tagged() {
278        let event_id = EventId::new();
279        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
280            .parse()
281            .unwrap();
282        let request_meta = RequestMeta::new(dsn);
283
284        // We test tagging with an incoming event that has already been tagged by downstream Relay.
285        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
286        let mut item = Item::new(ItemType::Event);
287        item.set_payload(
288            ContentType::Json,
289            r#"{
290              "event_id": "52df9022835246eeb317dbd739ccd059",
291              "exception": {
292                "values": [
293                    {
294                      "type": "mytype",
295                      "value": "myvalue",
296                      "module": "mymodule",
297                      "thread_id": 42,
298                      "other": "value"
299                    }
300                ]
301              },
302              "contexts": {
303                "trace": {
304                    "sampled": true
305                }
306              }
307            }"#,
308        );
309        envelope.add_item(item);
310        let sampling_project_state = project_state_with_single_rule(0.0);
311        let new_envelope =
312            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
313        let event = extract_first_event_from_envelope(new_envelope);
314        let trace_context = event.context::<TraceContext>().unwrap();
315        assert!(trace_context.sampled.value().unwrap());
316    }
317}