relay_server/services/processor/
dynamic_sampling.rs1use relay_config::Config;
4use relay_dynamic_config::ErrorBoundary;
5use relay_event_schema::protocol::{Contexts, Event, TraceContext};
6use relay_protocol::{Annotated, Empty};
7use relay_quotas::DataCategory;
8
9use crate::envelope::ItemType;
10use crate::managed::TypedEnvelope;
11use crate::services::outcome::Outcome;
12use crate::services::processor::{
13 EventProcessing, SpansExtracted, TransactionGroup, event_category,
14};
15use crate::services::projects::project::ProjectInfo;
16use crate::utils::{self};
17
18pub fn validate_and_set_dsc<'a, T>(
43 managed_envelope: &mut TypedEnvelope<T>,
44 event: &mut Annotated<Event>,
45 project_info: &'a ProjectInfo,
46 sampling_project_info: Option<&'a ProjectInfo>,
47) -> Option<&'a ProjectInfo> {
48 let original_dsc = managed_envelope.envelope().dsc();
49 if original_dsc.is_some() && sampling_project_info.is_some() {
50 return sampling_project_info;
51 }
52
53 if let Some(event) = event.value()
56 && let Some(key_config) = project_info.get_public_key_config()
57 && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event)
58 {
59 let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
62 dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
63
64 managed_envelope.envelope_mut().set_dsc(dsc);
65 return Some(project_info);
66 }
67
68 managed_envelope.envelope_mut().remove_dsc();
70 None
71}
72
73pub fn drop_unsampled_items(
75 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
76 event: Annotated<Event>,
77 outcome: Outcome,
78 spans_extracted: SpansExtracted,
79) {
80 let dropped_items = managed_envelope
82 .envelope_mut()
83 .take_items_by(|item| *item.ty() != ItemType::Profile);
86
87 for item in dropped_items {
88 for (category, quantity) in item.quantities() {
89 if category.index_category().is_none() {
93 managed_envelope.track_outcome(outcome.clone(), category, quantity);
94 }
95 }
96 }
97
98 for item in managed_envelope.envelope_mut().items_mut() {
100 item.set_sampled(false);
101 }
102
103 if !spans_extracted.0 {
112 let spans = event.value().and_then(|e| e.spans.value());
113 let span_count = spans.map_or(0, |s| s.len());
114
115 managed_envelope.track_outcome(outcome.clone(), DataCategory::SpanIndexed, span_count + 1);
118 }
119
120 if let Some(category) = event_category(&event) {
122 let category = category.index_category().unwrap_or(category);
123 managed_envelope.track_outcome(outcome, category, 1)
124 }
125}
126
127pub async fn tag_error_with_sampling_decision<Group: EventProcessing>(
133 managed_envelope: &mut TypedEnvelope<Group>,
134 event: &mut Annotated<Event>,
135 sampling_project_info: Option<&ProjectInfo>,
136 config: &Config,
137) {
138 let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else {
139 return;
140 };
141
142 let root_state = sampling_project_info.as_ref();
143 let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
144 Some(ErrorBoundary::Ok(config)) => config,
145 _ => return,
146 };
147
148 if sampling_config.unsupported() {
149 if config.processing_enabled() {
150 relay_log::error!("found unsupported rules even as processing relay");
151 }
152
153 return;
154 }
155
156 let Some(sampled) = utils::is_trace_fully_sampled(sampling_config, dsc).await else {
157 return;
158 };
159
160 let context = event
162 .contexts
163 .get_or_insert_with(Contexts::new)
164 .get_or_default::<TraceContext>();
165
166 if context.sampled.is_empty() {
170 relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
171 context.sampled = Annotated::new(sampled);
172 }
173}
174
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use relay_base_schema::project::ProjectKey;
    use relay_cogs::Token;
    use relay_event_schema::protocol::EventId;
    use relay_protocol::RuleCondition;
    use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue};
    use relay_sampling::evaluation::ReservoirCounters;
    use relay_sampling::{DynamicSamplingContext, SamplingConfig};
    use relay_system::Addr;

    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;

    use super::*;

    /// DSN shared by all test envelopes.
    const TEST_DSN: &str = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42";

    /// Error event payload without any trace context.
    const ERROR_PAYLOAD: &str = r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              }
            }"#;

    /// Error event payload whose trace context already carries `sampled: true`.
    const PRE_TAGGED_ERROR_PAYLOAD: &str = r#"{
              "event_id": "52df9022835246eeb317dbd739ccd059",
              "exception": {
                "values": [
                    {
                      "type": "mytype",
                      "value": "myvalue",
                      "module": "mymodule",
                      "thread_id": 42,
                      "other": "value"
                    }
                ]
              },
              "contexts": {
                "trace": {
                    "sampled": true
                }
              }
            }"#;

    /// Runs `envelope` through a test processor with the given sampling project info and
    /// returns a clone of the resulting envelope.
    ///
    /// Panics if the envelope splits into more than one processing group or if processing
    /// does not submit an envelope.
    async fn process_envelope_with_root_project_state(
        envelope: Box<Envelope>,
        sampling_project_info: Option<&ProjectInfo>,
    ) -> Envelope {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, outcome_aggregator),
            ctx: processing::Context {
                sampling_project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let result = processor.process(&mut Token::noop(), message).await;
        let processed = match result {
            Ok(Some(Submit::Envelope(processed))) => processed,
            _ => panic!(),
        };

        processed.envelope().clone()
    }

    /// Parses the payload of the first envelope item as an event.
    fn extract_first_event_from_envelope(envelope: Envelope) -> Event {
        let payload = envelope.items().next().unwrap().payload();
        let annotated = Annotated::<Event>::from_json_bytes(&payload).unwrap();
        annotated.into_value().unwrap()
    }

    /// Builds an event item carrying the given JSON payload.
    fn event_item(payload: &'static str) -> Item {
        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, payload);
        item
    }

    /// Builds an error event item that has no trace context.
    fn mocked_error_item() -> Item {
        event_item(ERROR_PAYLOAD)
    }

    /// Creates an empty envelope for [`TEST_DSN`] with a freshly generated event id.
    fn new_test_envelope() -> Box<Envelope> {
        let dsn = TEST_DSN.parse().unwrap();
        Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn))
    }

    /// Processes `envelope` with `sampling_state` as the sampling project info and returns
    /// the `sampled` flag found in the first event's trace context.
    ///
    /// Panics if the event has no trace context or the flag is not set.
    async fn sampled_flag_after_processing(
        envelope: Box<Envelope>,
        sampling_state: &ProjectInfo,
    ) -> bool {
        let processed =
            process_envelope_with_root_project_state(envelope, Some(sampling_state)).await;
        let event = extract_first_event_from_envelope(processed);
        let trace_context = event.context::<TraceContext>().unwrap();
        *trace_context.sampled.value().unwrap()
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_none() {
        let mut envelope = new_test_envelope();
        envelope.add_item(mocked_error_item());

        // No sampling project info is supplied here.
        let processed = process_envelope_with_root_project_state(envelope, None).await;
        let event = extract_first_event_from_envelope(processed);

        assert!(event.contexts.value().is_none());
    }

    /// Returns a project info whose sampling config has one trace rule that matches
    /// everything and samples at `sample_rate`.
    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
        let rule = SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: sample_rate },
            ty: RuleType::Trace,
            id: RuleId(1),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        };

        let mut project_info = ProjectInfo::default();
        project_info.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
            rules: vec![rule],
            ..SamplingConfig::new()
        }));
        project_info
    }

    /// Returns a fixed dynamic sampling context used by the tagging tests.
    fn mock_dsc() -> DynamicSamplingContext {
        let trace_id = "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap();
        let public_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        DynamicSamplingContext {
            trace_id,
            public_key,
            release: Some("1.1.1".to_owned()),
            user: Default::default(),
            replay_id: None,
            environment: None,
            transaction: Some("transaction1".into()),
            sample_rate: Some(0.5),
            sampled: Some(true),
            other: BTreeMap::new(),
        }
    }

    #[tokio::test]
    async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() {
        let mut envelope = new_test_envelope();
        envelope.set_dsc(mock_dsc());
        envelope.add_item(mocked_error_item());

        // A rule with rate 1.0 must tag the error as sampled.
        let always_sampled = project_state_with_single_rule(1.0);
        assert!(sampled_flag_after_processing(envelope.clone(), &always_sampled).await);

        // A rule with rate 0.0 must tag the error as not sampled.
        let never_sampled = project_state_with_single_rule(0.0);
        assert!(!sampled_flag_after_processing(envelope, &never_sampled).await);
    }

    #[tokio::test]
    async fn test_error_is_not_tagged_if_already_tagged() {
        let mut envelope = new_test_envelope();
        envelope.add_item(event_item(PRE_TAGGED_ERROR_PAYLOAD));

        // The rule has rate 0.0, yet the `sampled: true` from the payload must survive.
        let never_sampled = project_state_with_single_rule(0.0);
        assert!(sampled_flag_after_processing(envelope, &never_sampled).await);
    }
}