relay_server/services/processor/dynamic_sampling.rs

1//! Dynamic sampling processor related code.
2
3use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7use relay_quotas::DataCategory;
8
9use crate::envelope::ItemType;
10use crate::managed::TypedEnvelope;
11use crate::services::outcome::Outcome;
12use crate::services::processor::{
13    EventProcessing, SpansExtracted, TransactionGroup, event_category,
14};
15use crate::services::projects::project::ProjectInfo;
16use crate::utils::{self};
17
18/// Ensures there is a valid dynamic sampling context and corresponding project state.
19///
20/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
21/// the trace. That project state should have been loaded previously by the project cache and is
22/// available on the `ProcessEnvelopeState`. Under these conditions, this cannot happen:
23///
24///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
25///  - The project key does not exist. This can happen if the project key was disabled, the
26///    project removed, or in rare cases when a project from another Sentry instance is referred
27///    to.
28///  - The project key refers to a project from another organization. In this case the project
29///    cache does not resolve the state and instead leaves it blank.
30///  - The project state could not be fetched. This is a runtime error, but in this case Relay
31///    should fall back to the next-best sampling rule set.
32///
33/// In all of the above cases, this function will compute a new DSC using information from the
34/// event payload, similar to how SDKs do this. The `sampling_project_state` is also switched to
35/// the main project state.
36///
37/// If there is no transaction event in the envelope, this function will do nothing.
38///
39/// The function will return the sampling project information of the root project for the event. If
40/// no sampling project information is specified, the project information of the event’s project
41/// will be returned.
42pub fn validate_and_set_dsc<'a, T>(
43    managed_envelope: &mut TypedEnvelope<T>,
44    event: &mut Annotated<Event>,
45    project_info: &'a ProjectInfo,
46    sampling_project_info: Option<&'a ProjectInfo>,
47) -> Option<&'a ProjectInfo> {
48    let original_dsc = managed_envelope.envelope().dsc();
49    if original_dsc.is_some() && sampling_project_info.is_some() {
50        return sampling_project_info;
51    }
52
53    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
54    // below already checks for the event type.
55    if let Some(event) = event.value()
56        && let Some(key_config) = project_info.get_public_key_config()
57        && let Some(mut dsc) =
58            crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event)
59    {
60        // All other information in the DSC must be discarded, but the sample rate was
61        // actually applied by the client and is therefore correct.
62        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
63        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
64
65        managed_envelope.envelope_mut().set_dsc(dsc);
66        return Some(project_info);
67    }
68
69    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
70    managed_envelope.envelope_mut().remove_dsc();
71    None
72}
73
74/// Apply the dynamic sampling decision from `compute_sampling_decision`.
75pub fn drop_unsampled_items(
76    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
77    event: Annotated<Event>,
78    outcome: Outcome,
79    spans_extracted: SpansExtracted,
80) {
81    // Remove all items from the envelope which need to be dropped due to dynamic sampling.
82    let dropped_items = managed_envelope
83        .envelope_mut()
84        // Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
85        // later processed in Sentry and potentially dropped there.
86        .take_items_by(|item| *item.ty() != ItemType::Profile);
87
88    for item in dropped_items {
89        for (category, quantity) in item.quantities() {
90            // Dynamic sampling only drops indexed items.
91            //
92            // Only emit the base category, if the item does not have an indexed category.
93            if category.index_category().is_none() {
94                managed_envelope.track_outcome(outcome.clone(), category, quantity);
95            }
96        }
97    }
98
99    // Mark all remaining items in the envelope as un-sampled.
100    for item in managed_envelope.envelope_mut().items_mut() {
101        item.set_sampled(false);
102    }
103
104    // Another 'hack' to emit outcomes from the container item for the contained items (spans).
105    //
106    // The entire tracking outcomes for contained elements is not handled in a systematic way
107    // and whenever an event/transaction is discarded, contained elements are tracked in a 'best
108    // effort' basis (basically in all the cases where someone figured out this is a problem).
109    //
110    // This is yet another case, when the spans have not yet been separated from the transaction
111    // also emit dynamic sampling outcomes for the contained spans.
112    if !spans_extracted.0 {
113        let spans = event.value().and_then(|e| e.spans.value());
114        let span_count = spans.map_or(0, |s| s.len());
115
116        // Track the amount of contained spans + 1 segment span (the transaction itself which would
117        // be converted to a span).
118        managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
119    }
120
121    // All items have been dropped, now make sure the event is also handled and dropped.
122    if let Some(category) = event_category(&event) {
123        let category = category.index_category().unwrap_or(category);
124        managed_envelope.track_outcome(outcome, category, 1)
125    }
126}
127
128/// Runs dynamic sampling on an incoming error and tags it in case of successful sampling
129/// decision.
130///
131/// This execution of dynamic sampling is technically a "simulation" since we will use the result
132/// only for tagging errors and not for actually sampling incoming events.
133pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
134    managed_envelope: &mut TypedEnvelope<Group>,
135    event: &mut Annotated<Event>,
136    sampling_project_info: Option<&ProjectInfo>,
137    config: &Config,
138) {
139    let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
140        return;
141    };
142
143    let root_state = sampling_project_info.as_ref();
144    let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
145        Some(ErrorBoundary::Ok(config)) => config,
146        _ => return,
147    };
148
149    if sampling_config.unsupported() {
150        if config.processing_enabled() {
151            relay_log::error!("found unsupported rules even as processing relay");
152        }
153
154        return;
155    }
156
157    let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
158        return;
159    };
160
161    // We want to get the trace context, in which we will inject the `sampled` field.
162    let context = event
163        .contexts
164        .get_or_insert_with(Contexts::new)
165        .get_or_default::<TraceContext>();
166
167    // We want to update `sampled` only if it was not set, since if we don't check this
168    // we will end up overriding the value set by downstream Relays and this will lead
169    // to more complex debugging in case of problems.
170    if context.sampled.is_empty() {
171        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
172        context.sampled = Annotated::new(sampled);
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use relay_base_schema::project::ProjectKey;
    use relay_cogs::Token;
    use relay_event_schema::protocol::EventId;
    use relay_protocol::RuleCondition;
    use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
    use relay_sampling::{DynamicSamplingContext, SamplingConfig};
    use relay_system::Addr;

    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;

    use super::*;

    /// Runs `envelope` through a test processor, using `sampling_project_info` as the root
    /// (sampling) project, and returns the processed envelope.
    ///
    /// Panics if the envelope is split into more than one processing group, or if processing
    /// does not produce an envelope to submit.
    async fn process_envelope_with_root_project_state(
        envelope: Box<Envelope>,
        sampling_project_info: Option<&ProjectInfo>,
    ) -> Envelope {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
            ctx: processing::Context {
                sampling_project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected processing to produce an envelope to submit");
        };

        envelope.envelope().clone()
    }

    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
        let item = envelope.items().next().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&item.payload()).unwrap();
        annotated_event.into_value().unwrap()
    }

    fn mocked_error_item() -> Item {
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              }
            }"#,
        );
        item
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);

        // We test tagging when root project state and dsc are none.
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        envelope.add_item(mocked_error_item());
        let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
        let event = extract_first_event_from_envelope(new_envelope);

        assert!(event.contexts.value().is_none());
    }

    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
        let sampling_config = SamplingConfig {
            rules: vec![SamplingRule {
                condition: RuleCondition::all(),
                sampling_value: SamplingValue::SampleRate { value: sample_rate },
                ty: RuleType::Trace,
                id: RuleId(1),
                time_range: Default::default(),
                decaying_fn: Default::default(),
            }],
            ..SamplingConfig::new()
        };

        let mut sampling_project_state = ProjectInfo::default();
        sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
        sampling_project_state
    }

    fn mock_dsc() -> DynamicSamplingContext {
        DynamicSamplingContext {
            trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
            public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
            release: Some("1.1.1".to_owned()),
            user: Default::default(),
            replay_id: None,
            environment: None,
            transaction: Some("transaction1".into()),
            sample_rate: Some(0.5),
            sampled: Some(true),
            other: BTreeMap::new(),
        }
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        envelope.set_dsc(mock_dsc());
        envelope.add_item(mocked_error_item());

        // We test with sample rate equal to 100%.
        let sampling_project_state = project_state_with_single_rule(1.0);
        let new_envelope = process_envelope_with_root_project_state(
            envelope.clone(),
            Some(&sampling_project_state),
        )
        .await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(trace_context.sampled.value().unwrap());

        // We test with sample rate equal to 0%.
        let sampling_project_state = project_state_with_single_rule(0.0);
        let new_envelope =
            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(!trace_context.sampled.value().unwrap());
    }

    #[tokio::test]
    async fn test_error_is_not_tagged_if_already_tagged() {
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);

        // We test tagging with an incoming event that has already been tagged by downstream Relay.
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
        // A DSC is required for this test to be meaningful: without one,
        // `tag_error_with_sampling_decision` returns before it ever inspects the existing
        // `sampled` value, and the assertion below would pass regardless of that check.
        envelope.set_dsc(mock_dsc());
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              },
              "contexts": {
                "trace": {
                    "sampled": true
                }
              }
            }"#,
        );
        envelope.add_item(item);
        // A 0% rule would tag the error with `sampled = false`; the pre-existing `true` must win.
        let sampling_project_state = project_state_with_single_rule(0.0);
        let new_envelope =
            process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
        let event = extract_first_event_from_envelope(new_envelope);
        let trace_context = event.context::<TraceContext>().unwrap();
        assert!(trace_context.sampled.value().unwrap());
    }
}