relay_server/services/processor/
profile_chunk.rs

1//! Profile chunks processor code.
2
3use relay_dynamic_config::Feature;
4use std::sync::Arc;
5
6use crate::envelope::ItemType;
7use crate::managed::{ItemAction, TypedEnvelope};
8
9use crate::services::projects::project::ProjectInfo;
10#[cfg(feature = "processing")]
11use {
12    crate::envelope::ContentType,
13    crate::services::outcome::{DiscardReason, Outcome},
14    crate::services::processor::ProfileChunkGroup,
15    relay_config::Config,
16    relay_dynamic_config::GlobalConfig,
17    relay_profiling::ProfileError,
18};
19
20/// Removes profile chunks from the envelope if the feature is not enabled.
21pub fn filter<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: Arc<ProjectInfo>) {
22    let continuous_profiling_enabled =
23        if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
24            project_info.has_feature(Feature::ContinuousProfilingBeta)
25        } else {
26            project_info.has_feature(Feature::ContinuousProfiling)
27        };
28    managed_envelope.retain_items(|item| match item.ty() {
29        ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
30        _ => ItemAction::Keep,
31    });
32}
33
/// Validates, filters and expands every profile chunk item in the envelope.
///
/// Items of any other type are kept untouched. For each profile chunk the following steps
/// run in order, and the first failing step decides how the item is dropped:
///
/// 1. If continuous profiling is not enabled for the project, the item is dropped with
///    [`ItemAction::DropSilently`].
/// 2. The payload is parsed into a [`relay_profiling::ProfileChunk`].
/// 3. The profile type on the item header is checked against the parsed chunk, or set from
///    the chunk if the header does not carry one.
/// 4. The chunk is run through the project's filter settings.
/// 5. The chunk is expanded and the result is checked against `config.max_profile_size()`.
///
/// Chunks passing all steps get their payload replaced with the expanded JSON payload.
/// Failures in steps 2 to 5 are converted into a drop action by `error_to_action`.
#[cfg(feature = "processing")]
pub fn process(
    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
    project_info: &ProjectInfo,
    global_config: &GlobalConfig,
    config: &Config,
) {
    let client_ip = managed_envelope.envelope().meta().client_addr();
    let filter_settings = &project_info.config.filter_settings;

    // Projects on the beta ingest path are gated on the beta feature, everyone else on the
    // general continuous profiling feature.
    let required_feature = if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
        Feature::ContinuousProfilingBeta
    } else {
        Feature::ContinuousProfiling
    };
    let profiling_enabled = project_info.has_feature(required_feature);

    managed_envelope.retain_items(|item| {
        if !matches!(item.ty(), ItemType::ProfileChunk) {
            return ItemAction::Keep;
        }

        if !profiling_enabled {
            return ItemAction::DropSilently;
        }

        let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
            Ok(parsed) => parsed,
            Err(err) => return error_to_action(err),
        };

        // Reconcile the profile type on the item header with the one in the payload.
        //
        // This is currently necessary to ensure profile chunks are emitted in the correct
        // data category, as well as rate limited with the correct data category.
        //
        // In the future we plan to make the profile type on the item header a necessity.
        // For more context see also: <https://github.com/getsentry/relay/pull/4595>.
        if let Some(header_type) = item.profile_type() {
            // The header type (either set before or inferred from the platform) must agree
            // with the type of the parsed chunk itself.
            if header_type != chunk.profile_type() {
                return error_to_action(ProfileError::InvalidProfileType);
            }
        } else {
            // Important: this has to happen before any of the drops below, so that their
            // outcomes are reported in the correct category.
            item.set_profile_type(chunk.profile_type());
        }

        if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
            return error_to_action(err);
        }

        match chunk.expand() {
            Err(err) => error_to_action(err),
            Ok(expanded) if expanded.len() > config.max_profile_size() => {
                error_to_action(ProfileError::ExceedSizeLimit)
            }
            Ok(expanded) => {
                item.set_payload(ContentType::Json, expanded);
                ItemAction::Keep
            }
        }
    });
}
105
/// Converts a profiling error into the drop action for the affected item.
///
/// [`ProfileError::Filtered`] becomes an [`Outcome::Filtered`] carrying the same filter key.
/// Every other error becomes an [`Outcome::Invalid`] whose discard reason is derived via
/// [`relay_profiling::discard_reason`].
#[cfg(feature = "processing")]
fn error_to_action(err: ProfileError) -> ItemAction {
    let outcome = match err {
        ProfileError::Filtered(filter_stat_key) => Outcome::Filtered(filter_stat_key),
        other => {
            let reason = relay_profiling::discard_reason(other);
            Outcome::Invalid(DiscardReason::Profiling(reason))
        }
    };

    ItemAction::Drop(outcome)
}
116}