1use relay_dynamic_config::{Feature, GlobalConfig};
3use std::net::IpAddr;
4
5use relay_base_schema::events::EventType;
6use relay_base_schema::project::ProjectId;
7use relay_config::Config;
8use relay_event_schema::protocol::Event;
9use relay_filter::ProjectFiltersConfig;
10use relay_profiling::{ProfileError, ProfileId};
11use relay_protocol::Annotated;
12
13use crate::envelope::{ContentType, Item, ItemType};
14use crate::managed::{ItemAction, TypedEnvelope};
15use crate::services::outcome::{DiscardReason, Outcome};
16use crate::services::processor::{TransactionGroup, event_type, should_filter};
17use crate::services::projects::project::ProjectInfo;
18
19pub fn filter<Group>(
23 managed_envelope: &mut TypedEnvelope<Group>,
24 event: &Annotated<Event>,
25 config: &Config,
26 project_id: ProjectId,
27 project_info: &ProjectInfo,
28) -> Option<ProfileId> {
29 let profiling_disabled = should_filter(config, project_info, Feature::Profiling);
30 let has_transaction = event_type(event) == Some(EventType::Transaction);
31 let mut profile_id = None;
32 managed_envelope.retain_items(|item| match item.ty() {
33 ItemType::Profile if profile_id.is_none() => {
35 if profiling_disabled {
36 return ItemAction::DropSilently;
37 }
38
39 let profile_allowed = has_transaction || !item.sampled();
42 if !profile_allowed {
43 return ItemAction::DropSilently;
44 }
45
46 match relay_profiling::parse_metadata(&item.payload(), project_id) {
47 Ok(id) => {
48 profile_id = Some(id);
49 ItemAction::Keep
50 }
51 Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
52 relay_profiling::discard_reason(err),
53 ))),
54 }
55 }
56 ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
58 relay_profiling::discard_reason(ProfileError::TooManyProfiles),
59 ))),
60 _ => ItemAction::Keep,
61 });
62
63 profile_id
64}
65
66pub fn process(
68 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
69 event: &mut Annotated<Event>,
70 global_config: &GlobalConfig,
71 config: &Config,
72 project_info: &ProjectInfo,
73) -> Option<ProfileId> {
74 let client_ip = managed_envelope.envelope().meta().client_addr();
75 let filter_settings = &project_info.config.filter_settings;
76
77 let profiling_enabled = project_info.has_feature(Feature::Profiling);
78 let mut profile_id = None;
79
80 managed_envelope.retain_items(|item| match item.ty() {
81 ItemType::Profile => {
82 if !profiling_enabled {
83 return ItemAction::DropSilently;
84 }
85
86 let Some(event) = event.value() else {
89 return ItemAction::DropSilently;
90 };
91
92 match expand_profile(
93 item,
94 event,
95 config,
96 client_ip,
97 filter_settings,
98 global_config,
99 ) {
100 Ok(id) => {
101 profile_id = Some(id);
102 ItemAction::Keep
103 }
104 Err(outcome) => ItemAction::Drop(outcome),
105 }
106 }
107 _ => ItemAction::Keep,
108 });
109
110 profile_id
111}
112
113fn expand_profile(
115 item: &mut Item,
116 event: &Event,
117 config: &Config,
118 client_ip: Option<IpAddr>,
119 filter_settings: &ProjectFiltersConfig,
120 global_config: &GlobalConfig,
121) -> Result<ProfileId, Outcome> {
122 match relay_profiling::expand_profile(
123 &item.payload(),
124 event,
125 client_ip,
126 filter_settings,
127 global_config,
128 ) {
129 Ok((id, payload)) => {
130 if payload.len() <= config.max_profile_size() {
131 item.set_payload(ContentType::Json, payload);
132 Ok(id)
133 } else {
134 Err(Outcome::Invalid(DiscardReason::Profiling(
135 relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
136 )))
137 }
138 }
139 Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
140 Err(Outcome::Filtered(filter_stat_key))
141 }
142 Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
143 relay_profiling::discard_reason(err),
144 ))),
145 }
146}
147
#[cfg(test)]
#[cfg(feature = "processing")]
mod tests {
    use crate::envelope::Envelope;
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::processor::Submit;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::create_test_processor;
    use insta::assert_debug_snapshot;
    use relay_cogs::Token;
    use relay_dynamic_config::Feature;
    use relay_event_schema::protocol::{EventId, ProfileContext};
    use relay_system::Addr;

    use super::*;

    /// Transaction payload without a profile context, shared by the profile id transfer tests.
    const TRANSACTION_PAYLOAD: &str = r#"{
        "event_id": "9b73438f70e044ecbd006b7fd15b7373",
        "type": "transaction",
        "transaction": "/foo/",
        "timestamp": 946684810.0,
        "start_timestamp": 946684800.0,
        "contexts": {
            "trace": {
                "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                "span_id": "fa90fdead5f74053",
                "op": "http.server",
                "type": "trace"
            }
        },
        "transaction_info": {
            "source": "url"
        }
    }"#;

    /// Builds a config with processing enabled and an empty Kafka configuration.
    fn processing_config() -> Config {
        Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": []
            }
        }))
        .unwrap()
    }

    /// Creates an envelope without items for a fixed test DSN and a fresh event id.
    fn empty_envelope() -> Box<Envelope> {
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        Envelope::from_request(Some(EventId::new()), request_meta)
    }

    /// Creates an item of the given type carrying `payload` as JSON.
    fn json_item(ty: ItemType, payload: &'static str) -> Item {
        let mut item = Item::new(ty);
        item.set_payload(ContentType::Json, payload);
        item
    }

    /// Returns a default project info with the profiling feature enabled.
    fn profiling_project_info() -> ProjectInfo {
        let mut project_info = ProjectInfo::default();
        project_info.config.features.0.insert(Feature::Profiling);
        project_info
    }

    /// Splits the envelope, asserts it forms exactly one processing group and wraps that
    /// group into a processor message using the given project info.
    fn grouped_message(
        envelope: Box<Envelope>,
        project_info: &ProjectInfo,
    ) -> ProcessEnvelopeGrouped<'_> {
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, Addr::dummy());

        ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info,
                ..processing::Context::for_test()
            },
        }
    }

    /// Parses the event stored in the envelope's transaction item.
    fn transaction_event(envelope: &Envelope) -> Annotated<Event> {
        let item = envelope
            .get_item_by(|item| item.ty() == &ItemType::Transaction)
            .unwrap();
        Annotated::<Event>::from_json_bytes(&item.payload()).unwrap()
    }

    /// A profile sent alongside its transaction results in the profile id being present in
    /// the transaction's profile context.
    #[tokio::test]
    async fn test_profile_id_transfered() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_PAYLOAD));
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let project_info = profiling_project_info();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        let transaction = transaction_event(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: EventId(
                012d836b-15bb-49d7-bbf9-9e64295d995b,
            ),
            profiler_id: ~,
        }
        "###);
    }

    /// A profile whose second sample points at an empty stack must not leave a profile id
    /// in the transaction's profile context.
    #[tokio::test]
    async fn test_invalid_profile_id_not_transfered() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        envelope.add_item(json_item(ItemType::Transaction, TRANSACTION_PAYLOAD));
        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z",
                "profile": {
                    "samples":[
                        {
                            "stack_id":0,
                            "elapsed_since_start_ns":1,
                            "thread_id":1
                        },
                        {
                            "stack_id":1,
                            "elapsed_since_start_ns":2,
                            "thread_id":1
                        }
                    ],
                    "stacks":[[0],[]],
                    "frames":[{
                        "function":"main"
                    }]
                },
                "transactions": [
                    {
                        "id": "9b73438f70e044ecbd006b7fd15b7373",
                        "name": "/foo/",
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
                    }
                ]
            }"#,
        ));

        let project_info = profiling_project_info();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        let transaction = transaction_event(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: ~,
        }
        "###);
    }

    /// An envelope holding only a profile item (no transaction) yields no submission from
    /// the processor.
    #[tokio::test]
    async fn filter_standalone_profile() {
        relay_log::init_test!();

        let processor = create_test_processor(Default::default()).await;
        let mut envelope = empty_envelope();

        envelope.add_item(json_item(
            ItemType::Profile,
            r#"{
                "profile_id": "012d836b15bb49d7bbf99e64295d995b",
                "version": "1",
                "platform": "android",
                "os": {"name": "foo", "version": "bar"},
                "device": {"architecture": "zap"},
                "timestamp": "2023-10-10 00:00:00Z"
            }"#,
        ));

        let project_info = profiling_project_info();
        let message = grouped_message(envelope, &project_info);

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    /// A transaction that references a profile id but arrives without a profile item loses
    /// the profile id, while its profiler id is preserved.
    #[tokio::test]
    async fn test_profile_id_removed_profiler_id_kept() {
        let processor = create_test_processor(processing_config()).await;
        let mut envelope = empty_envelope();

        envelope.add_item(json_item(
            ItemType::Transaction,
            r#"{
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    },
                    "profile": {
                        "profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "type": "profile"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }"#,
        ));

        let project_info = profiling_project_info();
        let message = grouped_message(envelope, &project_info);

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the processor to submit an envelope");
        };

        let transaction = transaction_event(new_envelope.envelope());
        let context = transaction
            .value()
            .unwrap()
            .context::<ProfileContext>()
            .unwrap();

        assert_debug_snapshot!(context, @r###"
        ProfileContext {
            profile_id: ~,
            profiler_id: EventId(
                4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
            ),
        }
        "###);
    }
}