relay_server/services/processor/
profile_chunk.rs1use relay_dynamic_config::Feature;
4use std::sync::Arc;
5
6use crate::envelope::ItemType;
7use crate::managed::{ItemAction, TypedEnvelope};
8
9use crate::services::projects::project::ProjectInfo;
10#[cfg(feature = "processing")]
11use {
12 crate::envelope::ContentType,
13 crate::services::outcome::{DiscardReason, Outcome},
14 crate::services::processor::ProfileChunkGroup,
15 relay_config::Config,
16 relay_dynamic_config::GlobalConfig,
17 relay_profiling::ProfileError,
18};
19
20pub fn filter<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: Arc<ProjectInfo>) {
22 let continuous_profiling_enabled =
23 if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
24 project_info.has_feature(Feature::ContinuousProfilingBeta)
25 } else {
26 project_info.has_feature(Feature::ContinuousProfiling)
27 };
28 managed_envelope.retain_items(|item| match item.ty() {
29 ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
30 _ => ItemAction::Keep,
31 });
32}
33
/// Validates, filters and expands every profile chunk item in the envelope.
///
/// Items that are not profile chunks are kept as they are. For each profile
/// chunk item the following steps run in order, and the first failing step
/// decides the item's fate:
///
/// 1. If continuous profiling is not enabled for the project, the item is
///    dropped silently. The gating feature is `Feature::ContinuousProfilingBeta`
///    when the project has `Feature::ContinuousProfilingBetaIngest`, and
///    `Feature::ContinuousProfiling` otherwise.
/// 2. The payload is parsed with `relay_profiling::ProfileChunk::new`.
/// 3. If the item already carries a profile type, it must equal the one reported
///    by the parsed chunk; if it carries none, the chunk's type is stored on the
///    item.
/// 4. The chunk is run through `ProfileChunk::filter` with the client address,
///    the project's filter settings and the global config.
/// 5. The chunk is expanded, and the expanded payload must not be larger than
///    `config.max_profile_size()`.
///
/// On success the item's payload is replaced with the expanded payload, marked
/// as `ContentType::Json`. Any error is converted into a drop action by
/// `error_to_action`.
#[cfg(feature = "processing")]
pub fn process(
    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
    project_info: &ProjectInfo,
    global_config: &GlobalConfig,
    config: &Config,
) {
    let client_ip = managed_envelope.envelope().meta().client_addr();
    let filter_settings = &project_info.config.filter_settings;

    // Select the feature that gates profile chunks for this project, then
    // query it once.
    let gating_feature = if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
        Feature::ContinuousProfilingBeta
    } else {
        Feature::ContinuousProfiling
    };
    let continuous_profiling_enabled = project_info.has_feature(gating_feature);

    managed_envelope.retain_items(|item| {
        // Only profile chunks are handled here; everything else passes through.
        if !matches!(item.ty(), ItemType::ProfileChunk) {
            return ItemAction::Keep;
        }

        if !continuous_profiling_enabled {
            return ItemAction::DropSilently;
        }

        let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
            Ok(parsed) => parsed,
            Err(parse_error) => return error_to_action(parse_error),
        };

        // A profile type declared on the item must agree with the payload;
        // a missing one is filled in from the payload.
        match item.profile_type() {
            Some(declared) if declared != chunk.profile_type() => {
                return error_to_action(ProfileError::InvalidProfileType);
            }
            Some(_) => {}
            None => item.set_profile_type(chunk.profile_type()),
        }

        match chunk.filter(client_ip, filter_settings, global_config) {
            Err(filter_error) => return error_to_action(filter_error),
            Ok(_) => {}
        }

        // The size limit is checked against the expanded payload.
        let expanded = match chunk.expand() {
            Ok(bytes) if bytes.len() > config.max_profile_size() => {
                return error_to_action(ProfileError::ExceedSizeLimit);
            }
            Ok(bytes) => bytes,
            Err(expand_error) => return error_to_action(expand_error),
        };

        item.set_payload(ContentType::Json, expanded);
        ItemAction::Keep
    });
}
105
/// Converts a profiling error into the action that drops the offending item.
///
/// `ProfileError::Filtered` becomes an `Outcome::Filtered` carrying the same
/// filter key. Every other error becomes an `Outcome::Invalid` whose discard
/// reason is derived via `relay_profiling::discard_reason`.
#[cfg(feature = "processing")]
fn error_to_action(err: ProfileError) -> ItemAction {
    let outcome = match err {
        ProfileError::Filtered(filter_stat_key) => Outcome::Filtered(filter_stat_key),
        other => {
            let reason = relay_profiling::discard_reason(other);
            Outcome::Invalid(DiscardReason::Profiling(reason))
        }
    };

    ItemAction::Drop(outcome)
}