relay_server/services/processor/
profile_chunk.rs1use relay_dynamic_config::Feature;
4
5use crate::envelope::ItemType;
6use crate::managed::{ItemAction, TypedEnvelope};
7
8use crate::services::projects::project::ProjectInfo;
9#[cfg(feature = "processing")]
10use {
11 crate::envelope::ContentType,
12 crate::services::outcome::{DiscardReason, Outcome},
13 crate::services::processor::ProfileChunkGroup,
14 relay_config::Config,
15 relay_dynamic_config::GlobalConfig,
16 relay_profiling::ProfileError,
17};
18
19pub fn filter<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: &ProjectInfo) {
21 let continuous_profiling_enabled =
22 if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
23 project_info.has_feature(Feature::ContinuousProfilingBeta)
24 } else {
25 project_info.has_feature(Feature::ContinuousProfiling)
26 };
27 managed_envelope.retain_items(|item| match item.ty() {
28 ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
29 _ => ItemAction::Keep,
30 });
31}
32
/// Validates, filters and expands every profile chunk item in the envelope.
///
/// Items that are not profile chunks are kept untouched. Profile chunks are dropped silently
/// when continuous profiling is not enabled for the project (same feature gating as [`filter`]).
/// Otherwise each chunk goes through the following steps, in order:
///
/// 1. The payload is parsed with `relay_profiling::ProfileChunk::new`.
/// 2. If the item already declares a profile type it must equal the parsed chunk's type;
///    if it declares none, the chunk's type is written onto the item.
/// 3. The chunk's `filter` is run with the client address, the project's filter settings and
///    the global config.
/// 4. The chunk is expanded and the result is checked against `config.max_profile_size()`.
/// 5. The item payload is replaced by the expanded payload with JSON content type.
///
/// Any failure along the way drops the item with the action produced by `error_to_action`.
#[cfg(feature = "processing")]
pub fn process(
    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
    project_info: &ProjectInfo,
    global_config: &GlobalConfig,
    config: &Config,
) {
    // Read everything needed from the envelope up front; `retain_items` borrows it mutably.
    let client_ip = managed_envelope.envelope().meta().client_addr();
    let filter_settings = &project_info.config.filter_settings;

    let gating_feature = if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
        Feature::ContinuousProfilingBeta
    } else {
        Feature::ContinuousProfiling
    };
    let chunks_allowed = project_info.has_feature(gating_feature);

    managed_envelope.retain_items(|item| {
        if !matches!(item.ty(), ItemType::ProfileChunk) {
            return ItemAction::Keep;
        }
        if !chunks_allowed {
            return ItemAction::DropSilently;
        }

        // Fallible part of the pipeline; the first error short-circuits and is converted into
        // a drop action below.
        let mut expand_in_place = || -> Result<(), ProfileError> {
            let chunk = relay_profiling::ProfileChunk::new(item.payload())?;

            match item.profile_type() {
                Some(profile_type) => {
                    // A declared type that disagrees with the payload is rejected.
                    if profile_type != chunk.profile_type() {
                        return Err(ProfileError::InvalidProfileType);
                    }
                }
                // No declared type: adopt the one found in the payload.
                None => item.set_profile_type(chunk.profile_type()),
            }

            chunk.filter(client_ip, filter_settings, global_config)?;

            let expanded = chunk.expand()?;
            if expanded.len() > config.max_profile_size() {
                return Err(ProfileError::ExceedSizeLimit);
            }

            item.set_payload(ContentType::Json, expanded);
            Ok(())
        };

        match expand_in_place() {
            Ok(()) => ItemAction::Keep,
            Err(err) => error_to_action(err),
        }
    });
}
104
/// Converts a profiling error into the action that drops the offending item.
///
/// [`ProfileError::Filtered`] becomes an [`Outcome::Filtered`] carrying the same filter key.
/// Every other error becomes an [`Outcome::Invalid`] whose discard reason is obtained from
/// `relay_profiling::discard_reason`.
#[cfg(feature = "processing")]
fn error_to_action(err: ProfileError) -> ItemAction {
    let outcome = match err {
        ProfileError::Filtered(filter_stat_key) => Outcome::Filtered(filter_stat_key),
        other => {
            let reason = relay_profiling::discard_reason(other);
            Outcome::Invalid(DiscardReason::Profiling(reason))
        }
    };

    ItemAction::Drop(outcome)
}