1use relay_dynamic_config::{Feature, GlobalConfig};
3use relay_quotas::DataCategory;
4use std::net::IpAddr;
5
6use relay_base_schema::events::EventType;
7use relay_base_schema::project::ProjectId;
8use relay_config::Config;
9use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
10use relay_filter::ProjectFiltersConfig;
11use relay_profiling::{ProfileError, ProfileId, ProfileType};
12use relay_protocol::{Annotated, Empty};
13#[cfg(feature = "processing")]
14use relay_protocol::{Getter, Remark, RemarkType};
15
16use crate::envelope::{ContentType, Item, ItemType};
17use crate::managed::{ItemAction, ManagedEnvelope, TypedEnvelope};
18use crate::processing::Context;
19use crate::services::outcome::{DiscardReason, Outcome};
20use crate::services::processor::{TransactionGroup, event_type, should_filter};
21use crate::services::projects::project::ProjectInfo;
22
23pub fn filter<Group>(
27 managed_envelope: &mut TypedEnvelope<Group>,
28 event: &Annotated<Event>,
29 config: &Config,
30 project_id: ProjectId,
31 project_info: &ProjectInfo,
32) -> Option<ProfileId> {
33 let profiling_disabled = should_filter(config, project_info, Feature::Profiling);
34 let has_transaction = event_type(event) == Some(EventType::Transaction);
35 let keep_unsampled_profiles = true;
36
37 let mut profile_id = None;
38 managed_envelope.retain_items(|item| match item.ty() {
39 ItemType::Profile if profile_id.is_none() => {
41 if profiling_disabled {
42 return ItemAction::DropSilently;
43 }
44
45 let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
48 if !profile_allowed {
49 return ItemAction::DropSilently;
50 }
51
52 match relay_profiling::parse_metadata(&item.payload(), project_id) {
53 Ok(id) => {
54 profile_id = Some(id);
55 ItemAction::Keep
56 }
57 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
58 relay_profiling::discard_reason(err),
59 ))),
60 }
61 }
62 ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
64 relay_profiling::discard_reason(ProfileError::TooManyProfiles),
65 ))),
66 _ => ItemAction::Keep,
67 });
68
69 profile_id
70}
71
72pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
78 let Some(event) = event.value_mut() else {
79 return;
80 };
81
82 match profile_id {
83 Some(profile_id) => {
84 let contexts = event.contexts.get_or_insert_with(Contexts::new);
85 contexts.add(ProfileContext {
86 profile_id: Annotated::new(profile_id),
87 ..ProfileContext::default()
88 });
89 }
90 None => {
91 if let Some(contexts) = event.contexts.value_mut()
92 && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
93 {
94 profile_context.profile_id = Annotated::empty();
95 }
96 }
97 }
98}
99
100pub fn remove_context_if_rate_limited(
108 event: &mut Annotated<Event>,
109 envelope: &ManagedEnvelope,
110 ctx: Context<'_>,
111) {
112 let Some(event) = event.value_mut() else {
113 return;
114 };
115
116 let profile_ctx = event.context::<ProfileContext>();
122 if profile_ctx.is_none_or(|pctx| pctx.profiler_id.is_empty()) {
123 return;
124 }
125
126 let scoping = envelope.scoping();
129 let categories = match event.platform.as_str().map(ProfileType::from_platform) {
130 Some(ProfileType::Ui) => &[
131 scoping.item(DataCategory::ProfileChunkUi),
132 scoping.item(DataCategory::ProfileDurationUi),
133 ],
134 Some(ProfileType::Backend) => &[
135 scoping.item(DataCategory::ProfileChunk),
136 scoping.item(DataCategory::ProfileDuration),
137 ],
138 _ => return,
139 };
140
141 let is_limited = ctx
144 .rate_limits
145 .is_any_limited_with_quotas(ctx.project_info.get_quotas(), categories);
146
147 if is_limited && let Some(contexts) = event.contexts.value_mut() {
148 let _ = contexts.remove::<ProfileContext>();
149 }
150}
151
152#[cfg(feature = "processing")]
157pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
158 let Some(event) = event.value_mut() else {
159 return;
160 };
161 let transaction_duration = event
162 .get_value("event.duration")
163 .and_then(|duration| duration.as_f64());
164
165 if !transaction_duration.is_some_and(|duration| duration < 19.8) {
166 return;
167 }
168 if let Some(contexts) = event.contexts.value_mut().as_mut()
169 && let Some(profiler_id) = contexts
170 .get_mut::<ProfileContext>()
171 .map(|ctx| &mut ctx.profiler_id)
172 {
173 let id = std::mem::take(profiler_id.value_mut());
174 let remark = Remark::new(RemarkType::Removed, "transaction_duration");
175 profiler_id.meta_mut().add_remark(remark);
176 profiler_id.meta_mut().set_original_value(id);
177 }
178}
179
180pub fn process(
182 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
183 event: &mut Annotated<Event>,
184 global_config: &GlobalConfig,
185 config: &Config,
186 project_info: &ProjectInfo,
187) -> Option<ProfileId> {
188 let client_ip = managed_envelope.envelope().meta().client_addr();
189 let filter_settings = &project_info.config.filter_settings;
190
191 let profiling_enabled = project_info.has_feature(Feature::Profiling);
192 let mut profile_id = None;
193
194 managed_envelope.retain_items(|item| match item.ty() {
195 ItemType::Profile => {
196 if !profiling_enabled {
197 return ItemAction::DropSilently;
198 }
199
200 let Some(event) = event.value() else {
203 return ItemAction::DropSilently;
204 };
205
206 match expand_profile(
207 item,
208 event,
209 config,
210 client_ip,
211 filter_settings,
212 global_config,
213 ) {
214 Ok(id) => {
215 profile_id = Some(id);
216 ItemAction::Keep
217 }
218 Err(outcome) => ItemAction::Drop(outcome),
219 }
220 }
221 _ => ItemAction::Keep,
222 });
223
224 profile_id
225}
226
227fn expand_profile(
229 item: &mut Item,
230 event: &Event,
231 config: &Config,
232 client_ip: Option<IpAddr>,
233 filter_settings: &ProjectFiltersConfig,
234 global_config: &GlobalConfig,
235) -> Result<ProfileId, Outcome> {
236 match relay_profiling::expand_profile(
237 &item.payload(),
238 event,
239 client_ip,
240 filter_settings,
241 global_config,
242 ) {
243 Ok((id, payload)) => {
244 if payload.len() <= config.max_profile_size() {
245 item.set_payload(ContentType::Json, payload);
246 Ok(id)
247 } else {
248 Err(Outcome::Invalid(DiscardReason::Profiling(
249 relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
250 )))
251 }
252 }
253 Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
254 Err(Outcome::Filtered(filter_stat_key))
255 }
256 Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
257 relay_profiling::discard_reason(err),
258 ))),
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 #[cfg(feature = "processing")]
265 use chrono::{Duration, TimeZone, Utc};
266 #[cfg(feature = "processing")]
267 use uuid::Uuid;
268
269 #[cfg(feature = "processing")]
270 use insta::assert_debug_snapshot;
271 use relay_cogs::Token;
272 #[cfg(not(feature = "processing"))]
273 use relay_dynamic_config::Feature;
274 use relay_event_schema::protocol::EventId;
275 #[cfg(feature = "processing")]
276 use relay_protocol::get_value;
277 use relay_sampling::evaluation::ReservoirCounters;
278 use relay_system::Addr;
279
280 use crate::envelope::Envelope;
281 use crate::extractors::RequestMeta;
282 use crate::managed::ManagedEnvelope;
283 use crate::processing;
284 #[cfg(feature = "processing")]
285 use crate::services::processor::Submit;
286 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
287 use crate::services::projects::project::ProjectInfo;
288 use crate::testutils::create_test_processor;
289
290 use super::*;
291
292 #[cfg(feature = "processing")]
293 #[tokio::test]
294 async fn test_profile_id_transfered() {
295 let config = Config::from_json_value(serde_json::json!({
296 "processing": {
297 "enabled": true,
298 "kafka_config": []
299 }
300 }))
301 .unwrap();
302 let processor = create_test_processor(config).await;
303 let event_id = EventId::new();
304 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
305 .parse()
306 .unwrap();
307 let request_meta = RequestMeta::new(dsn);
308 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
309
310 envelope.add_item({
312 let mut item = Item::new(ItemType::Transaction);
313
314 item.set_payload(
315 ContentType::Json,
316 r#"{
317 "event_id": "9b73438f70e044ecbd006b7fd15b7373",
318 "type": "transaction",
319 "transaction": "/foo/",
320 "timestamp": 946684810.0,
321 "start_timestamp": 946684800.0,
322 "contexts": {
323 "trace": {
324 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
325 "span_id": "fa90fdead5f74053",
326 "op": "http.server",
327 "type": "trace"
328 }
329 },
330 "transaction_info": {
331 "source": "url"
332 }
333 }"#,
334 );
335 item
336 });
337
338 envelope.add_item({
340 let mut item = Item::new(ItemType::Profile);
341 item.set_payload(
342 ContentType::Json,
343 r#"{
344 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
345 "version": "1",
346 "platform": "android",
347 "os": {"name": "foo", "version": "bar"},
348 "device": {"architecture": "zap"},
349 "timestamp": "2023-10-10 00:00:00Z",
350 "profile": {
351 "samples":[
352 {
353 "stack_id":0,
354 "elapsed_since_start_ns":1,
355 "thread_id":1
356 },
357 {
358 "stack_id":0,
359 "elapsed_since_start_ns":2,
360 "thread_id":1
361 }
362 ],
363 "stacks":[[0]],
364 "frames":[{
365 "function":"main"
366 }]
367 },
368 "transactions": [
369 {
370 "id": "9b73438f70e044ecbd006b7fd15b7373",
371 "name": "/foo/",
372 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
373 }
374 ]
375 }"#,
376 );
377 item
378 });
379
380 let mut project_info = ProjectInfo::default();
381 project_info.config.features.0.insert(Feature::Profiling);
382
383 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
384 assert_eq!(envelopes.len(), 1);
385
386 let (group, envelope) = envelopes.pop().unwrap();
387 let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
388
389 let message = ProcessEnvelopeGrouped {
390 group,
391 envelope,
392 ctx: processing::Context {
393 project_info: &project_info,
394 ..processing::Context::for_test()
395 },
396 reservoir_counters: &ReservoirCounters::default(),
397 };
398
399 let Ok(Some(Submit::Envelope(new_envelope))) =
400 processor.process(&mut Token::noop(), message).await
401 else {
402 panic!();
403 };
404 let new_envelope = new_envelope.envelope();
405
406 let item = new_envelope
408 .get_item_by(|item| item.ty() == &ItemType::Transaction)
409 .unwrap();
410 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
411 let context = transaction
412 .value()
413 .unwrap()
414 .context::<ProfileContext>()
415 .unwrap();
416
417 assert_debug_snapshot!(context, @r###"
418 ProfileContext {
419 profile_id: EventId(
420 012d836b-15bb-49d7-bbf9-9e64295d995b,
421 ),
422 profiler_id: ~,
423 }
424 "###);
425 }
426
427 #[cfg(feature = "processing")]
428 #[tokio::test]
429 async fn test_invalid_profile_id_not_transfered() {
430 let config = Config::from_json_value(serde_json::json!({
432 "processing": {
433 "enabled": true,
434 "kafka_config": []
435 }
436 }))
437 .unwrap();
438 let processor = create_test_processor(config).await;
439 let event_id = EventId::new();
440 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
441 .parse()
442 .unwrap();
443 let request_meta = RequestMeta::new(dsn);
444 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
445
446 envelope.add_item({
448 let mut item = Item::new(ItemType::Transaction);
449
450 item.set_payload(
451 ContentType::Json,
452 r#"{
453 "event_id": "9b73438f70e044ecbd006b7fd15b7373",
454 "type": "transaction",
455 "transaction": "/foo/",
456 "timestamp": 946684810.0,
457 "start_timestamp": 946684800.0,
458 "contexts": {
459 "trace": {
460 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
461 "span_id": "fa90fdead5f74053",
462 "op": "http.server",
463 "type": "trace"
464 }
465 },
466 "transaction_info": {
467 "source": "url"
468 }
469 }"#,
470 );
471 item
472 });
473
474 envelope.add_item({
476 let mut item = Item::new(ItemType::Profile);
477 item.set_payload(
478 ContentType::Json,
479 r#"{
480 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
481 "version": "1",
482 "platform": "android",
483 "os": {"name": "foo", "version": "bar"},
484 "device": {"architecture": "zap"},
485 "timestamp": "2023-10-10 00:00:00Z",
486 "profile": {
487 "samples":[
488 {
489 "stack_id":0,
490 "elapsed_since_start_ns":1,
491 "thread_id":1
492 },
493 {
494 "stack_id":1,
495 "elapsed_since_start_ns":2,
496 "thread_id":1
497 }
498 ],
499 "stacks":[[0],[]],
500 "frames":[{
501 "function":"main"
502 }]
503 },
504 "transactions": [
505 {
506 "id": "9b73438f70e044ecbd006b7fd15b7373",
507 "name": "/foo/",
508 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
509 }
510 ]
511 }"#,
512 );
513 item
514 });
515
516 let mut project_info = ProjectInfo::default();
517 project_info.config.features.0.insert(Feature::Profiling);
518
519 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
520 assert_eq!(envelopes.len(), 1);
521
522 let (group, envelope) = envelopes.pop().unwrap();
523 let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
524
525 let message = ProcessEnvelopeGrouped {
526 group,
527 envelope,
528 ctx: processing::Context {
529 project_info: &project_info,
530 ..processing::Context::for_test()
531 },
532 reservoir_counters: &ReservoirCounters::default(),
533 };
534
535 let Ok(Some(Submit::Envelope(new_envelope))) =
536 processor.process(&mut Token::noop(), message).await
537 else {
538 panic!();
539 };
540 let new_envelope = new_envelope.envelope();
541
542 let item = new_envelope
544 .get_item_by(|item| item.ty() == &ItemType::Transaction)
545 .unwrap();
546 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
547 let context = transaction
548 .value()
549 .unwrap()
550 .context::<ProfileContext>()
551 .unwrap();
552
553 assert_debug_snapshot!(context, @r###"
554 ProfileContext {
555 profile_id: ~,
556 profiler_id: ~,
557 }
558 "###);
559 }
560
561 #[tokio::test]
562 async fn filter_standalone_profile() {
563 relay_log::init_test!();
564
565 let processor = create_test_processor(Default::default()).await;
567 let event_id = EventId::new();
568 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
569 .parse()
570 .unwrap();
571 let request_meta = RequestMeta::new(dsn);
572 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
573
574 envelope.add_item({
576 let mut item = Item::new(ItemType::Profile);
577 item.set_payload(
578 ContentType::Json,
579 r#"{
580 "profile_id": "012d836b15bb49d7bbf99e64295d995b",
581 "version": "1",
582 "platform": "android",
583 "os": {"name": "foo", "version": "bar"},
584 "device": {"architecture": "zap"},
585 "timestamp": "2023-10-10 00:00:00Z"
586 }"#,
587 );
588 item
589 });
590
591 let mut project_info = ProjectInfo::default();
592 project_info.config.features.0.insert(Feature::Profiling);
593
594 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
595 assert_eq!(envelopes.len(), 1);
596
597 let (group, envelope) = envelopes.pop().unwrap();
598 let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy());
599
600 let message = ProcessEnvelopeGrouped {
601 group,
602 envelope,
603 ctx: processing::Context {
604 project_info: &project_info,
605 ..processing::Context::for_test()
606 },
607 reservoir_counters: &ReservoirCounters::default(),
608 };
609
610 let envelope = processor
611 .process(&mut Token::noop(), message)
612 .await
613 .unwrap();
614 assert!(envelope.is_none());
615 }
616
617 #[cfg(feature = "processing")]
618 #[tokio::test]
619 async fn test_profile_id_removed_profiler_id_kept() {
620 let config = Config::from_json_value(serde_json::json!({
622 "processing": {
623 "enabled": true,
624 "kafka_config": []
625 }
626 }))
627 .unwrap();
628 let processor = create_test_processor(config).await;
629 let event_id = EventId::new();
630 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
631 .parse()
632 .unwrap();
633 let request_meta = RequestMeta::new(dsn);
634 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
635
636 envelope.add_item({
638 let mut item = Item::new(ItemType::Transaction);
639
640 item.set_payload(
641 ContentType::Json,
642 r#"{
643 "type": "transaction",
644 "transaction": "/foo/",
645 "timestamp": 946684810.0,
646 "start_timestamp": 946684800.0,
647 "contexts": {
648 "trace": {
649 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
650 "span_id": "fa90fdead5f74053",
651 "op": "http.server",
652 "type": "trace"
653 },
654 "profile": {
655 "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
656 "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
657 "type": "profile"
658 }
659 },
660 "transaction_info": {
661 "source": "url"
662 }
663 }"#,
664 );
665 item
666 });
667
668 let mut project_info = ProjectInfo::default();
669 project_info.config.features.0.insert(Feature::Profiling);
670
671 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
672 assert_eq!(envelopes.len(), 1);
673
674 let (group, envelope) = envelopes.pop().unwrap();
675 let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
676
677 let message = ProcessEnvelopeGrouped {
678 group,
679 envelope,
680 ctx: processing::Context {
681 project_info: &project_info,
682 ..processing::Context::for_test()
683 },
684 reservoir_counters: &ReservoirCounters::default(),
685 };
686
687 let Ok(Some(Submit::Envelope(new_envelope))) =
688 processor.process(&mut Token::noop(), message).await
689 else {
690 panic!();
691 };
692 let new_envelope = new_envelope.envelope();
693
694 let item = new_envelope
696 .get_item_by(|item| item.ty() == &ItemType::Transaction)
697 .unwrap();
698 let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
699 let context = transaction
700 .value()
701 .unwrap()
702 .context::<ProfileContext>()
703 .unwrap();
704
705 assert_debug_snapshot!(context, @r###"
706 ProfileContext {
707 profile_id: ~,
708 profiler_id: EventId(
709 4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
710 ),
711 }
712 "###);
713 }
714
715 #[cfg(feature = "processing")]
716 #[test]
717 fn test_scrub_profiler_id_should_be_stripped() {
718 let mut contexts = Contexts::new();
719 contexts.add(ProfileContext {
720 profiler_id: Annotated::new(EventId(
721 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
722 )),
723 ..Default::default()
724 });
725 let mut event: Annotated<Event> = Annotated::new(Event {
726 ty: Annotated::new(EventType::Transaction),
727 start_timestamp: Annotated::new(
728 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
729 ),
730 timestamp: Annotated::new(
731 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
732 .unwrap()
733 .checked_add_signed(Duration::milliseconds(15))
734 .unwrap()
735 .into(),
736 ),
737 contexts: Annotated::new(contexts),
738 ..Default::default()
739 });
740
741 scrub_profiler_id(&mut event);
742
743 let profile_context = get_value!(event.contexts)
744 .unwrap()
745 .get::<ProfileContext>()
746 .unwrap();
747
748 assert!(
749 profile_context
750 .profiler_id
751 .meta()
752 .iter_remarks()
753 .any(|remark| remark.rule_id == *"transaction_duration"
754 && remark.ty == RemarkType::Removed)
755 )
756 }
757
758 #[cfg(feature = "processing")]
759 #[test]
760 fn test_scrub_profiler_id_should_not_be_stripped() {
761 let mut contexts = Contexts::new();
762 contexts.add(ProfileContext {
763 profiler_id: Annotated::new(EventId(
764 Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
765 )),
766 ..Default::default()
767 });
768 let mut event: Annotated<Event> = Annotated::new(Event {
769 ty: Annotated::new(EventType::Transaction),
770 start_timestamp: Annotated::new(
771 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
772 ),
773 timestamp: Annotated::new(
774 Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
775 .unwrap()
776 .checked_add_signed(Duration::milliseconds(20))
777 .unwrap()
778 .into(),
779 ),
780 contexts: Annotated::new(contexts),
781 ..Default::default()
782 });
783
784 scrub_profiler_id(&mut event);
785
786 let profile_context = get_value!(event.contexts)
787 .unwrap()
788 .get::<ProfileContext>()
789 .unwrap();
790
791 assert!(
792 !profile_context
793 .profiler_id
794 .meta()
795 .iter_remarks()
796 .any(|remark| remark.rule_id == *"transaction_duration"
797 && remark.ty == RemarkType::Removed)
798 )
799 }
800}