relay_server/services/processor/
dynamic_sampling.rs1use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7use relay_quotas::DataCategory;
8
9use crate::envelope::ItemType;
10use crate::managed::TypedEnvelope;
11use crate::services::outcome::Outcome;
12use crate::services::processor::{
13 EventProcessing, SpansExtracted, TransactionGroup, event_category,
14};
15use crate::services::projects::project::ProjectInfo;
16use crate::utils::{self};
17
18pub fn validate_and_set_dsc<'a, T>(
43 managed_envelope: &mut TypedEnvelope<T>,
44 event: &mut Annotated<Event>,
45 project_info: &'a ProjectInfo,
46 sampling_project_info: Option<&'a ProjectInfo>,
47) -> Option<&'a ProjectInfo> {
48 let original_dsc = managed_envelope.envelope().dsc();
49 if original_dsc.is_some() && sampling_project_info.is_some() {
50 return sampling_project_info;
51 }
52
53 if let Some(event) = event.value()
56 && let Some(key_config) = project_info.get_public_key_config()
57 && let Some(mut dsc) =
58 crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event)
59 {
60 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
63 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
64
65 managed_envelope.envelope_mut().set_dsc(dsc);
66 return Some(project_info);
67 }
68
69 managed_envelope.envelope_mut().remove_dsc();
71 None
72}
73
74pub fn drop_unsampled_items(
76 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
77 event: Annotated<Event>,
78 outcome: Outcome,
79 spans_extracted: SpansExtracted,
80) {
81 let dropped_items = managed_envelope
83 .envelope_mut()
84 .take_items_by(|item| *item.ty() != ItemType::Profile);
87
88 for item in dropped_items {
89 for (category, quantity) in item.quantities() {
90 if category.index_category().is_none() {
94 managed_envelope.track_outcome(outcome.clone(), category, quantity);
95 }
96 }
97 }
98
99 for item in managed_envelope.envelope_mut().items_mut() {
101 item.set_sampled(false);
102 }
103
104 if !spans_extracted.0 {
113 let spans = event.value().and_then(|e| e.spans.value());
114 let span_count = spans.map_or(0, |s| s.len());
115
116 managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
119 }
120
121 if let Some(category) = event_category(&event) {
123 let category = category.index_category().unwrap_or(category);
124 managed_envelope.track_outcome(outcome, category, 1)
125 }
126}
127
128pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
134 managed_envelope: &mut TypedEnvelope<Group>,
135 event: &mut Annotated<Event>,
136 sampling_project_info: Option<&ProjectInfo>,
137 config: &Config,
138) {
139 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
140 return;
141 };
142
143 let root_state = sampling_project_info.as_ref();
144 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
145 Some(ErrorBoundary::Ok(config)) => config,
146 _ => return,
147 };
148
149 if sampling_config.unsupported() {
150 if config.processing_enabled() {
151 relay_log::error!("found unsupported rules even as processing relay");
152 }
153
154 return;
155 }
156
157 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
158 return;
159 };
160
161 let context = event
163 .contexts
164 .get_or_insert_with(Contexts::new)
165 .get_or_default::<TraceContext>();
166
167 if context.sampled.is_empty() {
171 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
172 context.sampled = Annotated::new(sampled);
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use std::collections::BTreeMap;
179
180 use relay_base_schema::project::ProjectKey;
181 use relay_cogs::Token;
182 use relay_event_schema::protocol::EventId;
183 use relay_protocol::RuleCondition;
184 use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
185 use relay_sampling::{DynamicSamplingContext, SamplingConfig};
186 use relay_system::Addr;
187
188 use crate::envelope::{ContentType, Envelope, Item};
189 use crate::extractors::RequestMeta;
190 use crate::managed::ManagedEnvelope;
191 use crate::processing;
192 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
193 use crate::services::projects::project::ProjectInfo;
194 use crate::testutils::create_test_processor;
195
196 use super::*;
197
198 async fn process_envelope_with_root_project_state(
200 envelope: Box<Envelope>,
201 sampling_project_info: Option<&ProjectInfo>,
202 ) -> Envelope {
203 let processor = create_test_processor(Default::default()).await;
204 let outcome_aggregator = Addr::dummy();
205
206 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
207 assert_eq!(envelopes.len(), 1);
208 let (group, envelope) = envelopes.pop().unwrap();
209
210 let message = ProcessEnvelopeGrouped {
211 group,
212 envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
213 ctx: processing::Context {
214 sampling_project_info,
215 ..processing::Context::for_test()
216 },
217 };
218
219 let Ok(Some(Submit::Envelope(envelope))) =
220 processor.process(&mut Token::noop(), message).await
221 else {
222 panic!();
223 };
224
225 envelope.envelope().clone()
226 }
227
228 fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
229 let item = envelope.items().next().unwrap();
230 let annotated_event: Annotated<Event> =
231 Annotated::from_json_bytes(&item.payload()).unwrap();
232 annotated_event.into_value().unwrap()
233 }
234
235 fn mocked_error_item() -> Item {
236 let mut item = Item::new(ItemType::Event);
237 item.set_payload(
238 ContentType::Json,
239 r#"{
240 "event_id": "52df9022835246eeb317dbd739ccd059",
241 "exception": {
242 "values": [
243 {
244 "type": "mytype",
245 "value": "myvalue",
246 "module": "mymodule",
247 "thread_id": 42,
248 "other": "value"
249 }
250 ]
251 }
252 }"#,
253 );
254 item
255 }
256
257 #[tokio::test]
258 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
259 let event_id = EventId::new();
260 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
261 .parse()
262 .unwrap();
263 let request_meta = RequestMeta::new(dsn);
264
265 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
267 envelope.add_item(mocked_error_item());
268 let new_envelope = process_envelope_with_root_project_state(envelope, None).await;
269 let event = extract_first_event_from_envelope(new_envelope);
270
271 assert!(event.contexts.value().is_none());
272 }
273
274 fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
275 let sampling_config = SamplingConfig {
276 rules: vec![SamplingRule {
277 condition: RuleCondition::all(),
278 sampling_value: SamplingValue::SampleRate { value: sample_rate },
279 ty: RuleType::Trace,
280 id: RuleId(1),
281 time_range: Default::default(),
282 decaying_fn: Default::default(),
283 }],
284 ..SamplingConfig::new()
285 };
286
287 let mut sampling_project_state = ProjectInfo::default();
288 sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
289 sampling_project_state
290 }
291
292 fn mock_dsc() -> DynamicSamplingContext {
293 DynamicSamplingContext {
294 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
295 public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
296 release: Some("1.1.1".to_owned()),
297 user: Default::default(),
298 replay_id: None,
299 environment: None,
300 transaction: Some("transaction1".into()),
301 sample_rate: Some(0.5),
302 sampled: Some(true),
303 other: BTreeMap::new(),
304 }
305 }
306
307 #[tokio::test]
308 async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
309 let event_id = EventId::new();
310 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
311 .parse()
312 .unwrap();
313 let request_meta = RequestMeta::new(dsn);
314 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
315 envelope.set_dsc(mock_dsc());
316 envelope.add_item(mocked_error_item());
317
318 let sampling_project_state = project_state_with_single_rule(1.0);
320 let new_envelope = process_envelope_with_root_project_state(
321 envelope.clone(),
322 Some(&sampling_project_state),
323 )
324 .await;
325 let event = extract_first_event_from_envelope(new_envelope);
326 let trace_context = event.context::<TraceContext>().unwrap();
327 assert!(trace_context.sampled.value().unwrap());
328
329 let sampling_project_state = project_state_with_single_rule(0.0);
331 let new_envelope =
332 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
333 let event = extract_first_event_from_envelope(new_envelope);
334 let trace_context = event.context::<TraceContext>().unwrap();
335 assert!(!trace_context.sampled.value().unwrap());
336 }
337
338 #[tokio::test]
339 async fn test_error_is_not_tagged_if_already_tagged() {
340 let event_id = EventId::new();
341 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
342 .parse()
343 .unwrap();
344 let request_meta = RequestMeta::new(dsn);
345
346 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
348 let mut item = Item::new(ItemType::Event);
349 item.set_payload(
350 ContentType::Json,
351 r#"{
352 "event_id": "52df9022835246eeb317dbd739ccd059",
353 "exception": {
354 "values": [
355 {
356 "type": "mytype",
357 "value": "myvalue",
358 "module": "mymodule",
359 "thread_id": 42,
360 "other": "value"
361 }
362 ]
363 },
364 "contexts": {
365 "trace": {
366 "sampled": true
367 }
368 }
369 }"#,
370 );
371 envelope.add_item(item);
372 let sampling_project_state = project_state_with_single_rule(0.0);
373 let new_envelope =
374 process_envelope_with_root_project_state(envelope, Some(&sampling_project_state)).await;
375 let event = extract_first_event_from_envelope(new_envelope);
376 let trace_context = event.context::<TraceContext>().unwrap();
377 assert!(trace_context.sampled.value().unwrap());
378 }
379}