1use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4use std::sync::Arc;
5
6use relay_base_schema::events::EventType;
7use relay_base_schema::project::ProjectId;
8use relay_config::Config;
9use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
10use relay_filter::ProjectFiltersConfig;
11use relay_profiling::{ProfileError, ProfileId};
12use relay_protocol::Annotated;
13#[cfg(feature = "processing")]
14use relay_protocol::{Getter, Remark, RemarkType};
15
16use crate::envelope::{ContentType, Item, ItemType};
17use crate::managed::{ItemAction, TypedEnvelope};
18use crate::services::outcome::{DiscardReason, Outcome};
19use crate::services::processor::{TransactionGroup, event_type, should_filter};
20use crate::services::projects::project::ProjectInfo;
21
22pub fn filter<Group>(
26 managed_envelope: &mut TypedEnvelope<Group>,
27 event: &Annotated<Event>,
28 config: Arc<Config>,
29 project_id: ProjectId,
30 project_info: &ProjectInfo,
31) -> Option<ProfileId> {
32 let profiling_disabled = should_filter(&config, project_info, Feature::Profiling);
33 let has_transaction = event_type(event) == Some(EventType::Transaction);
34 let keep_unsampled_profiles = true;
35
36 let mut profile_id = None;
37 managed_envelope.retain_items(|item| match item.ty() {
38 ItemType::Profile if profile_id.is_none() => {
40 if profiling_disabled {
41 return ItemAction::DropSilently;
42 }
43
44 let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
47 if !profile_allowed {
48 return ItemAction::DropSilently;
49 }
50
51 match relay_profiling::parse_metadata(&item.payload(), project_id) {
52 Ok(id) => {
53 profile_id = Some(id);
54 ItemAction::Keep
55 }
56 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
57 relay_profiling::discard_reason(err),
58 ))),
59 }
60 }
61 ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
63 relay_profiling::discard_reason(ProfileError::TooManyProfiles),
64 ))),
65 _ => ItemAction::Keep,
66 });
67
68 profile_id
69}
70
71pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
77 let Some(event) = event.value_mut() else {
78 return;
79 };
80
81 match profile_id {
82 Some(profile_id) => {
83 let contexts = event.contexts.get_or_insert_with(Contexts::new);
84 contexts.add(ProfileContext {
85 profile_id: Annotated::new(profile_id),
86 ..ProfileContext::default()
87 });
88 }
89 None => {
90 if let Some(contexts) = event.contexts.value_mut()
91 && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
92 {
93 profile_context.profile_id = Annotated::empty();
94 }
95 }
96 }
97}
98
99#[cfg(feature = "processing")]
104pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
105 let Some(event) = event.value_mut() else {
106 return;
107 };
108 let transaction_duration = event
109 .get_value("event.duration")
110 .and_then(|duration| duration.as_f64());
111
112 if !transaction_duration.is_some_and(|duration| duration < 19.8) {
113 return;
114 }
115 if let Some(contexts) = event.contexts.value_mut().as_mut()
116 && let Some(profiler_id) = contexts
117 .get_mut::<ProfileContext>()
118 .map(|ctx| &mut ctx.profiler_id)
119 {
120 let id = std::mem::take(profiler_id.value_mut());
121 let remark = Remark::new(RemarkType::Removed, "transaction_duration");
122 profiler_id.meta_mut().add_remark(remark);
123 profiler_id.meta_mut().set_original_value(id);
124 }
125}
126
127pub fn process(
129 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
130 event: &mut Annotated<Event>,
131 global_config: &GlobalConfig,
132 config: Arc<Config>,
133 project_info: Arc<ProjectInfo>,
134) -> Option<ProfileId> {
135 let client_ip = managed_envelope.envelope().meta().client_addr();
136 let filter_settings = &project_info.config.filter_settings;
137
138 let profiling_enabled = project_info.has_feature(Feature::Profiling);
139 let mut profile_id = None;
140
141 managed_envelope.retain_items(|item| match item.ty() {
142 ItemType::Profile => {
143 if !profiling_enabled {
144 return ItemAction::DropSilently;
145 }
146
147 let Some(event) = event.value() else {
150 return ItemAction::DropSilently;
151 };
152
153 match expand_profile(
154 item,
155 event,
156 &config,
157 client_ip,
158 filter_settings,
159 global_config,
160 ) {
161 Ok(id) => {
162 profile_id = Some(id);
163 ItemAction::Keep
164 }
165 Err(outcome) => ItemAction::Drop(outcome),
166 }
167 }
168 _ => ItemAction::Keep,
169 });
170
171 profile_id
172}
173
174fn expand_profile(
176 item: &mut Item,
177 event: &Event,
178 config: &Config,
179 client_ip: Option<IpAddr>,
180 filter_settings: &ProjectFiltersConfig,
181 global_config: &GlobalConfig,
182) -> Result<ProfileId, Outcome> {
183 match relay_profiling::expand_profile(
184 &item.payload(),
185 event,
186 client_ip,
187 filter_settings,
188 global_config,
189 ) {
190 Ok((id, payload)) => {
191 if payload.len() <= config.max_profile_size() {
192 item.set_payload(ContentType::Json, payload);
193 Ok(id)
194 } else {
195 Err(Outcome::Invalid(DiscardReason::Profiling(
196 relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
197 )))
198 }
199 }
200 Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
201 Err(Outcome::Filtered(filter_stat_key))
202 }
203 Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
204 relay_profiling::discard_reason(err),
205 ))),
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 #[cfg(feature = "processing")]
212 use chrono::{Duration, TimeZone, Utc};
213 use std::sync::Arc;
214 #[cfg(feature = "processing")]
215 use uuid::Uuid;
216
217 #[cfg(feature = "processing")]
218 use insta::assert_debug_snapshot;
219 use relay_cogs::Token;
220 #[cfg(not(feature = "processing"))]
221 use relay_dynamic_config::Feature;
222 use relay_event_schema::protocol::EventId;
223 #[cfg(feature = "processing")]
224 use relay_protocol::get_value;
225 use relay_sampling::evaluation::ReservoirCounters;
226 use relay_system::Addr;
227
228 use crate::envelope::Envelope;
229 use crate::extractors::RequestMeta;
230 use crate::managed::ManagedEnvelope;
231 #[cfg(feature = "processing")]
232 use crate::services::processor::Submit;
233 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
234 use crate::services::projects::project::ProjectInfo;
235 use crate::testutils::create_test_processor;
236
237 use super::*;
238
    /// Sends a transaction together with a profile through the processor and
    /// checks that the profile's ID ends up in the transaction's profile context.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_transfered() {
        // Processing is enabled with an empty Kafka configuration.
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item; it carries a trace context but no profile context.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // Profile item whose `transactions` entry references the transaction
        // above by event ID and trace ID.
        envelope.add_item({
            let mut item = Item::new(ItemType::Profile);
            item.set_payload(
                ContentType::Json,
                r#"{
                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                    "version": "1",
                    "platform": "android",
                    "os": {"name": "foo", "version": "bar"},
                    "device": {"architecture": "zap"},
                    "timestamp": "2023-10-10 00:00:00Z",
                    "profile": {
                        "samples":[
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":1,
                                "thread_id":1
                            },
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":2,
                                "thread_id":1
                            }
                        ],
                        "stacks":[[0]],
                        "frames":[{
                            "function":"main"
                        }]
                    },
                    "transactions": [
                        {
                            "id": "9b73438f70e044ecbd006b7fd15b7373",
                            "name": "/foo/",
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                        }
                    ]
                }"#,
            );
            item
        });

        // The project must have the profiling feature enabled.
        let mut project_state = ProjectInfo::default();
        project_state.config.features.0.insert(Feature::Profiling);

        // Both items are expected to end up in a single processing group.
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(project_state),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        // Anything other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Read the processed transaction back and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        // The profile ID from the profile item is now set on the transaction.
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: EventId(
                012d836b-15bb-49d7-bbf9-9e64295d995b,
            ),
            profiler_id: ~,
        }
        "###);
    }
374
    /// Sends a transaction together with a profile whose sample data differs
    /// from the valid case and checks that no profile ID is set on the
    /// transaction's profile context.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_invalid_profile_id_not_transfered() {
        // Processing is enabled with an empty Kafka configuration.
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item; it carries a trace context but no profile context.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // Profile item: the second sample points at stack 1, which is empty.
        // NOTE(review): presumably this is what makes the profile invalid
        // during expansion; confirm against relay_profiling.
        envelope.add_item({
            let mut item = Item::new(ItemType::Profile);
            item.set_payload(
                ContentType::Json,
                r#"{
                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                    "version": "1",
                    "platform": "android",
                    "os": {"name": "foo", "version": "bar"},
                    "device": {"architecture": "zap"},
                    "timestamp": "2023-10-10 00:00:00Z",
                    "profile": {
                        "samples":[
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":1,
                                "thread_id":1
                            },
                            {
                                "stack_id":1,
                                "elapsed_since_start_ns":2,
                                "thread_id":1
                            }
                        ],
                        "stacks":[[0],[]],
                        "frames":[{
                            "function":"main"
                        }]
                    },
                    "transactions": [
                        {
                            "id": "9b73438f70e044ecbd006b7fd15b7373",
                            "name": "/foo/",
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                        }
                    ]
                }"#,
            );
            item
        });

        // The project must have the profiling feature enabled.
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);

        // Both items are expected to end up in a single processing group.
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(project_info),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        // Anything other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Read the processed transaction back and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        // A profile context exists, but it carries no profile ID.
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: ~,
        }
        "###);
    }
507
    /// Sends an envelope containing only a profile item (no transaction)
    /// through the processor and checks that nothing is left to submit.
    #[tokio::test]
    async fn filter_standalone_profile() {
        relay_log::init_test!();

        // The processor runs with the default configuration.
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // The only item is a profile; the payload has metadata but no
        // `profile` or `transactions` section.
        envelope.add_item({
            let mut item = Item::new(ItemType::Profile);
            item.set_payload(
                ContentType::Json,
                r#"{
                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                    "version": "1",
                    "platform": "android",
                    "os": {"name": "foo", "version": "bar"},
                    "device": {"architecture": "zap"},
                    "timestamp": "2023-10-10 00:00:00Z"
                }"#,
            );
            item
        });

        // Profiling is enabled, so the feature flag is not the reason the
        // envelope ends up empty.
        let mut project_state = ProjectInfo::default();
        project_state.config.features.0.insert(Feature::Profiling);

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(project_state),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        // Processing succeeds but yields nothing to submit.
        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }
562
    /// Sends a transaction that references a profile in its profile context,
    /// without any profile item in the envelope, and checks that `profile_id`
    /// is cleared while `profiler_id` is kept.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_removed_profiler_id_kept() {
        // Processing is enabled with an empty Kafka configuration.
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item with a profile context holding both a `profile_id`
        // and a `profiler_id`. Start and end timestamps are 10 seconds apart.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        },
                        "profile": {
                            "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "type": "profile"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // The project must have the profiling feature enabled.
        let mut project_state = ProjectInfo::default();
        project_state.config.features.0.insert(Feature::Profiling);

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(project_state),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        // Anything other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Read the processed transaction back and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        // `profile_id` is gone, `profiler_id` survives.
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: EventId(
                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
            ),
        }
        "###);
    }
659
660 #[cfg(feature = "processing")]
661 #[test]
662 fn test_scrub_profiler_id_should_be_stripped() {
663 let mut contexts = Contexts::new();
664 contexts.add(ProfileContext {
665 profiler_id: Annotated::new(EventId(
666 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
667 )),
668 ..Default::default()
669 });
670 let mut event: Annotated<Event> = Annotated::new(Event {
671 ty: Annotated::new(EventType::Transaction),
672 start_timestamp: Annotated::new(
673 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
674 ),
675 timestamp: Annotated::new(
676 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
677 .unwrap()
678 .checked_add_signed(Duration::milliseconds(15))
679 .unwrap()
680 .into(),
681 ),
682 contexts: Annotated::new(contexts),
683 ..Default::default()
684 });
685
686 scrub_profiler_id(&mut event);
687
688 let profile_context = get_value!(event.contexts)
689 .unwrap()
690 .get::<ProfileContext>()
691 .unwrap();
692
693 assert!(
694 profile_context
695 .profiler_id
696 .meta()
697 .iter_remarks()
698 .any(|remark| remark.rule_id == *"transaction_duration"
699 && remark.ty == RemarkType::Removed)
700 )
701 }
702
703 #[cfg(feature = "processing")]
704 #[test]
705 fn test_scrub_profiler_id_should_not_be_stripped() {
706 let mut contexts = Contexts::new();
707 contexts.add(ProfileContext {
708 profiler_id: Annotated::new(EventId(
709 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
710 )),
711 ..Default::default()
712 });
713 let mut event: Annotated<Event> = Annotated::new(Event {
714 ty: Annotated::new(EventType::Transaction),
715 start_timestamp: Annotated::new(
716 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
717 ),
718 timestamp: Annotated::new(
719 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
720 .unwrap()
721 .checked_add_signed(Duration::milliseconds(20))
722 .unwrap()
723 .into(),
724 ),
725 contexts: Annotated::new(contexts),
726 ..Default::default()
727 });
728
729 scrub_profiler_id(&mut event);
730
731 let profile_context = get_value!(event.contexts)
732 .unwrap()
733 .get::<ProfileContext>()
734 .unwrap();
735
736 assert!(
737 !profile_context
738 .profiler_id
739 .meta()
740 .iter_remarks()
741 .any(|remark| remark.rule_id == *"transaction_duration"
742 && remark.ty == RemarkType::Removed)
743 )
744 }
745}