relay_server/services/processor/
profile.rs

1//! Profiles related processor code.
2use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4
5use relay_base_schema::events::EventType;
6use relay_base_schema::project::ProjectId;
7use relay_config::Config;
8use relay_event_schema::protocol::Event;
9use relay_filter::ProjectFiltersConfig;
10use relay_profiling::{ProfileError, ProfileId};
11use relay_protocol::Annotated;
12
13use crate::envelope::{ContentType, Item, ItemType};
14use crate::managed::{ItemAction, TypedEnvelope};
15use crate::services::outcome::{DiscardReason, Outcome};
16use crate::services::processor::{TransactionGroup, event_type, should_filter};
17use crate::services::projects::project::ProjectInfo;
18
19/// Filters out invalid and duplicate profiles.
20///
21/// Returns the profile id of the single remaining profile, if there is one.
22pub fn filter<Group>(
23    managed_envelope: &mut TypedEnvelope<Group>,
24    event: &Annotated<Event>,
25    config: &Config,
26    project_id: ProjectId,
27    project_info: &ProjectInfo,
28) -> Option<ProfileId> {
29    let profiling_disabled = should_filter(config, project_info, Feature::Profiling);
30    let has_transaction = event_type(event) == Some(EventType::Transaction);
31    let mut profile_id = None;
32    managed_envelope.retain_items(|item| match item.ty() {
33        // First profile found in the envelope, we'll keep it if metadata are valid.
34        ItemType::Profile if profile_id.is_none() => {
35            if profiling_disabled {
36                return ItemAction::DropSilently;
37            }
38
39            // Drop profile without a transaction in the same envelope,
40            // except if unsampled profiles are allowed for this project.
41            let profile_allowed = has_transaction || !item.sampled();
42            if !profile_allowed {
43                return ItemAction::DropSilently;
44            }
45
46            match relay_profiling::parse_metadata(&item.payload(), project_id) {
47                Ok(id) => {
48                    profile_id = Some(id);
49                    ItemAction::Keep
50                }
51                Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
52                    relay_profiling::discard_reason(err),
53                ))),
54            }
55        }
56        // We found another profile, we'll drop it.
57        ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
58            relay_profiling::discard_reason(ProfileError::TooManyProfiles),
59        ))),
60        _ => ItemAction::Keep,
61    });
62
63    profile_id
64}
65
66/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
67pub fn process(
68    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
69    event: &mut Annotated<Event>,
70    global_config: &GlobalConfig,
71    config: &Config,
72    project_info: &ProjectInfo,
73) -> Option<ProfileId> {
74    let client_ip = managed_envelope.envelope().meta().client_addr();
75    let filter_settings = &project_info.config.filter_settings;
76
77    let profiling_enabled = project_info.has_feature(Feature::Profiling);
78    let mut profile_id = None;
79
80    managed_envelope.retain_items(|item| match item.ty() {
81        ItemType::Profile => {
82            if !profiling_enabled {
83                return ItemAction::DropSilently;
84            }
85
86            // There should always be an event/transaction available at this stage.
87            // It is required to expand the profile. If it's missing, drop the item.
88            let Some(event) = event.value() else {
89                return ItemAction::DropSilently;
90            };
91
92            match expand_profile(
93                item,
94                event,
95                config,
96                client_ip,
97                filter_settings,
98                global_config,
99            ) {
100                Ok(id) => {
101                    profile_id = Some(id);
102                    ItemAction::Keep
103                }
104                Err(outcome) => ItemAction::Drop(outcome),
105            }
106        }
107        _ => ItemAction::Keep,
108    });
109
110    profile_id
111}
112
113/// Transfers transaction metadata to profile and check its size.
114fn expand_profile(
115    item: &mut Item,
116    event: &Event,
117    config: &Config,
118    client_ip: Option<IpAddr>,
119    filter_settings: &ProjectFiltersConfig,
120    global_config: &GlobalConfig,
121) -> Result<ProfileId, Outcome> {
122    match relay_profiling::expand_profile(
123        &item.payload(),
124        event,
125        client_ip,
126        filter_settings,
127        global_config,
128    ) {
129        Ok((id, payload)) => {
130            if payload.len() <= config.max_profile_size() {
131                item.set_payload(ContentType::Json, payload);
132                Ok(id)
133            } else {
134                Err(Outcome::Invalid(DiscardReason::Profiling(
135                    relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
136                )))
137            }
138        }
139        Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
140            Err(Outcome::Filtered(filter_stat_key))
141        }
142        Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
143            relay_profiling::discard_reason(err),
144        ))),
145    }
146}
147
#[cfg(test)]
#[cfg(feature = "processing")]
mod tests {
    use crate::envelope::Envelope;
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::processor::Submit;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;
    use insta::assert_debug_snapshot;
    use relay_cogs::Token;
    use relay_dynamic_config::Feature;
    use relay_event_schema::protocol::{EventId, ProfileContext};
    use relay_system::Addr;

    use super::*;

    /// Transaction payload shared by the tests that pair a transaction with a profile.
    const TRANSACTION_JSON: &str = r#"{
        "event_id": "9b73438f70e044ecbd006b7fd15b7373",
        "type": "transaction",
        "transaction": "/foo/",
        "timestamp": 946684810.0,
        "start_timestamp": 946684800.0,
        "contexts": {
            "trace": {
            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
            "span_id": "fa90fdead5f74053",
            "op": "http.server",
            "type": "trace"
            }
        },
        "transaction_info": {
            "source": "url"
        }
    }"#;

    /// Builds a configuration with processing enabled and an empty `kafka_config`.
    fn processing_config() -> Config {
        Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap()
    }

    /// Creates an envelope without items, carrying a fresh event ID and the test DSN.
    fn empty_envelope() -> Box<Envelope> {
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        Envelope::from_request(Some(EventId::new()), request_meta)
    }

    /// Creates an item of the given type with a JSON payload.
    fn json_item(ty: ItemType, payload: &'static str) -> Item {
        let mut item = Item::new(ty);
        item.set_payload(ContentType::Json, payload);
        item
    }

    /// Returns a default project with the profiling feature switched on.
    fn profiling_project() -> ProjectInfo {
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);
        project_info
    }

    /// Splits `envelope` into processing groups, asserts that there is exactly one, and
    /// wraps it into a message for the processor.
    fn grouped_message(
        envelope: Box<Envelope>,
        project_info: &ProjectInfo,
    ) -> ProcessEnvelopeGrouped<'_> {
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();

        ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, Addr::dummy()),
            ctx: processing::Context {
                project_info,
                ..processing::Context::for_test()
            },
        }
    }

    /// Parses the re-serialized transaction item of a processed envelope.
    fn reserialized_transaction(envelope: &Envelope) -> Annotated<Event> {
        let item = envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        Annotated::<Event>::from_json_bytes(&item.payload()).unwrap()
    }

    #[tokio::test]
    async fn test_profile_id_transfered() {
        let processor = create_test_processor(processing_config()).await;

        let mut envelope = empty_envelope();

        // Add a valid transaction item.
        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_JSON));

        // Add a profile to the same envelope.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let project_info = profiling_project();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // Get the re-serialized context.
        let transaction = reserialized_transaction(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: EventId(
                012d836b-15bb-49d7-bbf9-9e64295d995b,
            ),
            profiler_id: ~,
        }
        "###);
    }

    #[tokio::test]
    async fn test_invalid_profile_id_not_transfered() {
        let processor = create_test_processor(processing_config()).await;

        let mut envelope = empty_envelope();

        // Add a valid transaction item.
        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_JSON));

        // Add a profile whose second sample references the empty stack at index 1. The
        // snapshot below expects that no profile ID ends up on the transaction.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":1,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0],[]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let project_info = profiling_project();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // Get the re-serialized context.
        let transaction = reserialized_transaction(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: ~,
        }
        "###);
    }

    #[tokio::test]
    async fn filter_standalone_profile() {
        relay_log::init_test!();

        let processor = create_test_processor(Default::default()).await;

        let mut envelope = empty_envelope();

        // Add a profile only; this envelope deliberately contains no transaction.
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z"
            }"#,
        ));

        let project_info = profiling_project();
        let message = grouped_message(envelope, &project_info);

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_profile_id_removed_profiler_id_kept() {
        let processor = create_test_processor(processing_config()).await;

        let mut envelope = empty_envelope();

        // Add a transaction that already carries a profile context, but no profile item.
        envelope.add_item(json_item(
            ItemType::Transaction,
            r#"{
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    },
                    "profile": {
                        "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "type": "profile"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }"#,
        ));

        let project_info = profiling_project();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        // Get the re-serialized context.
        let transaction = reserialized_transaction(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: EventId(
                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
            ),
        }
        "###);
    }
}