1use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4use std::sync::Arc;
5
6use relay_base_schema::events::EventType;
7use relay_base_schema::project::ProjectId;
8use relay_config::Config;
9use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
10use relay_filter::ProjectFiltersConfig;
11use relay_profiling::{ProfileError, ProfileId};
12use relay_protocol::Annotated;
13#[cfg(feature = "processing")]
14use relay_protocol::{Getter, Remark, RemarkType};
15
16use crate::envelope::{ContentType, Item, ItemType};
17use crate::managed::{ItemAction, TypedEnvelope};
18use crate::services::outcome::{DiscardReason, Outcome};
19use crate::services::processor::{TransactionGroup, event_type, should_filter};
20use crate::services::projects::project::ProjectInfo;
21
22pub fn filter<Group>(
26 managed_envelope: &mut TypedEnvelope<Group>,
27 event: &Annotated<Event>,
28 config: Arc<Config>,
29 project_id: ProjectId,
30 project_info: &ProjectInfo,
31) -> Option<ProfileId> {
32 let profiling_disabled = should_filter(&config, project_info, Feature::Profiling);
33 let has_transaction = event_type(event) == Some(EventType::Transaction);
34 let keep_unsampled_profiles = true;
35
36 let mut profile_id = None;
37 managed_envelope.retain_items(|item| match item.ty() {
38 ItemType::Profile if profile_id.is_none() => {
40 if profiling_disabled {
41 return ItemAction::DropSilently;
42 }
43
44 let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
47 if !profile_allowed {
48 return ItemAction::DropSilently;
49 }
50
51 match relay_profiling::parse_metadata(&item.payload(), project_id) {
52 Ok(id) => {
53 profile_id = Some(id);
54 ItemAction::Keep
55 }
56 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
57 relay_profiling::discard_reason(err),
58 ))),
59 }
60 }
61 ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
63 relay_profiling::discard_reason(ProfileError::TooManyProfiles),
64 ))),
65 _ => ItemAction::Keep,
66 });
67
68 profile_id
69}
70
71pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
77 let Some(event) = event.value_mut() else {
78 return;
79 };
80
81 match profile_id {
82 Some(profile_id) => {
83 let contexts = event.contexts.get_or_insert_with(Contexts::new);
84 contexts.add(ProfileContext {
85 profile_id: Annotated::new(profile_id),
86 ..ProfileContext::default()
87 });
88 }
89 None => {
90 if let Some(contexts) = event.contexts.value_mut() {
91 if let Some(profile_context) = contexts.get_mut::<ProfileContext>() {
92 profile_context.profile_id = Annotated::empty();
93 }
94 }
95 }
96 }
97}
98
/// Removes the `profiler_id` from the profile context of very short transactions.
///
/// The function only acts when the event exposes an `event.duration` value below `19.8`
/// (the tests in this file exercise this with 15 ms and 20 ms transactions). In that case the
/// profiler ID is taken out of the profile context, stored as the original value in the
/// field's metadata, and a `Removed` remark with the rule ID `transaction_duration` is added.
///
/// Events without a value, without a duration, or without a profile context are left as is.
#[cfg(feature = "processing")]
pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
    let Some(event) = event.value_mut() else {
        return;
    };

    let is_short_transaction = event
        .get_value("event.duration")
        .and_then(|duration| duration.as_f64())
        .is_some_and(|duration| duration < 19.8);
    if !is_short_transaction {
        return;
    }

    let Some(contexts) = event.contexts.value_mut().as_mut() else {
        return;
    };
    let Some(profile_context) = contexts.get_mut::<ProfileContext>() else {
        return;
    };

    // Keep the removed ID around in the metadata so the scrubbing stays traceable.
    let profiler_id = &mut profile_context.profiler_id;
    let original = std::mem::take(profiler_id.value_mut());
    let meta = profiler_id.meta_mut();
    meta.add_remark(Remark::new(RemarkType::Removed, "transaction_duration"));
    meta.set_original_value(original);
}
127
128pub fn process(
130 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
131 event: &mut Annotated<Event>,
132 global_config: &GlobalConfig,
133 config: Arc<Config>,
134 project_info: Arc<ProjectInfo>,
135) -> Option<ProfileId> {
136 let client_ip = managed_envelope.envelope().meta().client_addr();
137 let filter_settings = &project_info.config.filter_settings;
138
139 let profiling_enabled = project_info.has_feature(Feature::Profiling);
140 let mut profile_id = None;
141
142 managed_envelope.retain_items(|item| match item.ty() {
143 ItemType::Profile => {
144 if !profiling_enabled {
145 return ItemAction::DropSilently;
146 }
147
148 let Some(event) = event.value() else {
151 return ItemAction::DropSilently;
152 };
153
154 match expand_profile(
155 item,
156 event,
157 &config,
158 client_ip,
159 filter_settings,
160 global_config,
161 ) {
162 Ok(id) => {
163 profile_id = Some(id);
164 ItemAction::Keep
165 }
166 Err(outcome) => ItemAction::Drop(outcome),
167 }
168 }
169 _ => ItemAction::Keep,
170 });
171
172 profile_id
173}
174
175fn expand_profile(
177 item: &mut Item,
178 event: &Event,
179 config: &Config,
180 client_ip: Option<IpAddr>,
181 filter_settings: &ProjectFiltersConfig,
182 global_config: &GlobalConfig,
183) -> Result<ProfileId, Outcome> {
184 match relay_profiling::expand_profile(
185 &item.payload(),
186 event,
187 client_ip,
188 filter_settings,
189 global_config,
190 ) {
191 Ok((id, payload)) => {
192 if payload.len() <= config.max_profile_size() {
193 item.set_payload(ContentType::Json, payload);
194 Ok(id)
195 } else {
196 Err(Outcome::Invalid(DiscardReason::Profiling(
197 relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
198 )))
199 }
200 }
201 Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
202 Err(Outcome::Filtered(filter_stat_key))
203 }
204 Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
205 relay_profiling::discard_reason(err),
206 ))),
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 #[cfg(feature = "processing")]
213 use chrono::{Duration, TimeZone, Utc};
214 use std::sync::Arc;
215 #[cfg(feature = "processing")]
216 use uuid::Uuid;
217
218 #[cfg(feature = "processing")]
219 use insta::assert_debug_snapshot;
220 use relay_cogs::Token;
221 #[cfg(not(feature = "processing"))]
222 use relay_dynamic_config::Feature;
223 use relay_event_schema::protocol::EventId;
224 #[cfg(feature = "processing")]
225 use relay_protocol::get_value;
226 use relay_sampling::evaluation::ReservoirCounters;
227 use relay_system::Addr;
228
229 use crate::envelope::Envelope;
230 use crate::extractors::RequestMeta;
231 use crate::managed::ManagedEnvelope;
232 #[cfg(feature = "processing")]
233 use crate::services::processor::Submit;
234 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
235 use crate::services::projects::project::ProjectInfo;
236 use crate::testutils::create_test_processor;
237
238 use super::*;
239
240 #[cfg(feature = "processing")]
241 #[tokio::test]
242 async fn test_profile_id_transfered() {
243 let config = Config::from_json_value(serde_json::json!({
246 "processing": {
247 "enabled": true,
248 "kafka_config": []
249 }
250 }))
251 .unwrap();
252 let processor = create_test_processor(config).await;
253 let event_id = EventId::new();
254 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
255 .parse()
256 .unwrap();
257 let request_meta = RequestMeta::new(dsn);
258 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
259
260 envelope.add_item({
262 let mut item = Item::new(ItemType::Transaction);
263
264 item.set_payload(
265 ContentType::Json,
266 r#"{
267 "event_id": "9b73438f70e044ecbd006b7fd15b7373",
268 "type": "transaction",
269 "transaction": "/foo/",
270 "timestamp": 946684810.0,
271 "start_timestamp": 946684800.0,
272 "contexts": {
273 "trace": {
274 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
275 "span_id": "fa90fdead5f74053",
276 "op": "http.server",
277 "type": "trace"
278 }
279 },
280 "transaction_info": {
281 "source": "url"
282 }
283 }"#,
284 );
285 item
286 });
287
288 envelope.add_item({
290 let mut item = Item::new(ItemType::Profile);
291 item.set_payload(
292 ContentType::Json,
293 r#"{
294 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
295 "version": "1",
296 "platform": "android",
297 "os": {"name": "foo", "version": "bar"},
298 "device": {"architecture": "zap"},
299 "timestamp": "2023-10-10 00:00:00Z",
300 "profile": {
301 "samples":[
302 {
303 "stack_id":0,
304 "elapsed_since_start_ns":1,
305 "thread_id":1
306 },
307 {
308 "stack_id":0,
309 "elapsed_since_start_ns":2,
310 "thread_id":1
311 }
312 ],
313 "stacks":[[0]],
314 "frames":[{
315 "function":"main"
316 }]
317 },
318 "transactions": [
319 {
320 "id": "9b73438f70e044ecbd006b7fd15b7373",
321 "name": "/foo/",
322 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
323 }
324 ]
325 }"#,
326 );
327 item
328 });
329
330 let mut project_state = ProjectInfo::default();
331 project_state.config.features.0.insert(Feature::Profiling);
332
333 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
334 assert_eq!(envelopes.len(), 1);
335
336 let (group, envelope) = envelopes.pop().unwrap();
337 let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
338
339 let message = ProcessEnvelopeGrouped {
340 group,
341 envelope,
342 project_info: Arc::new(project_state),
343 rate_limits: Default::default(),
344 sampling_project_info: None,
345 reservoir_counters: ReservoirCounters::default(),
346 };
347
348 let Ok(Some(Submit::Envelope(new_envelope))) =
349 processor.process(&mut Token::noop(), message).await
350 else {
351 panic!();
352 };
353 let new_envelope = new_envelope.envelope();
354
355 let item = new_envelope
357 .get_item_by(|item| item.ty() == &ItemType::Transaction)
358 .unwrap();
359 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
360 let context = transaction
361 .value()
362 .unwrap()
363 .context::<ProfileContext>()
364 .unwrap();
365
366 assert_debug_snapshot!(context, @r###"
367 ProfileContext {
368 profile_id: EventId(
369 012d836b-15bb-49d7-bbf9-9e64295d995b,
370 ),
371 profiler_id: ~,
372 }
373 "###);
374 }
375
376 #[cfg(feature = "processing")]
377 #[tokio::test]
378 async fn test_invalid_profile_id_not_transfered() {
379 let config = Config::from_json_value(serde_json::json!({
381 "processing": {
382 "enabled": true,
383 "kafka_config": []
384 }
385 }))
386 .unwrap();
387 let processor = create_test_processor(config).await;
388 let event_id = EventId::new();
389 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
390 .parse()
391 .unwrap();
392 let request_meta = RequestMeta::new(dsn);
393 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
394
395 envelope.add_item({
397 let mut item = Item::new(ItemType::Transaction);
398
399 item.set_payload(
400 ContentType::Json,
401 r#"{
402 "event_id": "9b73438f70e044ecbd006b7fd15b7373",
403 "type": "transaction",
404 "transaction": "/foo/",
405 "timestamp": 946684810.0,
406 "start_timestamp": 946684800.0,
407 "contexts": {
408 "trace": {
409 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
410 "span_id": "fa90fdead5f74053",
411 "op": "http.server",
412 "type": "trace"
413 }
414 },
415 "transaction_info": {
416 "source": "url"
417 }
418 }"#,
419 );
420 item
421 });
422
423 envelope.add_item({
425 let mut item = Item::new(ItemType::Profile);
426 item.set_payload(
427 ContentType::Json,
428 r#"{
429 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
430 "version": "1",
431 "platform": "android",
432 "os": {"name": "foo", "version": "bar"},
433 "device": {"architecture": "zap"},
434 "timestamp": "2023-10-10 00:00:00Z",
435 "profile": {
436 "samples":[
437 {
438 "stack_id":0,
439 "elapsed_since_start_ns":1,
440 "thread_id":1
441 },
442 {
443 "stack_id":1,
444 "elapsed_since_start_ns":2,
445 "thread_id":1
446 }
447 ],
448 "stacks":[[0],[]],
449 "frames":[{
450 "function":"main"
451 }]
452 },
453 "transactions": [
454 {
455 "id": "9b73438f70e044ecbd006b7fd15b7373",
456 "name": "/foo/",
457 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
458 }
459 ]
460 }"#,
461 );
462 item
463 });
464
465 let mut project_info = ProjectInfo::default();
466 project_info.config.features.0.insert(Feature::Profiling);
467
468 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
469 assert_eq!(envelopes.len(), 1);
470
471 let (group, envelope) = envelopes.pop().unwrap();
472 let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
473
474 let message = ProcessEnvelopeGrouped {
475 group,
476 envelope,
477 project_info: Arc::new(project_info),
478 rate_limits: Default::default(),
479 sampling_project_info: None,
480 reservoir_counters: ReservoirCounters::default(),
481 };
482
483 let Ok(Some(Submit::Envelope(new_envelope))) =
484 processor.process(&mut Token::noop(), message).await
485 else {
486 panic!();
487 };
488 let new_envelope = new_envelope.envelope();
489
490 let item = new_envelope
492 .get_item_by(|item| item.ty() == &ItemType::Transaction)
493 .unwrap();
494 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
495 let context = transaction
496 .value()
497 .unwrap()
498 .context::<ProfileContext>()
499 .unwrap();
500
501 assert_debug_snapshot!(context, @r###"
502 ProfileContext {
503 profile_id: ~,
504 profiler_id: ~,
505 }
506 "###);
507 }
508
509 #[tokio::test]
510 async fn filter_standalone_profile() {
511 relay_log::init_test!();
512
513 let processor = create_test_processor(Default::default()).await;
515 let event_id = EventId::new();
516 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
517 .parse()
518 .unwrap();
519 let request_meta = RequestMeta::new(dsn);
520 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
521
522 envelope.add_item({
524 let mut item = Item::new(ItemType::Profile);
525 item.set_payload(
526 ContentType::Json,
527 r#"{
528 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
529 "version": "1",
530 "platform": "android",
531 "os": {"name": "foo", "version": "bar"},
532 "device": {"architecture": "zap"},
533 "timestamp": "2023-10-10 00:00:00Z"
534 }"#,
535 );
536 item
537 });
538
539 let mut project_state = ProjectInfo::default();
540 project_state.config.features.0.insert(Feature::Profiling);
541
542 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
543 assert_eq!(envelopes.len(), 1);
544
545 let (group, envelope) = envelopes.pop().unwrap();
546 let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy(), Addr::dummy());
547
548 let message = ProcessEnvelopeGrouped {
549 group,
550 envelope,
551 project_info: Arc::new(project_state),
552 rate_limits: Default::default(),
553 sampling_project_info: None,
554 reservoir_counters: ReservoirCounters::default(),
555 };
556
557 let envelope = processor
558 .process(&mut Token::noop(), message)
559 .await
560 .unwrap();
561 assert!(envelope.is_none());
562 }
563
564 #[cfg(feature = "processing")]
565 #[tokio::test]
566 async fn test_profile_id_removed_profiler_id_kept() {
567 let config = Config::from_json_value(serde_json::json!({
569 "processing": {
570 "enabled": true,
571 "kafka_config": []
572 }
573 }))
574 .unwrap();
575 let processor = create_test_processor(config).await;
576 let event_id = EventId::new();
577 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
578 .parse()
579 .unwrap();
580 let request_meta = RequestMeta::new(dsn);
581 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
582
583 envelope.add_item({
585 let mut item = Item::new(ItemType::Transaction);
586
587 item.set_payload(
588 ContentType::Json,
589 r#"{
590 "type": "transaction",
591 "transaction": "/foo/",
592 "timestamp": 946684810.0,
593 "start_timestamp": 946684800.0,
594 "contexts": {
595 "trace": {
596 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
597 "span_id": "fa90fdead5f74053",
598 "op": "http.server",
599 "type": "trace"
600 },
601 "profile": {
602 "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
603 "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
604 "type": "profile"
605 }
606 },
607 "transaction_info": {
608 "source": "url"
609 }
610 }"#,
611 );
612 item
613 });
614
615 let mut project_state = ProjectInfo::default();
616 project_state.config.features.0.insert(Feature::Profiling);
617
618 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
619 assert_eq!(envelopes.len(), 1);
620
621 let (group, envelope) = envelopes.pop().unwrap();
622 let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
623
624 let message = ProcessEnvelopeGrouped {
625 group,
626 envelope,
627 project_info: Arc::new(project_state),
628 rate_limits: Default::default(),
629 sampling_project_info: None,
630 reservoir_counters: ReservoirCounters::default(),
631 };
632
633 let Ok(Some(Submit::Envelope(new_envelope))) =
634 processor.process(&mut Token::noop(), message).await
635 else {
636 panic!();
637 };
638 let new_envelope = new_envelope.envelope();
639
640 let item = new_envelope
642 .get_item_by(|item| item.ty() == &ItemType::Transaction)
643 .unwrap();
644 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
645 let context = transaction
646 .value()
647 .unwrap()
648 .context::<ProfileContext>()
649 .unwrap();
650
651 assert_debug_snapshot!(context, @r###"
652 ProfileContext {
653 profile_id: ~,
654 profiler_id: EventId(
655 4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
656 ),
657 }
658 "###);
659 }
660
661 #[cfg(feature = "processing")]
662 #[test]
663 fn test_scrub_profiler_id_should_be_stripped() {
664 let mut contexts = Contexts::new();
665 contexts.add(ProfileContext {
666 profiler_id: Annotated::new(EventId(
667 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
668 )),
669 ..Default::default()
670 });
671 let mut event: Annotated<Event> = Annotated::new(Event {
672 ty: Annotated::new(EventType::Transaction),
673 start_timestamp: Annotated::new(
674 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
675 ),
676 timestamp: Annotated::new(
677 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
678 .unwrap()
679 .checked_add_signed(Duration::milliseconds(15))
680 .unwrap()
681 .into(),
682 ),
683 contexts: Annotated::new(contexts),
684 ..Default::default()
685 });
686
687 scrub_profiler_id(&mut event);
688
689 let profile_context = get_value!(event.contexts)
690 .unwrap()
691 .get::<ProfileContext>()
692 .unwrap();
693
694 assert!(
695 profile_context
696 .profiler_id
697 .meta()
698 .iter_remarks()
699 .any(|remark| remark.rule_id == *"transaction_duration"
700 && remark.ty == RemarkType::Removed)
701 )
702 }
703
704 #[cfg(feature = "processing")]
705 #[test]
706 fn test_scrub_profiler_id_should_not_be_stripped() {
707 let mut contexts = Contexts::new();
708 contexts.add(ProfileContext {
709 profiler_id: Annotated::new(EventId(
710 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
711 )),
712 ..Default::default()
713 });
714 let mut event: Annotated<Event> = Annotated::new(Event {
715 ty: Annotated::new(EventType::Transaction),
716 start_timestamp: Annotated::new(
717 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
718 ),
719 timestamp: Annotated::new(
720 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
721 .unwrap()
722 .checked_add_signed(Duration::milliseconds(20))
723 .unwrap()
724 .into(),
725 ),
726 contexts: Annotated::new(contexts),
727 ..Default::default()
728 });
729
730 scrub_profiler_id(&mut event);
731
732 let profile_context = get_value!(event.contexts)
733 .unwrap()
734 .get::<ProfileContext>()
735 .unwrap();
736
737 assert!(
738 !profile_context
739 .profiler_id
740 .meta()
741 .iter_remarks()
742 .any(|remark| remark.rule_id == *"transaction_duration"
743 && remark.ty == RemarkType::Removed)
744 )
745 }
746}