1use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4
5use relay_base_schema::events::EventType;
6use relay_base_schema::project::ProjectId;
7use relay_config::Config;
8use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
9use relay_filter::ProjectFiltersConfig;
10use relay_profiling::{ProfileError, ProfileId};
11use relay_protocol::Annotated;
12#[cfg(feature = "processing")]
13use relay_protocol::{Getter, Remark, RemarkType};
14
15use crate::envelope::{ContentType, Item, ItemType};
16use crate::managed::{ItemAction, TypedEnvelope};
17use crate::services::outcome::{DiscardReason, Outcome};
18use crate::services::processor::{TransactionGroup, event_type, should_filter};
19use crate::services::projects::project::ProjectInfo;
20
21pub fn filter<Group>(
25 managed_envelope: &mut TypedEnvelope<Group>,
26 event: &Annotated<Event>,
27 config: &Config,
28 project_id: ProjectId,
29 project_info: &ProjectInfo,
30) -> Option<ProfileId> {
31 let profiling_disabled = should_filter(config, project_info, Feature::Profiling);
32 let has_transaction = event_type(event) == Some(EventType::Transaction);
33 let keep_unsampled_profiles = true;
34
35 let mut profile_id = None;
36 managed_envelope.retain_items(|item| match item.ty() {
37 ItemType::Profile if profile_id.is_none() => {
39 if profiling_disabled {
40 return ItemAction::DropSilently;
41 }
42
43 let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
46 if !profile_allowed {
47 return ItemAction::DropSilently;
48 }
49
50 match relay_profiling::parse_metadata(&item.payload(), project_id) {
51 Ok(id) => {
52 profile_id = Some(id);
53 ItemAction::Keep
54 }
55 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
56 relay_profiling::discard_reason(err),
57 ))),
58 }
59 }
60 ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
62 relay_profiling::discard_reason(ProfileError::TooManyProfiles),
63 ))),
64 _ => ItemAction::Keep,
65 });
66
67 profile_id
68}
69
70pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
76 let Some(event) = event.value_mut() else {
77 return;
78 };
79
80 match profile_id {
81 Some(profile_id) => {
82 let contexts = event.contexts.get_or_insert_with(Contexts::new);
83 contexts.add(ProfileContext {
84 profile_id: Annotated::new(profile_id),
85 ..ProfileContext::default()
86 });
87 }
88 None => {
89 if let Some(contexts) = event.contexts.value_mut()
90 && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
91 {
92 profile_context.profile_id = Annotated::empty();
93 }
94 }
95 }
96}
97
/// Removes the `profiler_id` from the event's profile context for very short transactions.
///
/// The value of `event.duration` is compared against a fixed threshold of `19.8`. The tests
/// in this file exercise it with a 15 ms transaction (scrubbed) and a 20 ms transaction
/// (kept). If the duration is unavailable or not below the threshold, the event is left
/// unchanged.
///
/// When scrubbing, the id is taken out of the field, a [`RemarkType::Removed`] remark with
/// rule id `transaction_duration` is recorded, and the previous value is stored as the
/// original value in the field's metadata.
#[cfg(feature = "processing")]
pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
    /// Durations below this value cause the profiler id to be scrubbed.
    const DURATION_THRESHOLD: f64 = 19.8;

    let Some(event) = event.value_mut() else {
        return;
    };

    let below_threshold = event
        .get_value("event.duration")
        .and_then(|duration| duration.as_f64())
        .is_some_and(|duration| duration < DURATION_THRESHOLD);
    if !below_threshold {
        return;
    }

    let Some(contexts) = event.contexts.value_mut().as_mut() else {
        return;
    };
    let Some(profile_context) = contexts.get_mut::<ProfileContext>() else {
        return;
    };

    let profiler_id = &mut profile_context.profiler_id;
    let previous = std::mem::take(profiler_id.value_mut());

    let meta = profiler_id.meta_mut();
    meta.add_remark(Remark::new(RemarkType::Removed, "transaction_duration"));
    meta.set_original_value(previous);
}
125
126pub fn process(
128 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
129 event: &mut Annotated<Event>,
130 global_config: &GlobalConfig,
131 config: &Config,
132 project_info: &ProjectInfo,
133) -> Option<ProfileId> {
134 let client_ip = managed_envelope.envelope().meta().client_addr();
135 let filter_settings = &project_info.config.filter_settings;
136
137 let profiling_enabled = project_info.has_feature(Feature::Profiling);
138 let mut profile_id = None;
139
140 managed_envelope.retain_items(|item| match item.ty() {
141 ItemType::Profile => {
142 if !profiling_enabled {
143 return ItemAction::DropSilently;
144 }
145
146 let Some(event) = event.value() else {
149 return ItemAction::DropSilently;
150 };
151
152 match expand_profile(
153 item,
154 event,
155 config,
156 client_ip,
157 filter_settings,
158 global_config,
159 ) {
160 Ok(id) => {
161 profile_id = Some(id);
162 ItemAction::Keep
163 }
164 Err(outcome) => ItemAction::Drop(outcome),
165 }
166 }
167 _ => ItemAction::Keep,
168 });
169
170 profile_id
171}
172
173fn expand_profile(
175 item: &mut Item,
176 event: &Event,
177 config: &Config,
178 client_ip: Option<IpAddr>,
179 filter_settings: &ProjectFiltersConfig,
180 global_config: &GlobalConfig,
181) -> Result<ProfileId, Outcome> {
182 match relay_profiling::expand_profile(
183 &item.payload(),
184 event,
185 client_ip,
186 filter_settings,
187 global_config,
188 ) {
189 Ok((id, payload)) => {
190 if payload.len() <= config.max_profile_size() {
191 item.set_payload(ContentType::Json, payload);
192 Ok(id)
193 } else {
194 Err(Outcome::Invalid(DiscardReason::Profiling(
195 relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
196 )))
197 }
198 }
199 Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
200 Err(Outcome::Filtered(filter_stat_key))
201 }
202 Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
203 relay_profiling::discard_reason(err),
204 ))),
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 #[cfg(feature = "processing")]
211 use chrono::{Duration, TimeZone, Utc};
212 #[cfg(feature = "processing")]
213 use uuid::Uuid;
214
215 #[cfg(feature = "processing")]
216 use insta::assert_debug_snapshot;
217 use relay_cogs::Token;
218 #[cfg(not(feature = "processing"))]
219 use relay_dynamic_config::Feature;
220 use relay_event_schema::protocol::EventId;
221 #[cfg(feature = "processing")]
222 use relay_protocol::get_value;
223 use relay_sampling::evaluation::ReservoirCounters;
224 use relay_system::Addr;
225
226 use crate::envelope::Envelope;
227 use crate::extractors::RequestMeta;
228 use crate::managed::ManagedEnvelope;
229 use crate::processing;
230 #[cfg(feature = "processing")]
231 use crate::services::processor::Submit;
232 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
233 use crate::services::projects::project::ProjectInfo;
234 use crate::testutils::create_test_processor;
235
236 use super::*;
237
    /// Sends a transaction together with a profile through the processor and checks that the
    /// profile's id ends up in the profile context of the resulting transaction.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_transfered() {
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item without any profile context.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // Profile item whose `transactions` entry references the transaction above.
        envelope.add_item({
            let mut item = Item::new(ItemType::Profile);
            item.set_payload(
                ContentType::Json,
                r#"{
                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                    "version": "1",
                    "platform": "android",
                    "os": {"name": "foo", "version": "bar"},
                    "device": {"architecture": "zap"},
                    "timestamp": "2023-10-10 00:00:00Z",
                    "profile": {
                        "samples":[
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":1,
                                "thread_id":1
                            },
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":2,
                                "thread_id":1
                            }
                        ],
                        "stacks":[[0]],
                        "frames":[{
                            "function":"main"
                        }]
                    },
                    "transactions": [
                        {
                            "id": "9b73438f70e044ecbd006b7fd15b7373",
                            "name": "/foo/",
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                        }
                    ]
                }"#,
            );
            item
        });

        // Enable the profiling feature for the project.
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        // Any result other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Re-parse the processed transaction and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: EventId(
                012d836b-15bb-49d7-bbf9-9e64295d995b,
            ),
            profiler_id: ~,
        }
        "###);
    }
372
    /// Sends a transaction together with an invalid profile through the processor and checks
    /// that the resulting transaction's profile context carries no profile id.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_invalid_profile_id_not_transfered() {
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item without any profile context.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // Profile item whose second sample references stack 1, which is an empty stack.
        // NOTE(review): presumably this is what makes the profile fail expansion — confirm
        // against `relay_profiling`.
        envelope.add_item({
            let mut item = Item::new(ItemType::Profile);
            item.set_payload(
                ContentType::Json,
                r#"{
                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                    "version": "1",
                    "platform": "android",
                    "os": {"name": "foo", "version": "bar"},
                    "device": {"architecture": "zap"},
                    "timestamp": "2023-10-10 00:00:00Z",
                    "profile": {
                        "samples":[
                            {
                                "stack_id":0,
                                "elapsed_since_start_ns":1,
                                "thread_id":1
                            },
                            {
                                "stack_id":1,
                                "elapsed_since_start_ns":2,
                                "thread_id":1
                            }
                        ],
                        "stacks":[[0],[]],
                        "frames":[{
                            "function":"main"
                        }]
                    },
                    "transactions": [
                        {
                            "id": "9b73438f70e044ecbd006b7fd15b7373",
                            "name": "/foo/",
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                        }
                    ]
                }"#,
            );
            item
        });

        // Enable the profiling feature for the project.
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        // Any result other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Re-parse the processed transaction and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: ~,
        }
        "###);
    }
506
507 #[tokio::test]
508 async fn filter_standalone_profile() {
509 relay_log::init_test!();
510
511 let processor = create_test_processor(Default::default()).await;
513 let event_id = EventId::new();
514 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
515 .parse()
516 .unwrap();
517 let request_meta = RequestMeta::new(dsn);
518 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
519
520 envelope.add_item({
522 let mut item = Item::new(ItemType::Profile);
523 item.set_payload(
524 ContentType::Json,
525 r#"{
526 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
527 "version": "1",
528 "platform": "android",
529 "os": {"name": "foo", "version": "bar"},
530 "device": {"architecture": "zap"},
531 "timestamp": "2023-10-10 00:00:00Z"
532 }"#,
533 );
534 item
535 });
536
537 let mut project_info = ProjectInfo::default();
538 project_info.config.features.0.insert(Feature::Profiling);
539
540 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
541 assert_eq!(envelopes.len(), 1);
542
543 let (group, envelope) = envelopes.pop().unwrap();
544 let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy());
545
546 let message = ProcessEnvelopeGrouped {
547 group,
548 envelope,
549 ctx: processing::Context {
550 project_info: &project_info,
551 ..processing::Context::for_test()
552 },
553 reservoir_counters: &ReservoirCounters::default(),
554 };
555
556 let envelope = processor
557 .process(&mut Token::noop(), message)
558 .await
559 .unwrap();
560 assert!(envelope.is_none());
561 }
562
    /// Sends a transaction that references a profile id and a profiler id, but comes without
    /// a profile item, through the processor. Checks that the profile id is cleared while the
    /// profiler id is preserved.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_removed_profiler_id_kept() {
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap();
        let processor = create_test_processor(config).await;
        let event_id = EventId::new();
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Transaction item with a pre-populated profile context; no profile item is added.
        envelope.add_item({
            let mut item = Item::new(ItemType::Transaction);

            item.set_payload(
                ContentType::Json,
                r#"{
                    "type": "transaction",
                    "transaction": "/foo/",
                    "timestamp": 946684810.0,
                    "start_timestamp": 946684800.0,
                    "contexts": {
                        "trace": {
                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "span_id": "fa90fdead5f74053",
                            "op": "http.server",
                            "type": "trace"
                        },
                        "profile": {
                            "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
                            "type": "profile"
                        }
                    },
                    "transaction_info": {
                        "source": "url"
                    }
                }"#,
            );
            item
        });

        // Enable the profiling feature for the project.
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        // Any result other than a submitted envelope fails the test.
        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Re-parse the processed transaction and inspect its profile context.
        let item = new_envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: EventId(
                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
            ),
        }
        "###);
    }
660
661 #[cfg(feature = "processing")]
662 #[test]
663 fn test_scrub_profiler_id_should_be_stripped() {
664 let mut contexts = Contexts::new();
665 contexts.add(ProfileContext {
666 profiler_id: Annotated::new(EventId(
667 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
668 )),
669 ..Default::default()
670 });
671 let mut event: Annotated<Event> = Annotated::new(Event {
672 ty: Annotated::new(EventType::Transaction),
673 start_timestamp: Annotated::new(
674 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
675 ),
676 timestamp: Annotated::new(
677 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
678 .unwrap()
679 .checked_add_signed(Duration::milliseconds(15))
680 .unwrap()
681 .into(),
682 ),
683 contexts: Annotated::new(contexts),
684 ..Default::default()
685 });
686
687 scrub_profiler_id(&mut event);
688
689 let profile_context = get_value!(event.contexts)
690 .unwrap()
691 .get::<ProfileContext>()
692 .unwrap();
693
694 assert!(
695 profile_context
696 .profiler_id
697 .meta()
698 .iter_remarks()
699 .any(|remark| remark.rule_id == *"transaction_duration"
700 && remark.ty == RemarkType::Removed)
701 )
702 }
703
704 #[cfg(feature = "processing")]
705 #[test]
706 fn test_scrub_profiler_id_should_not_be_stripped() {
707 let mut contexts = Contexts::new();
708 contexts.add(ProfileContext {
709 profiler_id: Annotated::new(EventId(
710 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
711 )),
712 ..Default::default()
713 });
714 let mut event: Annotated<Event> = Annotated::new(Event {
715 ty: Annotated::new(EventType::Transaction),
716 start_timestamp: Annotated::new(
717 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
718 ),
719 timestamp: Annotated::new(
720 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
721 .unwrap()
722 .checked_add_signed(Duration::milliseconds(20))
723 .unwrap()
724 .into(),
725 ),
726 contexts: Annotated::new(contexts),
727 ..Default::default()
728 });
729
730 scrub_profiler_id(&mut event);
731
732 let profile_context = get_value!(event.contexts)
733 .unwrap()
734 .get::<ProfileContext>()
735 .unwrap();
736
737 assert!(
738 !profile_context
739 .profiler_id
740 .meta()
741 .iter_remarks()
742 .any(|remark| remark.rule_id == *"transaction_duration"
743 && remark.ty == RemarkType::Removed)
744 )
745 }
746}