relay_server/services/processor/
profile.rs

1//! Profiles related processor code.
2use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4use std::sync::Arc;
5
6use relay_base_schema::events::EventType;
7use relay_base_schema::project::ProjectId;
8use relay_config::Config;
9use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
10use relay_filter::ProjectFiltersConfig;
11use relay_profiling::{ProfileError, ProfileId};
12use relay_protocol::Annotated;
13#[cfg(feature = "processing")]
14use relay_protocol::{Getter, Remark, RemarkType};
15
16use crate::envelope::{ContentType, Item, ItemType};
17use crate::managed::{ItemAction, TypedEnvelope};
18use crate::services::outcome::{DiscardReason, Outcome};
19use crate::services::processor::{TransactionGroup, event_type, should_filter};
20use crate::services::projects::project::ProjectInfo;
21
22/// Filters out invalid and duplicate profiles.
23///
24/// Returns the profile id of the single remaining profile, if there is one.
25pub fn filter<Group>(
26    managed_envelope: &mut TypedEnvelope<Group>,
27    event: &Annotated<Event>,
28    config: Arc<Config>,
29    project_id: ProjectId,
30    project_info: &ProjectInfo,
31) -> Option<ProfileId> {
32    let profiling_disabled = should_filter(&config, project_info, Feature::Profiling);
33    let has_transaction = event_type(event) == Some(EventType::Transaction);
34    let keep_unsampled_profiles = true;
35
36    let mut profile_id = None;
37    managed_envelope.retain_items(|item| match item.ty() {
38        // First profile found in the envelope, we'll keep it if metadata are valid.
39        ItemType::Profile if profile_id.is_none() => {
40            if profiling_disabled {
41                return ItemAction::DropSilently;
42            }
43
44            // Drop profile without a transaction in the same envelope,
45            // except if unsampled profiles are allowed for this project.
46            let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
47            if !profile_allowed {
48                return ItemAction::DropSilently;
49            }
50
51            match relay_profiling::parse_metadata(&item.payload(), project_id) {
52                Ok(id) => {
53                    profile_id = Some(id);
54                    ItemAction::Keep
55                }
56                Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
57                    relay_profiling::discard_reason(err),
58                ))),
59            }
60        }
61        // We found another profile, we'll drop it.
62        ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
63            relay_profiling::discard_reason(ProfileError::TooManyProfiles),
64        ))),
65        _ => ItemAction::Keep,
66    });
67
68    profile_id
69}
70
71/// Transfers the profile ID from the profile item to the transaction item.
72///
73/// The profile id may be `None` when the envelope does not contain a profile,
74/// in that case the profile context is removed.
75/// Some SDKs send transactions with profile ids but omit the profile in the envelope.
76pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
77    let Some(event) = event.value_mut() else {
78        return;
79    };
80
81    match profile_id {
82        Some(profile_id) => {
83            let contexts = event.contexts.get_or_insert_with(Contexts::new);
84            contexts.add(ProfileContext {
85                profile_id: Annotated::new(profile_id),
86                ..ProfileContext::default()
87            });
88        }
89        None => {
90            if let Some(contexts) = event.contexts.value_mut()
91                && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
92            {
93                profile_context.profile_id = Annotated::empty();
94            }
95        }
96    }
97}
98
/// Removes the `profiler_id` from the transaction's profile context for very short transactions.
///
/// If the transaction lasts less than 19.8ms, the respective profile data won't have
/// enough samples to be of any use, hence the profile is "unlinked" from the transaction.
/// The removed id is preserved as the original value in the field's metadata, together
/// with a `transaction_duration` removal remark.
///
/// Events without a value, without a known duration, or without a profile context are
/// left unchanged.
#[cfg(feature = "processing")]
pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
    let Some(event) = event.value_mut() else {
        return;
    };

    let duration = event
        .get_value("event.duration")
        .and_then(|value| value.as_f64());
    if !matches!(duration, Some(millis) if millis < 19.8) {
        return;
    }

    let Some(contexts) = event.contexts.value_mut().as_mut() else {
        return;
    };
    let Some(profile_context) = contexts.get_mut::<ProfileContext>() else {
        return;
    };

    // Move the id out of the field and record what was removed and why.
    let profiler_id = &mut profile_context.profiler_id;
    let original = std::mem::take(profiler_id.value_mut());
    let meta = profiler_id.meta_mut();
    meta.add_remark(Remark::new(RemarkType::Removed, "transaction_duration"));
    meta.set_original_value(original);
}
126
127/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
128pub fn process(
129    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
130    event: &mut Annotated<Event>,
131    global_config: &GlobalConfig,
132    config: Arc<Config>,
133    project_info: Arc<ProjectInfo>,
134) -> Option<ProfileId> {
135    let client_ip = managed_envelope.envelope().meta().client_addr();
136    let filter_settings = &project_info.config.filter_settings;
137
138    let profiling_enabled = project_info.has_feature(Feature::Profiling);
139    let mut profile_id = None;
140
141    managed_envelope.retain_items(|item| match item.ty() {
142        ItemType::Profile => {
143            if !profiling_enabled {
144                return ItemAction::DropSilently;
145            }
146
147            // There should always be an event/transaction available at this stage.
148            // It is required to expand the profile. If it's missing, drop the item.
149            let Some(event) = event.value() else {
150                return ItemAction::DropSilently;
151            };
152
153            match expand_profile(
154                item,
155                event,
156                &config,
157                client_ip,
158                filter_settings,
159                global_config,
160            ) {
161                Ok(id) => {
162                    profile_id = Some(id);
163                    ItemAction::Keep
164                }
165                Err(outcome) => ItemAction::Drop(outcome),
166            }
167        }
168        _ => ItemAction::Keep,
169    });
170
171    profile_id
172}
173
174/// Transfers transaction metadata to profile and check its size.
175fn expand_profile(
176    item: &mut Item,
177    event: &Event,
178    config: &Config,
179    client_ip: Option<IpAddr>,
180    filter_settings: &ProjectFiltersConfig,
181    global_config: &GlobalConfig,
182) -> Result<ProfileId, Outcome> {
183    match relay_profiling::expand_profile(
184        &item.payload(),
185        event,
186        client_ip,
187        filter_settings,
188        global_config,
189    ) {
190        Ok((id, payload)) => {
191            if payload.len() <= config.max_profile_size() {
192                item.set_payload(ContentType::Json, payload);
193                Ok(id)
194            } else {
195                Err(Outcome::Invalid(DiscardReason::Profiling(
196                    relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
197                )))
198            }
199        }
200        Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
201            Err(Outcome::Filtered(filter_stat_key))
202        }
203        Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
204            relay_profiling::discard_reason(err),
205        ))),
206    }
207}
208
#[cfg(test)]
mod tests {
    #[cfg(feature = "processing")]
    use chrono::{Duration, TimeZone, Utc};
    use std::sync::Arc;
    #[cfg(feature = "processing")]
    use uuid::Uuid;

    #[cfg(feature = "processing")]
    use insta::assert_debug_snapshot;
    use relay_cogs::Token;
    #[cfg(not(feature = "processing"))]
    use relay_dynamic_config::Feature;
    use relay_event_schema::protocol::EventId;
    #[cfg(feature = "processing")]
    use relay_protocol::get_value;
    use relay_sampling::evaluation::ReservoirCounters;
    use relay_system::Addr;

    use crate::envelope::Envelope;
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    #[cfg(feature = "processing")]
    use crate::services::processor::Submit;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;

    use super::*;

    /// DSN used for the request metadata of every test envelope.
    const TEST_DSN: &str = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42";

    /// Ten second transaction without a profile context.
    #[cfg(feature = "processing")]
    const TRANSACTION_PAYLOAD: &str = r#"{
        "event_id": "9b73438f70e044ecbd006b7fd15b7373",
        "type": "transaction",
        "transaction": "/foo/",
        "timestamp": 946684810.0,
        "start_timestamp": 946684800.0,
        "contexts": {
            "trace": {
            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
            "span_id": "fa90fdead5f74053",
            "op": "http.server",
            "type": "trace"
            }
        },
        "transaction_info": {
            "source": "url"
        }
    }"#;

    /// Creates an empty envelope with a fresh event id and the test DSN.
    fn empty_envelope() -> Box<Envelope> {
        let dsn = TEST_DSN.parse().unwrap();
        let request_meta = RequestMeta::new(dsn);
        Envelope::from_request(Some(EventId::new()), request_meta)
    }

    /// Creates an envelope item of the given type with a JSON payload.
    fn json_item(ty: ItemType, payload: &'static str) -> Item {
        let mut item = Item::new(ty);
        item.set_payload(ContentType::Json, payload);
        item
    }

    /// Creates a default project info with the profiling feature enabled.
    fn profiling_project_info() -> ProjectInfo {
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);
        project_info
    }

    /// Creates a Relay config with processing enabled and an empty `kafka_config`.
    #[cfg(feature = "processing")]
    fn processing_config() -> Config {
        Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap()
    }

    /// Splits the envelope, asserts that it yields exactly one processing group and
    /// wraps that group's envelope into a managed envelope.
    fn single_group(envelope: Box<Envelope>) -> (ProcessingGroup, ManagedEnvelope) {
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        (group, ManagedEnvelope::new(envelope, Addr::dummy()))
    }

    /// Re-parses the transaction item of the envelope and returns its profile context.
    #[cfg(feature = "processing")]
    fn profile_context(envelope: &Envelope) -> ProfileContext {
        let item = envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();

        transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap()
            .clone()
    }

    /// Creates a transaction event of the given duration whose profile context carries
    /// a profiler id.
    #[cfg(feature = "processing")]
    fn transaction_with_profiler_id(duration_ms: i64) -> Annotated<Event> {
        let mut contexts = Contexts::new();
        contexts.add(ProfileContext {
            profiler_id: Annotated::new(EventId(
                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
            )),
            ..Default::default()
        });

        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
        let end = start
            .checked_add_signed(Duration::milliseconds(duration_ms))
            .unwrap();

        Annotated::new(Event {
            ty: Annotated::new(EventType::Transaction),
            start_timestamp: Annotated::new(start.into()),
            timestamp: Annotated::new(end.into()),
            contexts: Annotated::new(contexts),
            ..Default::default()
        })
    }

    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_transfered() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        // Add a valid transaction item.
        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_PAYLOAD));

        // Add a valid profile to the same envelope.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let (group, envelope) = single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(profiling_project_info()),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // The id of the valid profile must show up in the re-serialized context.
        let context = profile_context(new_envelope.envelope());
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: EventId(
                012d836b-15bb-49d7-bbf9-9e64295d995b,
            ),
            profiler_id: ~,
        }
        "###);
    }

    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_invalid_profile_id_not_transfered() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        // Add a valid transaction item.
        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_PAYLOAD));

        // Add a profile to the same envelope; its second sample references an empty stack.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":1,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0],[]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let (group, envelope) = single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(profiling_project_info()),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // The profile id must not be set in the re-serialized context.
        let context = profile_context(new_envelope.envelope());
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: ~,
        }
        "###);
    }

    #[tokio::test]
    async fn filter_standalone_profile() {
        relay_log::init_test!();

        let processor = create_test_processor(Default::default()).await;
        let mut envelope = empty_envelope();

        // Add a profile without any accompanying transaction.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z"
            }"#,
        ));

        let (group, envelope) = single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(profiling_project_info()),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        // Nothing must be left to submit.
        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_profile_id_removed_profiler_id_kept() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        // Add a transaction that references a profile, but do not add the profile itself.
        envelope.add_item(json_item(
            ItemType::Transaction,
            r#"{
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    },
                    "profile": {
                        "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "type": "profile"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }"#,
        ));

        let (group, envelope) = single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            project_info: Arc::new(profiling_project_info()),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // Only the profile id is cleared, the profiler id stays in the context.
        let context = profile_context(new_envelope.envelope());
        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: EventId(
                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
            ),
        }
        "###);
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_scrub_profiler_id_should_be_stripped() {
        // 15ms is below the scrubbing threshold.
        let mut event = transaction_with_profiler_id(15);

        scrub_profiler_id(&mut event);

        let profile_context = get_value!(event.contexts)
            .unwrap()
            .get::<ProfileContext>()
            .unwrap();

        assert!(
            profile_context
                .profiler_id
                .meta()
                .iter_remarks()
                .any(|remark| remark.rule_id == *"transaction_duration"
                    && remark.ty == RemarkType::Removed)
        )
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_scrub_profiler_id_should_not_be_stripped() {
        // 20ms is above the scrubbing threshold.
        let mut event = transaction_with_profiler_id(20);

        scrub_profiler_id(&mut event);

        let profile_context = get_value!(event.contexts)
            .unwrap()
            .get::<ProfileContext>()
            .unwrap();

        assert!(
            !profile_context
                .profiler_id
                .meta()
                .iter_remarks()
                .any(|remark| remark.rule_id == *"transaction_duration"
                    && remark.ty == RemarkType::Removed)
        )
    }
}