relay_server/services/processor/
replay.rsuse std::error::Error;
use std::net::IpAddr;
use std::sync::Arc;
use crate::envelope::{ContentType, ItemType};
use crate::services::outcome::DiscardReason;
use crate::services::processor::{should_filter, ProcessingError, ReplayGroup};
use crate::services::projects::project::ProjectInfo;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{sample, TypedEnvelope};
use bytes::Bytes;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_config::Config;
use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
use relay_event_normalization::replay::{self, ReplayError};
use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
use relay_event_schema::processor::{self, ProcessingState};
use relay_event_schema::protocol::{EventId, Replay};
use relay_pii::PiiProcessor;
use relay_protocol::Annotated;
use relay_replays::recording::RecordingScrubber;
use relay_statsd::metric;
use serde::{Deserialize, Serialize};
/// Processes all replay items (events, recordings and videos) of a replay envelope in place.
///
/// All items are dropped silently and `Ok(())` is returned when either:
/// - the session replay feature filter applies to this project, or
/// - the project has [`Feature::SessionReplayVideoDisabled`] and the envelope contains at
///   least one [`ItemType::ReplayVideo`] item.
///
/// Otherwise every item is visited once:
/// - replay events are normalized, PII-scrubbed, filtered and re-serialized as JSON,
/// - replay recordings are scrubbed when recording scrubbing is enabled for the project,
/// - replay videos have their embedded event and recording processed the same way.
///
/// # Errors
///
/// Returns a [`ProcessingError`] if the data scrubbing PII config cannot be loaded (checked
/// only when recording scrubbing is enabled), or if processing any item fails. Processing
/// stops at the first failing item.
pub fn process(
    managed_envelope: &mut TypedEnvelope<ReplayGroup>,
    global_config: &GlobalConfig,
    config: Arc<Config>,
    project_info: Arc<ProjectInfo>,
    geoip_lookup: Option<&GeoIpLookup>,
) -> Result<(), ProcessingError> {
    // Short-circuit order matters: the video count is only computed when the feature filter
    // did not already decide to drop everything.
    let drop_all = should_filter(&config, &project_info, Feature::SessionReplay)
        || (project_info.has_feature(Feature::SessionReplayVideoDisabled)
            && count_replay_video_events(managed_envelope) > 0);
    if drop_all {
        managed_envelope.drop_items_silently();
        return Ok(());
    }

    // Collect everything the item handlers need up front, so the envelope can be borrowed
    // mutably in the loop below.
    let rpc = {
        let meta = managed_envelope.envelope().meta();
        ReplayProcessingConfig {
            config: &project_info.config,
            global_config,
            geoip_lookup,
            event_id: managed_envelope.envelope().event_id(),
            project_id: project_info.project_id,
            organization_id: project_info.organization_id,
            client_addr: meta.client_addr(),
            user_agent: RawUserAgentInfo {
                user_agent: meta.user_agent().map(|s| s.to_owned()),
                client_hints: meta.client_hints().clone(),
            },
        }
    };

    // The recording scrubber is only built when the project opted into recording scrubbing.
    let mut scrubber = if project_info.has_feature(Feature::SessionReplayRecordingScrubbing) {
        let datascrubbing_config = rpc
            .config
            .datascrubbing_settings
            .pii_config()
            .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
            .as_ref();
        Some(RecordingScrubber::new(
            config.max_replay_uncompressed_size(),
            rpc.config.pii_config.as_ref(),
            datascrubbing_config,
        ))
    } else {
        None
    };

    // Loop-invariant: evaluate the feature flag once instead of once per item.
    let combined_payload = project_info.has_feature(Feature::SessionReplayCombinedEnvelopeItems);

    for item in managed_envelope.envelope_mut().items_mut() {
        if combined_payload {
            item.set_replay_combined_payload(true);
        }
        match item.ty() {
            ItemType::ReplayEvent => {
                let replay_event = handle_replay_event_item(item.payload(), &rpc)?;
                item.set_payload(ContentType::Json, replay_event);
            }
            ItemType::ReplayRecording => {
                let replay_recording =
                    handle_replay_recording_item(item.payload(), scrubber.as_mut(), &rpc)?;
                item.set_payload(ContentType::OctetStream, replay_recording);
            }
            ItemType::ReplayVideo => {
                let replay_video =
                    handle_replay_video_item(item.payload(), scrubber.as_mut(), &rpc)?;
                item.set_payload(ContentType::OctetStream, replay_video);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Per-envelope inputs shared by the replay item handlers in this module.
///
/// The struct is private to this module, so its fields need no `pub` marker.
#[derive(Debug)]
struct ReplayProcessingConfig<'a> {
    /// Project configuration: PII config, data scrubbing settings and inbound filters.
    config: &'a ProjectConfig,
    /// Global configuration; supplies the global inbound filters.
    global_config: &'a GlobalConfig,
    /// Optional GeoIP lookup passed to replay normalization.
    geoip_lookup: Option<&'a GeoIpLookup>,
    /// Event ID of the envelope; attached to log messages.
    event_id: Option<EventId>,
    /// Project ID; attached to log messages.
    project_id: Option<ProjectId>,
    /// Organization ID; attached to log messages.
    organization_id: Option<OrganizationId>,
    /// Client IP from the request metadata, used for normalization and inbound filtering.
    client_addr: Option<IpAddr>,
    /// Raw user agent and client hints from the request metadata.
    user_agent: RawUserAgentInfo<String>,
}
/// Normalizes, PII-scrubs and filters a single replay event payload.
///
/// On success, returns the processed replay event re-serialized as JSON. Segment IDs above
/// 720 are counted and logged but do not reject the event.
///
/// NOTE(review): if re-serialization fails, the error is logged and the ORIGINAL `payload`
/// is returned, i.e. the unscrubbed input is forwarded. Confirm this best-effort fallback is
/// intended.
///
/// # Errors
///
/// - [`ProcessingError::ReplayFiltered`] when an inbound filter matches the event.
/// - [`ProcessingError::InvalidReplay`] when the event is empty, cannot be parsed, fails
///   validation, or cannot be scrubbed.
fn handle_replay_event_item(
    payload: Bytes,
    config: &ReplayProcessingConfig<'_>,
) -> Result<Bytes, ProcessingError> {
    let replay = match process_replay_event(&payload, config) {
        Ok(processed) => processed,
        Err(error) => {
            relay_log::warn!(
                error = &error as &dyn Error,
                event_id = ?config.event_id,
                project_id = config.project_id.map(|v| v.value()),
                organization_id = config.organization_id.map(|o| o.value()),
                "invalid replay event"
            );
            let reason = match error {
                ReplayError::NoContent => DiscardReason::InvalidReplayEventNoPayload,
                ReplayError::CouldNotScrub(_) => DiscardReason::InvalidReplayEventPii,
                ReplayError::CouldNotParse(_) | ReplayError::InvalidPayload(_) => {
                    DiscardReason::InvalidReplayEvent
                }
            };
            return Err(ProcessingError::InvalidReplay(reason));
        }
    };

    if let Some(replay_value) = replay.value() {
        relay_filter::should_filter(
            replay_value,
            config.client_addr,
            &config.config.filter_settings,
            config.global_config.filters(),
        )
        .map_err(ProcessingError::ReplayFiltered)?;

        // Oversized segment IDs are only recorded here; the event is still accepted.
        // Presumably 720 is the expected maximum segment count — TODO confirm.
        if let Some(segment_id) = replay_value.segment_id.value().filter(|id| **id > 720) {
            metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);
            relay_log::debug!(
                event_id = ?config.event_id,
                project_id = config.project_id.map(|v| v.value()),
                organization_id = config.organization_id.map(|o| o.value()),
                segment_id = segment_id,
                "replay segment-exceeded-limit"
            );
        }
    }

    let serialized = replay.to_json();
    if let Err(error) = &serialized {
        relay_log::error!(
            error = error as &dyn Error,
            event_id = ?config.event_id,
            "failed to serialize replay"
        );
    }
    Ok(serialized.map_or(payload, |json| json.into_bytes().into()))
}
fn process_replay_event(
payload: &[u8],
config: &ReplayProcessingConfig<'_>,
) -> Result<Annotated<Replay>, ReplayError> {
let mut replay =
Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
let Some(replay_value) = replay.value_mut() else {
return Err(ReplayError::NoContent);
};
replay::validate(replay_value)?;
replay::normalize(
&mut replay,
config.client_addr,
config.user_agent.as_deref(),
config.geoip_lookup,
);
if let Some(ref config) = config.config.pii_config {
let mut processor = PiiProcessor::new(config.compiled());
processor::process_value(&mut replay, &mut processor, ProcessingState::root())
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
}
let pii_config = config
.config
.datascrubbing_settings
.pii_config()
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
if let Some(config) = pii_config {
let mut processor = PiiProcessor::new(config.compiled());
processor::process_value(&mut replay, &mut processor, ProcessingState::root())
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
}
Ok(replay)
}
/// Scrubs a replay recording payload with the given [`RecordingScrubber`].
///
/// The payload is returned unchanged when no scrubber is given or the scrubber is empty.
///
/// # Errors
///
/// Returns [`ProcessingError::InvalidReplay`] with
/// [`DiscardReason::InvalidReplayRecordingEvent`] when the recording cannot be processed.
/// Before returning:
/// - a 0.1% sample of compression errors is logged at error level, with the raw payload as
///   a scope attachment;
/// - message errors are always logged that way;
/// - other errors are not logged.
fn handle_replay_recording_item(
    payload: Bytes,
    scrubber: Option<&mut RecordingScrubber>,
    config: &ReplayProcessingConfig<'_>,
) -> Result<Bytes, ProcessingError> {
    // Wraps the raw recording payload as an attachment for error reports.
    fn payload_attachment(payload: Bytes) -> relay_log::protocol::Attachment {
        relay_log::protocol::Attachment {
            buffer: payload.into(),
            filename: "payload".to_owned(),
            content_type: Some("application/octet-stream".to_owned()),
            ty: None,
        }
    }

    let scrubber = match scrubber {
        Some(active) if !active.is_empty() => active,
        _ => return Ok(payload),
    };

    let outcome = metric!(timer(RelayTimers::ReplayRecordingProcessing), {
        scrubber.process_recording(&payload)
    });

    let failure = match outcome {
        Ok(scrubbed) => return Ok(scrubbed.into()),
        Err(failure) => failure,
    };

    match &failure {
        // Only a small sample of compression failures is reported, together with the payload.
        relay_replays::recording::ParseRecordingError::Compression(e) if sample(0.001) => {
            relay_log::with_scope(
                move |scope| {
                    scope.add_attachment(payload_attachment(payload));
                },
                || {
                    relay_log::error!(
                        error = e as &dyn Error,
                        event_id = ?config.event_id,
                        project_id = config.project_id.map(|v| v.value()),
                        organization_id = config.organization_id.map(|o| o.value()),
                        "ParseRecordingError::Compression"
                    )
                },
            );
        }
        relay_replays::recording::ParseRecordingError::Message(e) => {
            relay_log::with_scope(
                move |scope| {
                    scope.add_attachment(payload_attachment(payload));
                },
                || {
                    relay_log::error!(
                        error = e,
                        event_id = ?config.event_id,
                        project_id = config.project_id.map(|v| v.value()),
                        organization_id = config.organization_id.map(|o| o.value()),
                        "ParseRecordingError::Message"
                    )
                },
            );
        }
        _ => (),
    }

    Err(ProcessingError::InvalidReplay(
        DiscardReason::InvalidReplayRecordingEvent,
    ))
}
/// MessagePack payload of a replay video item. It bundles a replay event, a replay recording
/// and the raw video bytes into a single item.
///
/// NOTE(review): keep the field declaration order stable. A MessagePack encoder may send this
/// struct as a positional array instead of a map, in which case order is significant —
/// TODO confirm what SDKs send.
#[derive(Debug, Deserialize, Serialize)]
struct ReplayVideoEvent {
    // Replay event payload; processed like a standalone replay event item.
    replay_event: Bytes,
    // Replay recording payload; processed like a standalone replay recording item.
    replay_recording: Bytes,
    // Video bytes; must be non-empty, otherwise passed through unchanged.
    replay_video: Bytes,
}
fn handle_replay_video_item(
payload: Bytes,
scrubber: Option<&mut RecordingScrubber>,
config: &ReplayProcessingConfig<'_>,
) -> Result<Bytes, ProcessingError> {
let ReplayVideoEvent {
replay_event,
replay_recording,
replay_video,
} = rmp_serde::from_slice(&payload)
.map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?;
let replay_event = handle_replay_event_item(replay_event, config)?;
let replay_recording = handle_replay_recording_item(replay_recording, scrubber, config)?;
if replay_video.is_empty() {
return Err(ProcessingError::InvalidReplay(
DiscardReason::InvalidReplayVideoEvent,
));
}
match rmp_serde::to_vec_named(&ReplayVideoEvent {
replay_event,
replay_recording,
replay_video,
}) {
Ok(payload) => Ok(payload.into()),
Err(_) => Err(ProcessingError::InvalidReplay(
DiscardReason::InvalidReplayVideoEvent,
)),
}
}
/// Returns the number of [`ItemType::ReplayVideo`] items in the envelope.
///
/// The envelope is only read, so it is borrowed immutably. Callers that hold a
/// `&mut TypedEnvelope` can still pass it directly through the implicit `&mut T` to `&T`
/// coercion.
fn count_replay_video_events(managed_envelope: &TypedEnvelope<ReplayGroup>) -> usize {
    managed_envelope
        .envelope()
        .items()
        .filter(|item| item.ty() == &ItemType::ReplayVideo)
        .count()
}