use relay_dynamic_config::{Feature, GlobalConfig};
use std::net::IpAddr;
use std::sync::Arc;
use relay_base_schema::events::EventType;
use relay_base_schema::project::ProjectId;
use relay_config::Config;
use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
use relay_filter::ProjectFiltersConfig;
use relay_profiling::{ProfileError, ProfileId};
use relay_protocol::Annotated;
use crate::envelope::{ContentType, Item, ItemType};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{event_type, should_filter, TransactionGroup};
use crate::services::projects::project::ProjectInfo;
use crate::utils::{ItemAction, TypedEnvelope};
pub fn filter<Group>(
managed_envelope: &mut TypedEnvelope<Group>,
event: &Annotated<Event>,
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
) -> Option<ProfileId> {
let profiling_disabled = should_filter(&config, &project_info, Feature::Profiling);
let has_transaction = event_type(event) == Some(EventType::Transaction);
let keep_unsampled_profiles = true;
let mut profile_id = None;
managed_envelope.retain_items(|item| match item.ty() {
ItemType::Profile if profile_id.is_none() => {
if profiling_disabled {
return ItemAction::DropSilently;
}
let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled());
if !profile_allowed {
return ItemAction::DropSilently;
}
match relay_profiling::parse_metadata(&item.payload(), project_id) {
Ok(id) => {
profile_id = Some(id);
ItemAction::Keep
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
}
ItemType::Profile => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(ProfileError::TooManyProfiles),
))),
_ => ItemAction::Keep,
});
profile_id
}
pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
let Some(event) = event.value_mut() else {
return;
};
match profile_id {
Some(profile_id) => {
let contexts = event.contexts.get_or_insert_with(Contexts::new);
contexts.add(ProfileContext {
profile_id: Annotated::new(profile_id),
..ProfileContext::default()
});
}
None => {
if let Some(contexts) = event.contexts.value_mut() {
if let Some(profile_context) = contexts.get_mut::<ProfileContext>() {
profile_context.profile_id = Annotated::empty();
}
}
}
}
}
pub fn process(
managed_envelope: &mut TypedEnvelope<TransactionGroup>,
event: &mut Annotated<Event>,
global_config: &GlobalConfig,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
) -> Option<ProfileId> {
let client_ip = managed_envelope.envelope().meta().client_addr();
let filter_settings = &project_info.config.filter_settings;
let profiling_enabled = project_info.has_feature(Feature::Profiling);
let mut profile_id = None;
managed_envelope.retain_items(|item| match item.ty() {
ItemType::Profile => {
if !profiling_enabled {
return ItemAction::DropSilently;
}
let Some(event) = event.value() else {
return ItemAction::DropSilently;
};
match expand_profile(
item,
event,
&config,
client_ip,
filter_settings,
global_config,
) {
Ok(id) => {
profile_id = Some(id);
ItemAction::Keep
}
Err(outcome) => ItemAction::Drop(outcome),
}
}
_ => ItemAction::Keep,
});
profile_id
}
fn expand_profile(
item: &mut Item,
event: &Event,
config: &Config,
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<ProfileId, Outcome> {
match relay_profiling::expand_profile(
&item.payload(),
event,
client_ip,
filter_settings,
global_config,
) {
Ok((id, payload)) => {
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
Ok(id)
} else {
Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
Err(Outcome::Filtered(filter_stat_key))
}
Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
#[cfg(feature = "processing")]
use insta::assert_debug_snapshot;
#[cfg(not(feature = "processing"))]
use relay_dynamic_config::Feature;
use relay_event_schema::protocol::EventId;
use relay_sampling::evaluation::ReservoirCounters;
use relay_system::Addr;
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
use crate::services::processor::{ProcessEnvelope, ProcessingGroup};
use crate::services::projects::project::ProjectInfo;
use crate::testutils::create_test_processor;
use crate::utils::ManagedEnvelope;
use super::*;
#[cfg(feature = "processing")]
#[tokio::test]
async fn test_profile_id_transfered() {
let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": []
}
}))
.unwrap();
let processor = create_test_processor(config).await;
let event_id = EventId::new();
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);
envelope.add_item({
let mut item = Item::new(ItemType::Transaction);
item.set_payload(
ContentType::Json,
r#"{
"event_id": "9b73438f70e044ecbd006b7fd15b7373",
"type": "transaction",
"transaction": "/foo/",
"timestamp": 946684810.0,
"start_timestamp": 946684800.0,
"contexts": {
"trace": {
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
"span_id": "fa90fdead5f74053",
"op": "http.server",
"type": "trace"
}
},
"transaction_info": {
"source": "url"
}
}"#,
);
item
});
envelope.add_item({
let mut item = Item::new(ItemType::Profile);
item.set_payload(
ContentType::Json,
r#"{
"profile_id": "012d836b15bb49d7bbf99e64295d995b",
"version": "1",
"platform": "android",
"os": {"name": "foo", "version": "bar"},
"device": {"architecture": "zap"},
"timestamp": "2023-10-10 00:00:00Z",
"profile": {
"samples":[
{
"stack_id":0,
"elapsed_since_start_ns":1,
"thread_id":1
},
{
"stack_id":0,
"elapsed_since_start_ns":2,
"thread_id":1
}
],
"stacks":[[0]],
"frames":[{
"function":"main"
}]
},
"transactions": [
{
"id": "9b73438f70e044ecbd006b7fd15b7373",
"name": "/foo/",
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
}
]
}"#,
);
item
});
let mut project_state = ProjectInfo::default();
project_state.config.features.0.insert(Feature::Profiling);
let mut envelopes = ProcessingGroup::split_envelope(*envelope);
assert_eq!(envelopes.len(), 1);
let (group, envelope) = envelopes.pop().unwrap();
let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy(), group);
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_state),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let envelope_response = processor.process(message).unwrap();
let ctx = envelope_response.envelope.unwrap();
let new_envelope = ctx.envelope();
let item = new_envelope
.get_item_by(|item| item.ty() == &ItemType::Transaction)
.unwrap();
let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
let context = transaction
.value()
.unwrap()
.context::<ProfileContext>()
.unwrap();
assert_debug_snapshot!(context, @r###"
ProfileContext {
profile_id: EventId(
012d836b-15bb-49d7-bbf9-9e64295d995b,
),
profiler_id: ~,
}
"###);
}
#[cfg(feature = "processing")]
#[tokio::test]
async fn test_invalid_profile_id_not_transfered() {
let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": []
}
}))
.unwrap();
let processor = create_test_processor(config).await;
let event_id = EventId::new();
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);
envelope.add_item({
let mut item = Item::new(ItemType::Transaction);
item.set_payload(
ContentType::Json,
r#"{
"event_id": "9b73438f70e044ecbd006b7fd15b7373",
"type": "transaction",
"transaction": "/foo/",
"timestamp": 946684810.0,
"start_timestamp": 946684800.0,
"contexts": {
"trace": {
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
"span_id": "fa90fdead5f74053",
"op": "http.server",
"type": "trace"
}
},
"transaction_info": {
"source": "url"
}
}"#,
);
item
});
envelope.add_item({
let mut item = Item::new(ItemType::Profile);
item.set_payload(
ContentType::Json,
r#"{
"profile_id": "012d836b15bb49d7bbf99e64295d995b",
"version": "1",
"platform": "android",
"os": {"name": "foo", "version": "bar"},
"device": {"architecture": "zap"},
"timestamp": "2023-10-10 00:00:00Z",
"profile": {
"samples":[
{
"stack_id":0,
"elapsed_since_start_ns":1,
"thread_id":1
},
{
"stack_id":1,
"elapsed_since_start_ns":2,
"thread_id":1
}
],
"stacks":[[0],[]],
"frames":[{
"function":"main"
}]
},
"transactions": [
{
"id": "9b73438f70e044ecbd006b7fd15b7373",
"name": "/foo/",
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
}
]
}"#,
);
item
});
let mut project_info = ProjectInfo::default();
project_info.config.features.0.insert(Feature::Profiling);
let mut envelopes = ProcessingGroup::split_envelope(*envelope);
assert_eq!(envelopes.len(), 1);
let (group, envelope) = envelopes.pop().unwrap();
let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy(), group);
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_info),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let envelope_response = processor.process(message).unwrap();
let ctx = envelope_response.envelope.unwrap();
let new_envelope = ctx.envelope();
let item = new_envelope
.get_item_by(|item| item.ty() == &ItemType::Transaction)
.unwrap();
let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
let context = transaction
.value()
.unwrap()
.context::<ProfileContext>()
.unwrap();
assert_debug_snapshot!(context, @r###"
ProfileContext {
profile_id: ~,
profiler_id: ~,
}
"###);
}
#[tokio::test]
async fn filter_standalone_profile() {
relay_log::init_test!();
let processor = create_test_processor(Default::default()).await;
let event_id = EventId::new();
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);
envelope.add_item({
let mut item = Item::new(ItemType::Profile);
item.set_payload(
ContentType::Json,
r#"{
"profile_id": "012d836b15bb49d7bbf99e64295d995b",
"version": "1",
"platform": "android",
"os": {"name": "foo", "version": "bar"},
"device": {"architecture": "zap"},
"timestamp": "2023-10-10 00:00:00Z"
}"#,
);
item
});
let mut project_state = ProjectInfo::default();
project_state.config.features.0.insert(Feature::Profiling);
let mut envelopes = ProcessingGroup::split_envelope(*envelope);
assert_eq!(envelopes.len(), 1);
let (group, envelope) = envelopes.pop().unwrap();
let envelope = ManagedEnvelope::new(envelope.clone(), Addr::dummy(), Addr::dummy(), group);
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_state),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let envelope_response = processor.process(message).unwrap();
assert!(envelope_response.envelope.is_none());
}
#[cfg(feature = "processing")]
#[tokio::test]
async fn test_profile_id_removed_profiler_id_kept() {
let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": []
}
}))
.unwrap();
let processor = create_test_processor(config).await;
let event_id = EventId::new();
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);
envelope.add_item({
let mut item = Item::new(ItemType::Transaction);
item.set_payload(
ContentType::Json,
r#"{
"type": "transaction",
"transaction": "/foo/",
"timestamp": 946684810.0,
"start_timestamp": 946684800.0,
"contexts": {
"trace": {
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
"span_id": "fa90fdead5f74053",
"op": "http.server",
"type": "trace"
},
"profile": {
"profile_id": "4c79f60c11214eb38604f4ae0781bfb2",
"profiler_id": "4c79f60c11214eb38604f4ae0781bfb2",
"type": "profile"
}
},
"transaction_info": {
"source": "url"
}
}"#,
);
item
});
let mut project_state = ProjectInfo::default();
project_state.config.features.0.insert(Feature::Profiling);
let mut envelopes = ProcessingGroup::split_envelope(*envelope);
assert_eq!(envelopes.len(), 1);
let (group, envelope) = envelopes.pop().unwrap();
let envelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy(), group);
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_state),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let envelope_response = processor.process(message).unwrap();
let ctx = envelope_response.envelope.unwrap();
let new_envelope = ctx.envelope();
let item = new_envelope
.get_item_by(|item| item.ty() == &ItemType::Transaction)
.unwrap();
let transaction = Annotated::<Event>::from_json_bytes(&item.payload()).unwrap();
let context = transaction
.value()
.unwrap()
.context::<ProfileContext>()
.unwrap();
assert_debug_snapshot!(context, @r###"
ProfileContext {
profile_id: ~,
profiler_id: EventId(
4c79f60c-1121-4eb3-8604-f4ae0781bfb2,
),
}
"###);
}
}