relay_server/services/processor/
profile_chunk.rs

1//! Profile chunks processor code.
2
3use relay_dynamic_config::Feature;
4
5use crate::envelope::ItemType;
6use crate::managed::{ItemAction, TypedEnvelope};
7
8use crate::services::projects::project::ProjectInfo;
9#[cfg(feature = "processing")]
10use {
11    crate::envelope::ContentType,
12    crate::services::outcome::{DiscardReason, Outcome},
13    crate::services::processor::ProfileChunkGroup,
14    relay_config::Config,
15    relay_dynamic_config::GlobalConfig,
16    relay_profiling::ProfileError,
17};
18
19/// Removes profile chunks from the envelope if the feature is not enabled.
20pub fn filter<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: &ProjectInfo) {
21    let continuous_profiling_enabled =
22        if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
23            project_info.has_feature(Feature::ContinuousProfilingBeta)
24        } else {
25            project_info.has_feature(Feature::ContinuousProfiling)
26        };
27    managed_envelope.retain_items(|item| match item.ty() {
28        ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
29        _ => ItemAction::Keep,
30    });
31}
32
33/// Processes profile chunks.
34#[cfg(feature = "processing")]
35pub fn process(
36    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
37    project_info: &ProjectInfo,
38    global_config: &GlobalConfig,
39    config: &Config,
40) {
41    let client_ip = managed_envelope.envelope().meta().client_addr();
42    let filter_settings = &project_info.config.filter_settings;
43
44    let continuous_profiling_enabled =
45        if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
46            project_info.has_feature(Feature::ContinuousProfilingBeta)
47        } else {
48            project_info.has_feature(Feature::ContinuousProfiling)
49        };
50
51    managed_envelope.retain_items(|item| match item.ty() {
52        ItemType::ProfileChunk => {
53            if !continuous_profiling_enabled {
54                return ItemAction::DropSilently;
55            }
56
57            let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
58                Ok(chunk) => chunk,
59                Err(err) => return error_to_action(err),
60            };
61
62            // Validate the item inferred profile type with the one from the payload,
63            // or if missing set it.
64            //
65            // This is currently necessary to ensure profile chunks are emitted in the correct
66            // data category, as well as rate limited with the correct data category.
67            //
68            // In the future we plan to make the profile type on the item header a necessity.
69            // For more context see also: <https://github.com/getsentry/relay/pull/4595>.
70            match item.profile_type() {
71                Some(profile_type) => {
72                    // Validate the profile type inferred from the item header (either set before
73                    // or from the platform) against the profile type from the parsed chunk itself.
74                    if profile_type != chunk.profile_type() {
75                        return error_to_action(relay_profiling::ProfileError::InvalidProfileType);
76                    }
77                }
78                None => {
79                    // Important: set the profile type to get outcomes in the correct category,
80                    // if there isn't already one on the profile.
81                    item.set_profile_type(chunk.profile_type());
82                }
83            }
84
85            if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
86                return error_to_action(err);
87            }
88
89            let payload = match chunk.expand() {
90                Ok(expanded) => expanded,
91                Err(err) => return error_to_action(err),
92            };
93
94            if payload.len() > config.max_profile_size() {
95                return error_to_action(relay_profiling::ProfileError::ExceedSizeLimit);
96            }
97
98            item.set_payload(ContentType::Json, payload);
99            ItemAction::Keep
100        }
101        _ => ItemAction::Keep,
102    });
103}
104
105#[cfg(feature = "processing")]
106fn error_to_action(err: ProfileError) -> ItemAction {
107    match err {
108        ProfileError::Filtered(filter_stat_key) => {
109            ItemAction::Drop(Outcome::Filtered(filter_stat_key))
110        }
111        err => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
112            relay_profiling::discard_reason(err),
113        ))),
114    }
115}