relay_server/services/processor/
profile.rs

1//! Profiles related processor code.
2use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4use std::sync::Arc;
5
6use relay_base_schema::events::EventType;
7use relay_base_schema::project::ProjectId;
8use relay_config::Config;
9use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
10use relay_filter::ProjectFiltersConfig;
11use relay_profiling::{ProfileError, ProfileId};
12use relay_protocol::Annotated;
13#[cfg(feature = "processing")]
14use relay_protocol::{Getter, Remark, RemarkType};
15
16use crate::envelope::{ContentType, Item, ItemType};
17use crate::managed::{ItemAction, TypedEnvelope};
18use crate::services::outcome::{DiscardReason, Outcome};
19use crate::services::processor::{TransactionGroup, event_type, should_filter};
20use crate::services::projects::project::ProjectInfo;
21
22/// Filters out invalid and duplicate profiles.
23///
24/// Returns the profile id of the single remaining profile, if there is one.
25pub fn filter<Group>(
26    managed_envelope: &mut TypedEnvelope<Group>,
27    event: &Annotated<Event>,
28    config: Arc<Config>,
29    project_id: ProjectId,
30    project_info: &ProjectInfo,
31) -> Option<ProfileId> {
32    let profiling_disabled = should_filter(&config, project_info, Feature::Profiling);
33    let has_transaction = event_type(event) == Some(EventType::Transaction);
34    let keep_unsampled_profiles = true;
35
36    let mut profile_id = None;
37    managed_envelope.retain_items(|item| match item.ty() {
38        // First profile found in the envelope, we'll keep it if metadata are valid.
39        ItemType::Profile if profile_id.is_none() => {
40            if profiling_disabled {
41                return ItemAction::DropSilently;
42            }
43
44            // Drop profile without a transaction in the same envelope,
45            // except if unsampled profiles are allowed for this project.
46            let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
47            if !profile_allowed {
48                return ItemAction::DropSilently;
49            }
50
51            match relay_profiling::parse_metadata(&item.payload(), project_id) {
52                Ok(id) => {
53                    profile_id = Some(id);
54                    ItemAction::Keep
55                }
56                Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
57                    relay_profiling::discard_reason(err),
58                ))),
59            }
60        }
61        // We found another profile, we'll drop it.
62        ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
63            relay_profiling::discard_reason(ProfileError::TooManyProfiles),
64        ))),
65        _ => ItemAction::Keep,
66    });
67
68    profile_id
69}
70
71/// Transfers the profile ID from the profile item to the transaction item.
72///
73/// The profile id may be `None` when the envelope does not contain a profile,
74/// in that case the profile context is removed.
75/// Some SDKs send transactions with profile ids but omit the profile in the envelope.
76pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
77    let Some(event) = event.value_mut() else {
78        return;
79    };
80
81    match profile_id {
82        Some(profile_id) => {
83            let contexts = event.contexts.get_or_insert_with(Contexts::new);
84            contexts.add(ProfileContext {
85                profile_id: Annotated::new(profile_id),
86                ..ProfileContext::default()
87            });
88        }
89        None => {
90            if let Some(contexts) = event.contexts.value_mut() {
91                if let Some(profile_context) = contexts.get_mut::<ProfileContext>() {
92                    profile_context.profile_id = Annotated::empty();
93                }
94            }
95        }
96    }
97}
98
99/// Strip out the profiler_id from the transaction's profile context if the transaction lasts less than 20ms.
100///
101/// This is necessary because if the transaction lasts less than 19.8ms, we know that the respective
102/// profile data won't have enough samples to be of any use, hence we "unlink" the profile from the transaction.
103#[cfg(feature = "processing")]
104pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
105    let Some(event) = event.value_mut() else {
106        return;
107    };
108    let transaction_duration = event
109        .get_value("event.duration")
110        .and_then(|duration| duration.as_f64());
111
112    if !transaction_duration.is_some_and(|duration| duration < 19.8) {
113        return;
114    }
115    if let Some(contexts) = event.contexts.value_mut().as_mut() {
116        if let Some(profiler_id) = contexts
117            .get_mut::<ProfileContext>()
118            .map(|ctx| &mut ctx.profiler_id)
119        {
120            let id = std::mem::take(profiler_id.value_mut());
121            let remark = Remark::new(RemarkType::Removed, "transaction_duration");
122            profiler_id.meta_mut().add_remark(remark);
123            profiler_id.meta_mut().set_original_value(id);
124        }
125    }
126}
127
128/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
129pub fn process(
130    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
131    event: &mut Annotated<Event>,
132    global_config: &GlobalConfig,
133    config: Arc<Config>,
134    project_info: Arc<ProjectInfo>,
135) -> Option<ProfileId> {
136    let client_ip = managed_envelope.envelope().meta().client_addr();
137    let filter_settings = &project_info.config.filter_settings;
138
139    let profiling_enabled = project_info.has_feature(Feature::Profiling);
140    let mut profile_id = None;
141
142    managed_envelope.retain_items(|item| match item.ty() {
143        ItemType::Profile => {
144            if !profiling_enabled {
145                return ItemAction::DropSilently;
146            }
147
148            // There should always be an event/transaction available at this stage.
149            // It is required to expand the profile. If it's missing, drop the item.
150            let Some(event) = event.value() else {
151                return ItemAction::DropSilently;
152            };
153
154            match expand_profile(
155                item,
156                event,
157                &config,
158                client_ip,
159                filter_settings,
160                global_config,
161            ) {
162                Ok(id) => {
163                    profile_id = Some(id);
164                    ItemAction::Keep
165                }
166                Err(outcome) => ItemAction::Drop(outcome),
167            }
168        }
169        _ => ItemAction::Keep,
170    });
171
172    profile_id
173}
174
175/// Transfers transaction metadata to profile and check its size.
176fn expand_profile(
177    item: &mut Item,
178    event: &Event,
179    config: &Config,
180    client_ip: Option<IpAddr>,
181    filter_settings: &ProjectFiltersConfig,
182    global_config: &GlobalConfig,
183) -> Result<ProfileId, Outcome> {
184    match relay_profiling::expand_profile(
185        &item.payload(),
186        event,
187        client_ip,
188        filter_settings,
189        global_config,
190    ) {
191        Ok((id, payload)) => {
192            if payload.len() <= config.max_profile_size() {
193                item.set_payload(ContentType::Json, payload);
194                Ok(id)
195            } else {
196                Err(Outcome::Invalid(DiscardReason::Profiling(
197                    relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
198                )))
199            }
200        }
201        Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
202            Err(Outcome::Filtered(filter_stat_key))
203        }
204        Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
205            relay_profiling::discard_reason(err),
206        ))),
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    #[cfg(feature = "processing")]
213    use chrono::{Duration, TimeZone, Utc};
214    use std::sync::Arc;
215    #[cfg(feature = "processing")]
216    use uuid::Uuid;
217
218    #[cfg(feature = "processing")]
219    use insta::assert_debug_snapshot;
220    use relay_cogs::Token;
221    #[cfg(not(feature = "processing"))]
222    use relay_dynamic_config::Feature;
223    use relay_event_schema::protocol::EventId;
224    #[cfg(feature = "processing")]
225    use relay_protocol::get_value;
226    use relay_sampling::evaluation::ReservoirCounters;
227    use relay_system::Addr;
228
229    use crate::envelope::Envelope;
230    use crate::extractors::RequestMeta;
231    use crate::managed::ManagedEnvelope;
232    #[cfg(feature = "processing")]
233    use crate::services::processor::Submit;
234    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
235    use crate::services::projects::project::ProjectInfo;
236    use crate::testutils::create_test_processor;
237
238    use super::*;
239
240    #[cfg(feature = "processing")]
241    #[tokio::test]
242    async fn test_profile_id_transfered() {
243        // Setup
244
245        let config = Config::from_json_value(serde_json::json!({
246            "processing": {
247                "enabled": true,
248                "kafka_config": []
249            }
250        }))
251        .unwrap();
252        let processor = create_test_processor(config).await;
253        let event_id = EventId::new();
254        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
255            .parse()
256            .unwrap();
257        let request_meta = RequestMeta::new(dsn);
258        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
259
260        // Add a valid transaction item.
261        envelope.add_item({
262            let mut item = Item::new(ItemType::Transaction);
263
264            item.set_payload(
265                ContentType::Json,
266                r#"{
267                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
268                    "type": "transaction",
269                    "transaction": "/foo/",
270                    "timestamp": 946684810.0,
271                    "start_timestamp": 946684800.0,
272                    "contexts": {
273                        "trace": {
274                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
275                        "span_id": "fa90fdead5f74053",
276                        "op": "http.server",
277                        "type": "trace"
278                        }
279                    },
280                    "transaction_info": {
281                        "source": "url"
282                    }
283                }"#,
284            );
285            item
286        });
287
288        // Add a profile to the same envelope.
289        envelope.add_item({
290            let mut item = Item::new(ItemType::Profile);
291            item.set_payload(
292                ContentType::Json,
293                r#"{
294                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
295                    "version": "1",
296                    "platform": "android",
297                    "os": {"name": "foo", "version": "bar"},
298                    "device": {"architecture": "zap"},
299                    "timestamp": "2023-10-10 00:00:00Z",
300                    "profile": {
301                        "samples":[
302                            {
303                                "stack_id":0,
304                                "elapsed_since_start_ns":1,
305                                "thread_id":1
306                            },
307                            {
308                                "stack_id":0,
309                                "elapsed_since_start_ns":2,
310                                "thread_id":1
311                            }
312                        ],
313                        "stacks":[[0]],
314                        "frames":[{
315                            "function":"main"
316                        }]
317                    },
318                    "transactions": [
319                        {
320                            "id": "9b73438f70e044ecbd006b7fd15b7373",
321                            "name": "/foo/",
322                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
323                        }
324                    ]
325                }"#,
326            );
327            item
328        });
329
330        let mut project_state = ProjectInfo::default();
331        project_state.config.features.0.insert(Feature::Profiling);
332
333        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
334        assert_eq!(envelopes.len(), 1);
335
336        let (group, envelope) = envelopes.pop().unwrap();
337        let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
338
339        let message = ProcessEnvelopeGrouped {
340            group,
341            envelope,
342            project_info: Arc::new(project_state),
343            rate_limits: Default::default(),
344            sampling_project_info: None,
345            reservoir_counters: ReservoirCounters::default(),
346        };
347
348        let Ok(Some(Submit::Envelope(new_envelope))) =
349            processor.process(&mut Token::noop(), message).await
350        else {
351            panic!();
352        };
353        let new_envelope = new_envelope.envelope();
354
355        // Get the re-serialized context.
356        let item = new_envelope
357            .get_item_by(|item| item.ty() == &ItemType::Transaction)
358            .unwrap();
359        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
360        let context = transaction
361            .value()
362            .unwrap()
363            .context::<ProfileContext>()
364            .unwrap();
365
366        assert_debug_snapshot!(context, @r###"
367        ProfileContext {
368            profile_id: EventId(
369                012d836b-15bb-49d7-bbf9-9e64295d995b,
370            ),
371            profiler_id: ~,
372        }
373        "###);
374    }
375
376    #[cfg(feature = "processing")]
377    #[tokio::test]
378    async fn test_invalid_profile_id_not_transfered() {
379        // Setup
380        let config = Config::from_json_value(serde_json::json!({
381            "processing": {
382                "enabled": true,
383                "kafka_config": []
384            }
385        }))
386        .unwrap();
387        let processor = create_test_processor(config).await;
388        let event_id = EventId::new();
389        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
390            .parse()
391            .unwrap();
392        let request_meta = RequestMeta::new(dsn);
393        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
394
395        // Add a valid transaction item.
396        envelope.add_item({
397            let mut item = Item::new(ItemType::Transaction);
398
399            item.set_payload(
400                ContentType::Json,
401                r#"{
402                    "event_id": "9b73438f70e044ecbd006b7fd15b7373",
403                    "type": "transaction",
404                    "transaction": "/foo/",
405                    "timestamp": 946684810.0,
406                    "start_timestamp": 946684800.0,
407                    "contexts": {
408                        "trace": {
409                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
410                        "span_id": "fa90fdead5f74053",
411                        "op": "http.server",
412                        "type": "trace"
413                        }
414                    },
415                    "transaction_info": {
416                        "source": "url"
417                    }
418                }"#,
419            );
420            item
421        });
422
423        // Add a profile to the same envelope.
424        envelope.add_item({
425            let mut item = Item::new(ItemType::Profile);
426            item.set_payload(
427                ContentType::Json,
428                r#"{
429                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
430                    "version": "1",
431                    "platform": "android",
432                    "os": {"name": "foo", "version": "bar"},
433                    "device": {"architecture": "zap"},
434                    "timestamp": "2023-10-10 00:00:00Z",
435                    "profile": {
436                        "samples":[
437                            {
438                                "stack_id":0,
439                                "elapsed_since_start_ns":1,
440                                "thread_id":1
441                            },
442                            {
443                                "stack_id":1,
444                                "elapsed_since_start_ns":2,
445                                "thread_id":1
446                            }
447                        ],
448                        "stacks":[[0],[]],
449                        "frames":[{
450                            "function":"main"
451                        }]
452                    },
453                    "transactions": [
454                        {
455                            "id": "9b73438f70e044ecbd006b7fd15b7373",
456                            "name": "/foo/",
457                            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
458                        }
459                    ]
460                }"#,
461            );
462            item
463        });
464
465        let mut project_info = ProjectInfo::default();
466        project_info.config.features.0.insert(Feature::Profiling);
467
468        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
469        assert_eq!(envelopes.len(), 1);
470
471        let (group, envelope) = envelopes.pop().unwrap();
472        let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
473
474        let message = ProcessEnvelopeGrouped {
475            group,
476            envelope,
477            project_info: Arc::new(project_info),
478            rate_limits: Default::default(),
479            sampling_project_info: None,
480            reservoir_counters: ReservoirCounters::default(),
481        };
482
483        let Ok(Some(Submit::Envelope(new_envelope))) =
484            processor.process(&mut Token::noop(), message).await
485        else {
486            panic!();
487        };
488        let new_envelope = new_envelope.envelope();
489
490        // Get the re-serialized context.
491        let item = new_envelope
492            .get_item_by(|item| item.ty() == &ItemType::Transaction)
493            .unwrap();
494        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
495        let context = transaction
496            .value()
497            .unwrap()
498            .context::<ProfileContext>()
499            .unwrap();
500
501        assert_debug_snapshot!(context, @r###"
502        ProfileContext {
503            profile_id: ~,
504            profiler_id: ~,
505        }
506        "###);
507    }
508
509    #[tokio::test]
510    async fn filter_standalone_profile() {
511        relay_log::init_test!();
512
513        // Setup
514        let processor = create_test_processor(Default::default()).await;
515        let event_id = EventId::new();
516        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
517            .parse()
518            .unwrap();
519        let request_meta = RequestMeta::new(dsn);
520        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
521
522        // Add a profile to the same envelope.
523        envelope.add_item({
524            let mut item = Item::new(ItemType::Profile);
525            item.set_payload(
526                ContentType::Json,
527                r#"{
528                    "profile_id": "012d836b15bb49d7bbf99e64295d995b",
529                    "version": "1",
530                    "platform": "android",
531                    "os": {"name": "foo", "version": "bar"},
532                    "device": {"architecture": "zap"},
533                    "timestamp": "2023-10-10 00:00:00Z"
534                }"#,
535            );
536            item
537        });
538
539        let mut project_state = ProjectInfo::default();
540        project_state.config.features.0.insert(Feature::Profiling);
541
542        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
543        assert_eq!(envelopes.len(), 1);
544
545        let (group, envelope) = envelopes.pop().unwrap();
546        let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy(), Addr::dummy());
547
548        let message = ProcessEnvelopeGrouped {
549            group,
550            envelope,
551            project_info: Arc::new(project_state),
552            rate_limits: Default::default(),
553            sampling_project_info: None,
554            reservoir_counters: ReservoirCounters::default(),
555        };
556
557        let envelope = processor
558            .process(&mut Token::noop(), message)
559            .await
560            .unwrap();
561        assert!(envelope.is_none());
562    }
563
564    #[cfg(feature = "processing")]
565    #[tokio::test]
566    async fn test_profile_id_removed_profiler_id_kept() {
567        // Setup
568        let config = Config::from_json_value(serde_json::json!({
569            "processing": {
570                "enabled": true,
571                "kafka_config": []
572            }
573        }))
574        .unwrap();
575        let processor = create_test_processor(config).await;
576        let event_id = EventId::new();
577        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
578            .parse()
579            .unwrap();
580        let request_meta = RequestMeta::new(dsn);
581        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
582
583        // Add a valid transaction item.
584        envelope.add_item({
585            let mut item = Item::new(ItemType::Transaction);
586
587            item.set_payload(
588                ContentType::Json,
589                r#"{
590                "type": "transaction",
591                "transaction": "/foo/",
592                "timestamp": 946684810.0,
593                "start_timestamp": 946684800.0,
594                "contexts": {
595                    "trace": {
596                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
597                        "span_id": "fa90fdead5f74053",
598                        "op": "http.server",
599                        "type": "trace"
600                    },
601                    "profile": {
602                        "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
603                        "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
604                        "type": "profile"
605                    }
606                },
607                "transaction_info": {
608                    "source": "url"
609                }
610            }"#,
611            );
612            item
613        });
614
615        let mut project_state = ProjectInfo::default();
616        project_state.config.features.0.insert(Feature::Profiling);
617
618        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
619        assert_eq!(envelopes.len(), 1);
620
621        let (group, envelope) = envelopes.pop().unwrap();
622        let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy());
623
624        let message = ProcessEnvelopeGrouped {
625            group,
626            envelope,
627            project_info: Arc::new(project_state),
628            rate_limits: Default::default(),
629            sampling_project_info: None,
630            reservoir_counters: ReservoirCounters::default(),
631        };
632
633        let Ok(Some(Submit::Envelope(new_envelope))) =
634            processor.process(&mut Token::noop(), message).await
635        else {
636            panic!();
637        };
638        let new_envelope = new_envelope.envelope();
639
640        // Get the re-serialized context.
641        let item = new_envelope
642            .get_item_by(|item| item.ty() == &ItemType::Transaction)
643            .unwrap();
644        let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
645        let context = transaction
646            .value()
647            .unwrap()
648            .context::<ProfileContext>()
649            .unwrap();
650
651        assert_debug_snapshot!(context, @r###"
652        ProfileContext {
653            profile_id: ~,
654            profiler_id: EventId(
655                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
656            ),
657        }
658        "###);
659    }
660
661    #[cfg(feature = "processing")]
662    #[test]
663    fn test_scrub_profiler_id_should_be_stripped() {
664        let mut contexts = Contexts::new();
665        contexts.add(ProfileContext {
666            profiler_id: Annotated::new(EventId(
667                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
668            )),
669            ..Default::default()
670        });
671        let mut event: Annotated<Event> = Annotated::new(Event {
672            ty: Annotated::new(EventType::Transaction),
673            start_timestamp: Annotated::new(
674                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
675            ),
676            timestamp: Annotated::new(
677                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
678                    .unwrap()
679                    .checked_add_signed(Duration::milliseconds(15))
680                    .unwrap()
681                    .into(),
682            ),
683            contexts: Annotated::new(contexts),
684            ..Default::default()
685        });
686
687        scrub_profiler_id(&mut event);
688
689        let profile_context = get_value!(event.contexts)
690            .unwrap()
691            .get::<ProfileContext>()
692            .unwrap();
693
694        assert!(
695            profile_context
696                .profiler_id
697                .meta()
698                .iter_remarks()
699                .any(|remark| remark.rule_id == *"transaction_duration"
700                    && remark.ty == RemarkType::Removed)
701        )
702    }
703
704    #[cfg(feature = "processing")]
705    #[test]
706    fn test_scrub_profiler_id_should_not_be_stripped() {
707        let mut contexts = Contexts::new();
708        contexts.add(ProfileContext {
709            profiler_id: Annotated::new(EventId(
710                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
711            )),
712            ..Default::default()
713        });
714        let mut event: Annotated<Event> = Annotated::new(Event {
715            ty: Annotated::new(EventType::Transaction),
716            start_timestamp: Annotated::new(
717                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
718            ),
719            timestamp: Annotated::new(
720                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
721                    .unwrap()
722                    .checked_add_signed(Duration::milliseconds(20))
723                    .unwrap()
724                    .into(),
725            ),
726            contexts: Annotated::new(contexts),
727            ..Default::default()
728        });
729
730        scrub_profiler_id(&mut event);
731
732        let profile_context = get_value!(event.contexts)
733            .unwrap()
734            .get::<ProfileContext>()
735            .unwrap();
736
737        assert!(
738            !profile_context
739                .profiler_id
740                .meta()
741                .iter_remarks()
742                .any(|remark| remark.rule_id == *"transaction_duration"
743                    && remark.ty == RemarkType::Removed)
744        )
745    }
746}