relay_server/services/processor/
dynamic_sampling.rs1use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7
8use crate::managed::TypedEnvelope;
9use crate::services::processor::EventProcessing;
10use crate::services::projects::project::ProjectInfo;
11use crate::utils::{self};
12
13pub fn validate_and_set_dsc<'a, T>(
38 managed_envelope: &mut TypedEnvelope<T>,
39 event: &mut Annotated<Event>,
40 project_info: &'a ProjectInfo,
41 sampling_project_info: Option<&'a ProjectInfo>,
42) -> Option<&'a ProjectInfo> {
43 let original_dsc = managed_envelope.envelope().dsc();
44 if original_dsc.is_some() && sampling_project_info.is_some() {
45 return sampling_project_info;
46 }
47
48 if let Some(event) = event.value()
51 && let Some(key_config) = project_info.get_public_key_config()
52 && let Some(mut dsc) =
53 crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event)
54 {
55 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
58 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
59
60 managed_envelope.envelope_mut().set_dsc(dsc);
61 return Some(project_info);
62 }
63
64 managed_envelope.envelope_mut().remove_dsc();
66 None
67}
68
69pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
75 managed_envelope: &mut TypedEnvelope<Group>,
76 event: &mut Annotated<Event>,
77 sampling_project_info: Option<&ProjectInfo>,
78 config: &Config,
79) {
80 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
81 return;
82 };
83
84 let root_state = sampling_project_info.as_ref();
85 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
86 Some(ErrorBoundary::Ok(config)) => config,
87 _ => return,
88 };
89
90 if sampling_config.unsupported() {
91 if config.processing_enabled() {
92 relay_log::error!("found unsupported rules even as processing relay");
93 }
94
95 return;
96 }
97
98 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
99 return;
100 };
101
102 let context = event
104 .contexts
105 .get_or_insert_with(Contexts::new)
106 .get_or_default::<TraceContext>();
107
108 if context.sampled.is_empty() {
112 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
113 context.sampled = Annotated::new(sampled);
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use std::collections::BTreeMap;
120
121 use relay_base_schema::project::ProjectKey;
122 use relay_event_schema::protocol::EventId;
123 use relay_protocol::RuleCondition;
124 use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
125 use relay_sampling::{DynamicSamplingContext, SamplingConfig};
126 use relay_system::Addr;
127
128 use crate::envelope::{ContentType, Envelope, Item, ItemType};
129 use crate::extractors::RequestMeta;
130 use crate::managed::ManagedEnvelope;
131 use crate::processing;
132 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
133 use crate::services::projects::project::ProjectInfo;
134 use crate::testutils::create_test_processor;
135
136 use super::*;
137
138 async fn process_envelope_with_root_project_state(
140 envelope: Box<Envelope>,
141 sampling_project_info: Option<&ProjectInfo>,
142 ) -> Envelope {
143 let processor = create_test_processor(Default::default()).await;
144 let outcome_aggregator = Addr::dummy();
145
146 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
147 assert_eq!(envelopes.len(), 1);
148 let (group, envelope) = envelopes.pop().unwrap();
149
150 let message = ProcessEnvelopeGrouped {
151 group,
152 envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
153 ctx: processing::Context {
154 sampling_project_info,
155 ..processing::Context::for_test()
156 },
157 };
158
159 let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
160 panic!();
161 };
162
163 envelope.envelope().clone()
164 }
165
166 fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
167 let item = envelope.items().next().unwrap();
168 let annotated_event: Annotated<Event> =
169 Annotated::from_json_bytes(&item.payload()).unwrap();
170 annotated_event.into_value().unwrap()
171 }
172
173 fn mocked_error_item() -> Item {
174 let mut item = Item::new(ItemType::Event);
175 item.set_payload(
176 ContentType::Json,
177 r#"{
178 "event_id": "52df9022835246eeb317dbd739ccd059",
179 "exception": {
180 "values": [
181 {
182 "type": "mytype",
183 "value": "myvalue",
184 "module": "mymodule",
185 "thread_id": 42,
186 "other": "value"
187 }
188 ]
189 }
190 }"#,
191 );
192 item
193 }
194
195 #[tokio::test]
196 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
197 let event_id = EventId::new();
198 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
199 .parse()
200 .unwrap();
201 let request_meta = RequestMeta::new(dsn);
202
203 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
205 envelope.add_item(mocked_error_item());
206 let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
207 let event = extract_first_event_from_envelope(new_envelope);
208
209 assert!(event.contexts.value().is_none());
210 }
211
212 fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
213 let sampling_config = SamplingConfig {
214 rules: vec![SamplingRule {
215 condition: RuleCondition::all(),
216 sampling_value: SamplingValue::SampleRate { value: sample_rate },
217 ty: RuleType::Trace,
218 id: RuleId(1),
219 time_range: Default::default(),
220 decaying_fn: Default::default(),
221 }],
222 ..SamplingConfig::new()
223 };
224
225 let mut sampling_project_state = ProjectInfo::default();
226 sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
227 sampling_project_state
228 }
229
230 fn mock_dsc() -> DynamicSamplingContext {
231 DynamicSamplingContext {
232 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
233 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
234 release: Some("1.1.1".to_owned()),
235 user: Default::default(),
236 replay_id: None,
237 environment: None,
238 transaction: Some("transaction1".into()),
239 sample_rate: Some(0.5),
240 sampled: Some(true),
241 other: BTreeMap::new(),
242 }
243 }
244
245 #[tokio::test]
246 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
247 let event_id = EventId::new();
248 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
249 .parse()
250 .unwrap();
251 let request_meta = RequestMeta::new(dsn);
252 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
253 envelope.set_dsc(mock_dsc());
254 envelope.add_item(mocked_error_item());
255
256 let sampling_project_state = project_state_with_single_rule(1.0);
258 let new_envelope = process_envelope_with_root_project_state(
259 envelope.clone(),
260 Some(&sampling_project_state),
261 )
262 .await;
263 let event = extract_first_event_from_envelope(new_envelope);
264 let trace_context = event.context::<TraceContext>().unwrap();
265 assert!(trace_context.sampled.value().unwrap());
266
267 let sampling_project_state = project_state_with_single_rule(0.0);
269 let new_envelope =
270 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
271 let event = extract_first_event_from_envelope(new_envelope);
272 let trace_context = event.context::<TraceContext>().unwrap();
273 assert!(!trace_context.sampled.value().unwrap());
274 }
275
276 #[tokio::test]
277 async fn test_error_is_not_tagged_if_already_tagged() {
278 let event_id = EventId::new();
279 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
280 .parse()
281 .unwrap();
282 let request_meta = RequestMeta::new(dsn);
283
284 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
286 let mut item = Item::new(ItemType::Event);
287 item.set_payload(
288 ContentType::Json,
289 r#"{
290 "event_id": "52df9022835246eeb317dbd739ccd059",
291 "exception": {
292 "values": [
293 {
294 "type": "mytype",
295 "value": "myvalue",
296 "module": "mymodule",
297 "thread_id": 42,
298 "other": "value"
299 }
300 ]
301 },
302 "contexts": {
303 "trace": {
304 "sampled": true
305 }
306 }
307 }"#,
308 );
309 envelope.add_item(item);
310 let sampling_project_state = project_state_with_single_rule(0.0);
311 let new_envelope =
312 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
313 let event = extract_first_event_from_envelope(new_envelope);
314 let trace_context = event.context::<TraceContext>().unwrap();
315 assert!(trace_context.sampled.value().unwrap());
316 }
317}