relay_server/services/processor/
profile.rs

1//! Profiles related processor code.
2use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4
5use relay_base_schema::events::EventType;
6use relay_base_schema::project::ProjectId;
7use relay_config::Config;
8use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
9use relay_filter::ProjectFiltersConfig;
10use relay_profiling::{ProfileError, ProfileId};
11use relay_protocol::Annotated;
12#[cfg(feature = "processing")]
13use relay_protocol::{Getter, Remark, RemarkType};
14
15use crate::envelope::{ContentType, Item, ItemType};
16use crate::managed::{ItemAction, TypedEnvelope};
17use crate::services::outcome::{DiscardReason, Outcome};
18use crate::services::processor::{TransactionGroup, event_type, should_filter};
19use crate::services::projects::project::ProjectInfo;
20
21/// Filters out invalid and duplicate profiles.
22///
23/// Returns the profile id of the single remaining profile, if there is one.
24pub fn filter<Group>(
25    managed_envelope: &mut TypedEnvelope<Group>,
26    event: &Annotated<Event>,
27    config: &Config,
28    project_id: ProjectId,
29    project_info: &ProjectInfo,
30) -> Option<ProfileId> {
31    let profiling_disabled = should_filter(config, project_info, Feature::Profiling);
32    let has_transaction = event_type(event) == Some(EventType::Transaction);
33    let keep_unsampled_profiles = true;
34
35    let mut profile_id = None;
36    managed_envelope.retain_items(|item| match item.ty() {
37        // First profile found in the envelope, we'll keep it if metadata are valid.
38        ItemType::Profile if profile_id.is_none() => {
39            if profiling_disabled {
40                return ItemAction::DropSilently;
41            }
42
43            // Drop profile without a transaction in the same envelope,
44            // except if unsampled profiles are allowed for this project.
45            let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
46            if !profile_allowed {
47                return ItemAction::DropSilently;
48            }
49
50            match relay_profiling::parse_metadata(&item.payload(), project_id) {
51                Ok(id) => {
52                    profile_id = Some(id);
53                    ItemAction::Keep
54                }
55                Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
56                    relay_profiling::discard_reason(err),
57                ))),
58            }
59        }
60        // We found another profile, we'll drop it.
61        ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
62            relay_profiling::discard_reason(ProfileError::TooManyProfiles),
63        ))),
64        _ => ItemAction::Keep,
65    });
66
67    profile_id
68}
69
70/// Transfers the profile ID from the profile item to the transaction item.
71///
72/// The profile id may be `None` when the envelope does not contain a profile,
73/// in that case the profile context is removed.
74/// Some SDKs send transactions with profile ids but omit the profile in the envelope.
75pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
76    let Some(event) = event.value_mut() else {
77        return;
78    };
79
80    match profile_id {
81        Some(profile_id) => {
82            let contexts = event.contexts.get_or_insert_with(Contexts::new);
83            contexts.add(ProfileContext {
84                profile_id: Annotated::new(profile_id),
85                ..ProfileContext::default()
86            });
87        }
88        None => {
89            if let Some(contexts) = event.contexts.value_mut()
90                && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
91            {
92                profile_context.profile_id = Annotated::empty();
93            }
94        }
95    }
96}
97
98/// Strip out the profiler_id from the transaction's profile context if the transaction lasts less than 20ms.
99///
100/// This is necessary because if the transaction lasts less than 19.8ms, we know that the respective
101/// profile data won't have enough samples to be of any use, hence we "unlink" the profile from the transaction.
102#[cfg(feature = "processing")]
103pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
104    let Some(event) = event.value_mut() else {
105        return;
106    };
107    let transaction_duration = event
108        .get_value("event.duration")
109        .and_then(|duration| duration.as_f64());
110
111    if !transaction_duration.is_some_and(|duration| duration < 19.8) {
112        return;
113    }
114    if let Some(contexts) = event.contexts.value_mut().as_mut()
115        && let Some(profiler_id) = contexts
116            .get_mut::<ProfileContext>()
117            .map(|ctx| &mut ctx.profiler_id)
118    {
119        let id = std::mem::take(profiler_id.value_mut());
120        let remark = Remark::new(RemarkType::Removed, "transaction_duration");
121        profiler_id.meta_mut().add_remark(remark);
122        profiler_id.meta_mut().set_original_value(id);
123    }
124}
125
126/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
127pub fn process(
128    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
129    event: &mut Annotated<Event>,
130    global_config: &GlobalConfig,
131    config: &Config,
132    project_info: &ProjectInfo,
133) -> Option<ProfileId> {
134    let client_ip = managed_envelope.envelope().meta().client_addr();
135    let filter_settings = &project_info.config.filter_settings;
136
137    let profiling_enabled = project_info.has_feature(Feature::Profiling);
138    let mut profile_id = None;
139
140    managed_envelope.retain_items(|item| match item.ty() {
141        ItemType::Profile => {
142            if !profiling_enabled {
143                return ItemAction::DropSilently;
144            }
145
146            // There should always be an event/transaction available at this stage.
147            // It is required to expand the profile. If it's missing, drop the item.
148            let Some(event) = event.value() else {
149                return ItemAction::DropSilently;
150            };
151
152            match expand_profile(
153                item,
154                event,
155                config,
156                client_ip,
157                filter_settings,
158                global_config,
159            ) {
160                Ok(id) => {
161                    profile_id = Some(id);
162                    ItemAction::Keep
163                }
164                Err(outcome) => ItemAction::Drop(outcome),
165            }
166        }
167        _ => ItemAction::Keep,
168    });
169
170    profile_id
171}
172
173/// Transfers transaction metadata to profile and check its size.
174fn expand_profile(
175    item: &mut Item,
176    event: &Event,
177    config: &Config,
178    client_ip: Option<IpAddr>,
179    filter_settings: &ProjectFiltersConfig,
180    global_config: &GlobalConfig,
181) -> Result<ProfileId, Outcome> {
182    match relay_profiling::expand_profile(
183        &item.payload(),
184        event,
185        client_ip,
186        filter_settings,
187        global_config,
188    ) {
189        Ok((id, payload)) => {
190            if payload.len() <= config.max_profile_size() {
191                item.set_payload(ContentType::Json, payload);
192                Ok(id)
193            } else {
194                Err(Outcome::Invalid(DiscardReason::Profiling(
195                    relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
196                )))
197            }
198        }
199        Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
200            Err(Outcome::Filtered(filter_stat_key))
201        }
202        Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
203            relay_profiling::discard_reason(err),
204        ))),
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    #[cfg(feature = "processing")]
211    use chrono::{Duration, TimeZone, Utc};
212    #[cfg(feature = "processing")]
213    use uuid::Uuid;
214
215    #[cfg(feature = "processing")]
216    use insta::assert_debug_snapshot;
217    use relay_cogs::Token;
218    #[cfg(not(feature = "processing"))]
219    use relay_dynamic_config::Feature;
220    use relay_event_schema::protocol::EventId;
221    #[cfg(feature = "processing")]
222    use relay_protocol::get_value;
223    use relay_sampling::evaluation::ReservoirCounters;
224    use relay_system::Addr;
225
226    use crate::envelope::Envelope;
227    use crate::extractors::RequestMeta;
228    use crate::managed::ManagedEnvelope;
229    use crate::processing;
230    #[cfg(feature = "processing")]
231    use crate::services::processor::Submit;
232    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
233    use crate::services::projects::project::ProjectInfo;
234    use crate::testutils::create_test_processor;
235
236    use super::*;
237
238    #[cfg(feature = "processing")]
239    #[tokio::test]
240    async fn test_profile_id_transfered() {
241        let config = Config::from_json_value(serde_json::json!({
242            "processing": {
243                "enabled": true,
244                "kafka_config": []
245            }
246        }))
247        .unwrap();
248        let processor = create_test_processor(config).await;
249        let event_id = EventId::new();
250        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
251            .parse()
252            .unwrap();
253        let request_meta = RequestMeta::new(dsn);
254        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
255
256        // Add a valid transaction item.
257        envelope.add_item({
258            let mut item = Item::new(ItemType::Transaction);
259
260            item.set_payload(
261                ContentType::Json,
262                r#"{
263                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
264                    "type": "transaction",
265                    "transaction": "/foo/",
266                    "timestamp": 946684810.0,
267                    "start_timestamp": 946684800.0,
268                    "contexts": {
269                        "trace": {
270                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
271                        "span_id": "fa90fdead5f74053",
272                        "op": "http.server",
273                        "type": "trace"
274                        }
275                    },
276                    "transaction_info": {
277                        "source": "url"
278                    }
279                }"#,
280            );
281            item
282        });
283
284        // Add a profile to the same envelope.
285        envelope.add_item({
286            let mut item = Item::new(ItemType::Profile);
287            item.set_payload(
288                ContentType::Json,
289                r#"{
290                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
291                    "version": "1",
292                    "platform": "android",
293                    "os": {"name": "foo", "version": "bar"},
294                    "device": {"architecture": "zap"},
295                    "timestamp": "2023-10-10 00:00:00Z",
296                    "profile": {
297                        "samples":[
298                            {
299                                "stack_id":0,
300                                "elapsed_since_start_ns":1,
301                                "thread_id":1
302                            },
303                            {
304                                "stack_id":0,
305                                "elapsed_since_start_ns":2,
306                                "thread_id":1
307                            }
308                        ],
309                        "stacks":[[0]],
310                        "frames":[{
311                            "function":"main"
312                        }]
313                    },
314                    "transactions": [
315                        {
316                            "id": "9b73438f70e044ecbd006b7fd15b7373",
317                            "name": "/foo/",
318                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
319                        }
320                    ]
321                }"#,
322            );
323            item
324        });
325
326        let mut project_info = ProjectInfo::default();
327        project_info.config.features.0.insert(Feature::Profiling);
328
329        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
330        assert_eq!(envelopes.len(), 1);
331
332        let (group, envelope) = envelopes.pop().unwrap();
333        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
334
335        let message = ProcessEnvelopeGrouped {
336            group,
337            envelope,
338            ctx: processing::Context {
339                project_info: &project_info,
340                ..processing::Context::for_test()
341            },
342            reservoir_counters: &ReservoirCounters::default(),
343        };
344
345        let Ok(Some(Submit::Envelope(new_envelope))) =
346            processor.process(&mut Token::noop(), message).await
347        else {
348            panic!();
349        };
350        let new_envelope = new_envelope.envelope();
351
352        // Get the re-serialized context.
353        let item = new_envelope
354            .get_item_by(|item| item.ty() == &ItemType::Transaction)
355            .unwrap();
356        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
357        let context = transaction
358            .value()
359            .unwrap()
360            .context::<ProfileContext>()
361            .unwrap();
362
363        assert_debug_snapshot!(context, @r###"
364        ProfileContext {
365            profile_id: EventId(
366                012d836b-15bb-49d7-bbf9-9e64295d995b,
367            ),
368            profiler_id: ~,
369        }
370        "###);
371    }
372
373    #[cfg(feature = "processing")]
374    #[tokio::test]
375    async fn test_invalid_profile_id_not_transfered() {
376        // Setup
377        let config = Config::from_json_value(serde_json::json!({
378            "processing": {
379                "enabled": true,
380                "kafka_config": []
381            }
382        }))
383        .unwrap();
384        let processor = create_test_processor(config).await;
385        let event_id = EventId::new();
386        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
387            .parse()
388            .unwrap();
389        let request_meta = RequestMeta::new(dsn);
390        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
391
392        // Add a valid transaction item.
393        envelope.add_item({
394            let mut item = Item::new(ItemType::Transaction);
395
396            item.set_payload(
397                ContentType::Json,
398                r#"{
399                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
400                    "type": "transaction",
401                    "transaction": "/foo/",
402                    "timestamp": 946684810.0,
403                    "start_timestamp": 946684800.0,
404                    "contexts": {
405                        "trace": {
406                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
407                        "span_id": "fa90fdead5f74053",
408                        "op": "http.server",
409                        "type": "trace"
410                        }
411                    },
412                    "transaction_info": {
413                        "source": "url"
414                    }
415                }"#,
416            );
417            item
418        });
419
420        // Add a profile to the same envelope.
421        envelope.add_item({
422            let mut item = Item::new(ItemType::Profile);
423            item.set_payload(
424                ContentType::Json,
425                r#"{
426                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
427                    "version": "1",
428                    "platform": "android",
429                    "os": {"name": "foo", "version": "bar"},
430                    "device": {"architecture": "zap"},
431                    "timestamp": "2023-10-10 00:00:00Z",
432                    "profile": {
433                        "samples":[
434                            {
435                                "stack_id":0,
436                                "elapsed_since_start_ns":1,
437                                "thread_id":1
438                            },
439                            {
440                                "stack_id":1,
441                                "elapsed_since_start_ns":2,
442                                "thread_id":1
443                            }
444                        ],
445                        "stacks":[[0],[]],
446                        "frames":[{
447                            "function":"main"
448                        }]
449                    },
450                    "transactions": [
451                        {
452                            "id": "9b73438f70e044ecbd006b7fd15b7373",
453                            "name": "/foo/",
454                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
455                        }
456                    ]
457                }"#,
458            );
459            item
460        });
461
462        let mut project_info = ProjectInfo::default();
463        project_info.config.features.0.insert(Feature::Profiling);
464
465        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
466        assert_eq!(envelopes.len(), 1);
467
468        let (group, envelope) = envelopes.pop().unwrap();
469        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
470
471        let message = ProcessEnvelopeGrouped {
472            group,
473            envelope,
474            ctx: processing::Context {
475                project_info: &project_info,
476                ..processing::Context::for_test()
477            },
478            reservoir_counters: &ReservoirCounters::default(),
479        };
480
481        let Ok(Some(Submit::Envelope(new_envelope))) =
482            processor.process(&mut Token::noop(), message).await
483        else {
484            panic!();
485        };
486        let new_envelope = new_envelope.envelope();
487
488        // Get the re-serialized context.
489        let item = new_envelope
490            .get_item_by(|item| item.ty() == &ItemType::Transaction)
491            .unwrap();
492        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
493        let context = transaction
494            .value()
495            .unwrap()
496            .context::<ProfileContext>()
497            .unwrap();
498
499        assert_debug_snapshot!(context, @r###"
500        ProfileContext {
501            profile_id: ~,
502            profiler_id: ~,
503        }
504        "###);
505    }
506
507    #[tokio::test]
508    async fn filter_standalone_profile() {
509        relay_log::init_test!();
510
511        // Setup
512        let processor = create_test_processor(Default::default()).await;
513        let event_id = EventId::new();
514        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
515            .parse()
516            .unwrap();
517        let request_meta = RequestMeta::new(dsn);
518        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
519
520        // Add a profile to the same envelope.
521        envelope.add_item({
522            let mut item = Item::new(ItemType::Profile);
523            item.set_payload(
524                ContentType::Json,
525                r#"{
526                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
527                    "version": "1",
528                    "platform": "android",
529                    "os": {"name": "foo", "version": "bar"},
530                    "device": {"architecture": "zap"},
531                    "timestamp": "2023-10-10 00:00:00Z"
532                }"#,
533            );
534            item
535        });
536
537        let mut project_info = ProjectInfo::default();
538        project_info.config.features.0.insert(Feature::Profiling);
539
540        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
541        assert_eq!(envelopes.len(), 1);
542
543        let (group, envelope) = envelopes.pop().unwrap();
544        let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy());
545
546        let message = ProcessEnvelopeGrouped {
547            group,
548            envelope,
549            ctx: processing::Context {
550                project_info: &project_info,
551                ..processing::Context::for_test()
552            },
553            reservoir_counters: &ReservoirCounters::default(),
554        };
555
556        let envelope = processor
557            .process(&mut Token::noop(), message)
558            .await
559            .unwrap();
560        assert!(envelope.is_none());
561    }
562
563    #[cfg(feature = "processing")]
564    #[tokio::test]
565    async fn test_profile_id_removed_profiler_id_kept() {
566        // Setup
567        let config = Config::from_json_value(serde_json::json!({
568            "processing": {
569                "enabled": true,
570                "kafka_config": []
571            }
572        }))
573        .unwrap();
574        let processor = create_test_processor(config).await;
575        let event_id = EventId::new();
576        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
577            .parse()
578            .unwrap();
579        let request_meta = RequestMeta::new(dsn);
580        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
581
582        // Add a valid transaction item.
583        envelope.add_item({
584            let mut item = Item::new(ItemType::Transaction);
585
586            item.set_payload(
587                ContentType::Json,
588                r#"{
589                "type": "transaction",
590                "transaction": "/foo/",
591                "timestamp": 946684810.0,
592                "start_timestamp": 946684800.0,
593                "contexts": {
594                    "trace": {
595                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
596                        "span_id": "fa90fdead5f74053",
597                        "op": "http.server",
598                        "type": "trace"
599                    },
600                    "profile": {
601                        "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
602                        "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
603                        "type": "profile"
604                    }
605                },
606                "transaction_info": {
607                    "source": "url"
608                }
609            }"#,
610            );
611            item
612        });
613
614        let mut project_info = ProjectInfo::default();
615        project_info.config.features.0.insert(Feature::Profiling);
616
617        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
618        assert_eq!(envelopes.len(), 1);
619
620        let (group, envelope) = envelopes.pop().unwrap();
621        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());
622
623        let message = ProcessEnvelopeGrouped {
624            group,
625            envelope,
626            ctx: processing::Context {
627                project_info: &project_info,
628                ..processing::Context::for_test()
629            },
630            reservoir_counters: &ReservoirCounters::default(),
631        };
632
633        let Ok(Some(Submit::Envelope(new_envelope))) =
634            processor.process(&mut Token::noop(), message).await
635        else {
636            panic!();
637        };
638        let new_envelope = new_envelope.envelope();
639
640        // Get the re-serialized context.
641        let item = new_envelope
642            .get_item_by(|item| item.ty() == &ItemType::Transaction)
643            .unwrap();
644        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
645        let context = transaction
646            .value()
647            .unwrap()
648            .context::<ProfileContext>()
649            .unwrap();
650
651        assert_debug_snapshot!(context, @r###"
652        ProfileContext {
653            profile_id: ~,
654            profiler_id: EventId(
655                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
656            ),
657        }
658        "###);
659    }
660
661    #[cfg(feature = "processing")]
662    #[test]
663    fn test_scrub_profiler_id_should_be_stripped() {
664        let mut contexts = Contexts::new();
665        contexts.add(ProfileContext {
666            profiler_id: Annotated::new(EventId(
667                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
668            )),
669            ..Default::default()
670        });
671        let mut event: Annotated<Event> = Annotated::new(Event {
672            ty: Annotated::new(EventType::Transaction),
673            start_timestamp: Annotated::new(
674                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
675            ),
676            timestamp: Annotated::new(
677                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
678                    .unwrap()
679                    .checked_add_signed(Duration::milliseconds(15))
680                    .unwrap()
681                    .into(),
682            ),
683            contexts: Annotated::new(contexts),
684            ..Default::default()
685        });
686
687        scrub_profiler_id(&mut event);
688
689        let profile_context = get_value!(event.contexts)
690            .unwrap()
691            .get::<ProfileContext>()
692            .unwrap();
693
694        assert!(
695            profile_context
696                .profiler_id
697                .meta()
698                .iter_remarks()
699                .any(|remark| remark.rule_id == *"transaction_duration"
700                    && remark.ty == RemarkType::Removed)
701        )
702    }
703
704    #[cfg(feature = "processing")]
705    #[test]
706    fn test_scrub_profiler_id_should_not_be_stripped() {
707        let mut contexts = Contexts::new();
708        contexts.add(ProfileContext {
709            profiler_id: Annotated::new(EventId(
710                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
711            )),
712            ..Default::default()
713        });
714        let mut event: Annotated<Event> = Annotated::new(Event {
715            ty: Annotated::new(EventType::Transaction),
716            start_timestamp: Annotated::new(
717                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
718            ),
719            timestamp: Annotated::new(
720                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
721                    .unwrap()
722                    .checked_add_signed(Duration::milliseconds(20))
723                    .unwrap()
724                    .into(),
725            ),
726            contexts: Annotated::new(contexts),
727            ..Default::default()
728        });
729
730        scrub_profiler_id(&mut event);
731
732        let profile_context = get_value!(event.contexts)
733            .unwrap()
734            .get::<ProfileContext>()
735            .unwrap();
736
737        assert!(
738            !profile_context
739                .profiler_id
740                .meta()
741                .iter_remarks()
742                .any(|remark| remark.rule_id == *"transaction_duration"
743                    && remark.ty == RemarkType::Removed)
744        )
745    }
746}