relay_server/services/processor/
replay.rs1use std::error::Error;
3use std::net::IpAddr;
4
5use crate::envelope::{ContentType, ItemType};
6use crate::managed::TypedEnvelope;
7use crate::services::outcome::DiscardReason;
8use crate::services::processor::{ProcessingError, ReplayGroup, should_filter};
9use crate::services::projects::project::ProjectInfo;
10use crate::statsd::{RelayCounters, RelayTimers};
11use crate::utils::sample;
12
13use bytes::Bytes;
14use relay_base_schema::organization::OrganizationId;
15use relay_base_schema::project::ProjectId;
16use relay_config::Config;
17use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
18use relay_event_normalization::replay::{self, ReplayError};
19use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
20use relay_event_schema::processor::{self, ProcessingState};
21use relay_event_schema::protocol::{EventId, Replay};
22use relay_pii::PiiProcessor;
23use relay_protocol::Annotated;
24use relay_replays::recording::RecordingScrubber;
25use relay_statsd::metric;
26use serde::{Deserialize, Serialize};
27
28pub fn process(
30 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
31 global_config: &GlobalConfig,
32 config: &Config,
33 project_info: &ProjectInfo,
34 geoip_lookup: Option<&GeoIpLookup>,
35) -> Result<(), ProcessingError> {
36 if should_filter(config, project_info, Feature::SessionReplay) {
38 managed_envelope.drop_items_silently();
39 return Ok(());
40 }
41
42 if project_info.has_feature(Feature::SessionReplayVideoDisabled)
45 && count_replay_video_events(managed_envelope) > 0
46 {
47 managed_envelope.drop_items_silently();
48 return Ok(());
49 }
50
51 let rpc = {
52 let meta = managed_envelope.envelope().meta();
53
54 ReplayProcessingConfig {
55 config: &project_info.config,
56 global_config,
57 geoip_lookup,
58 event_id: managed_envelope.envelope().event_id(),
59 project_id: project_info.project_id,
60 organization_id: project_info.organization_id,
61 client_addr: meta.client_addr(),
62 user_agent: RawUserAgentInfo {
63 user_agent: meta.user_agent().map(|s| s.to_owned()),
64 client_hints: meta.client_hints().clone(),
65 },
66 }
67 };
68
69 let mut scrubber = if project_info.has_feature(Feature::SessionReplayRecordingScrubbing) {
70 let datascrubbing_config = rpc
71 .config
72 .datascrubbing_settings
73 .pii_config()
74 .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
75 .as_ref();
76
77 Some(RecordingScrubber::new(
78 config.max_replay_uncompressed_size(),
79 rpc.config.pii_config.as_ref(),
80 datascrubbing_config,
81 ))
82 } else {
83 None
84 };
85
86 for item in managed_envelope.envelope_mut().items_mut() {
87 match item.ty() {
88 ItemType::ReplayEvent => {
89 let replay_event = handle_replay_event_item(item.payload(), &rpc)?;
90 item.set_payload(ContentType::Json, replay_event);
91 }
92 ItemType::ReplayRecording => {
93 let replay_recording =
94 handle_replay_recording_item(item.payload(), scrubber.as_mut(), &rpc)?;
95 item.set_payload(ContentType::OctetStream, replay_recording);
96 }
97 ItemType::ReplayVideo => {
98 let replay_video =
99 handle_replay_video_item(item.payload(), scrubber.as_mut(), &rpc)?;
100 item.set_payload(ContentType::OctetStream, replay_video);
101 }
102 _ => {}
103 }
104 }
105
106 Ok(())
107}
108
109#[derive(Debug)]
110struct ReplayProcessingConfig<'a> {
111 pub config: &'a ProjectConfig,
112 pub global_config: &'a GlobalConfig,
113 pub geoip_lookup: Option<&'a GeoIpLookup>,
114 pub event_id: Option<EventId>,
115 pub project_id: Option<ProjectId>,
116 pub organization_id: Option<OrganizationId>,
117 pub client_addr: Option<IpAddr>,
118 pub user_agent: RawUserAgentInfo<String>,
119}
120
121fn handle_replay_event_item(
124 payload: Bytes,
125 config: &ReplayProcessingConfig<'_>,
126) -> Result<Bytes, ProcessingError> {
127 match process_replay_event(&payload, config) {
128 Ok(replay) => {
129 if let Some(replay_type) = replay.value() {
130 relay_filter::should_filter(
131 replay_type,
132 config.client_addr,
133 &config.config.filter_settings,
134 config.global_config.filters(),
135 )
136 .map_err(ProcessingError::ReplayFiltered)?;
137
138 if let Some(segment_id) = replay_type.segment_id.value() {
141 if *segment_id > 720 {
142 metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);
143
144 relay_log::debug!(
145 event_id = ?config.event_id,
146 project_id = config.project_id.map(|v| v.value()),
147 organization_id = config.organization_id.map(|o| o.value()),
148 segment_id = segment_id,
149 "replay segment-exceeded-limit"
150 );
151 }
152 }
153 }
154
155 match replay.to_json() {
156 Ok(json) => Ok(json.into_bytes().into()),
157 Err(error) => {
158 relay_log::error!(
159 error = &error as &dyn Error,
160 event_id = ?config.event_id,
161 "failed to serialize replay"
162 );
163 Ok(payload)
164 }
165 }
166 }
167 Err(error) => {
168 relay_log::debug!(
169 error = &error as &dyn Error,
170 event_id = ?config.event_id,
171 project_id = config.project_id.map(|v| v.value()),
172 organization_id = config.organization_id.map(|o| o.value()),
173 "invalid replay event"
174 );
175 Err(match error {
176 ReplayError::NoContent => {
177 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventNoPayload)
178 }
179 ReplayError::CouldNotScrub(_) => {
180 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventPii)
181 }
182 ReplayError::CouldNotParse(_) => {
183 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
184 }
185 ReplayError::InvalidPayload(_) => {
186 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
187 }
188 })
189 }
190 }
191}
192
193fn process_replay_event(
195 payload: &[u8],
196 config: &ReplayProcessingConfig<'_>,
197) -> Result<Annotated<Replay>, ReplayError> {
198 let mut replay =
199 Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
200
201 let Some(replay_value) = replay.value_mut() else {
202 return Err(ReplayError::NoContent);
203 };
204
205 replay::validate(replay_value)?;
206 replay::normalize(
207 &mut replay,
208 config.client_addr,
209 config.user_agent.as_deref(),
210 config.geoip_lookup,
211 );
212
213 if let Some(ref config) = config.config.pii_config {
214 let mut processor = PiiProcessor::new(config.compiled());
215 processor::process_value(&mut replay, &mut processor, ProcessingState::root())
216 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
217 }
218
219 let pii_config = config
220 .config
221 .datascrubbing_settings
222 .pii_config()
223 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
224
225 if let Some(config) = pii_config {
226 let mut processor = PiiProcessor::new(config.compiled());
227 processor::process_value(&mut replay, &mut processor, ProcessingState::root())
228 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
229 }
230
231 Ok(replay)
232}
233
234fn handle_replay_recording_item(
237 payload: Bytes,
238 scrubber: Option<&mut RecordingScrubber>,
239 config: &ReplayProcessingConfig<'_>,
240) -> Result<Bytes, ProcessingError> {
241 let Some(scrubber) = scrubber else {
244 return Ok(payload);
245 };
246 if scrubber.is_empty() {
247 return Ok(payload);
248 }
249
250 metric!(timer(RelayTimers::ReplayRecordingProcessing), {
255 scrubber.process_recording(&payload)
256 })
257 .map(Into::into)
258 .map_err(|error| {
259 match &error {
260 relay_replays::recording::ParseRecordingError::Compression(e) => {
261 if sample(0.001).is_keep() {
263 relay_log::with_scope(
264 move |scope| {
265 scope.add_attachment(relay_log::protocol::Attachment {
266 buffer: payload.into(),
267 filename: "payload".to_owned(),
268 content_type: Some("application/octet-stream".to_owned()),
269 ty: None,
270 });
271 },
272 || {
273 relay_log::error!(
274 error = e as &dyn Error,
275 event_id = ?config.event_id,
276 project_id = config.project_id.map(|v| v.value()),
277 organization_id = config.organization_id.map(|o| o.value()),
278 "ParseRecordingError::Compression"
279 )
280 },
281 );
282 }
283 }
284 relay_replays::recording::ParseRecordingError::Message(e) => {
285 relay_log::with_scope(
287 move |scope| {
288 scope.add_attachment(relay_log::protocol::Attachment {
289 buffer: payload.into(),
290 filename: "payload".to_owned(),
291 content_type: Some("application/octet-stream".to_owned()),
292 ty: None,
293 });
294 },
295 || {
296 relay_log::error!(
297 error = e,
298 event_id = ?config.event_id,
299 project_id = config.project_id.map(|v| v.value()),
300 organization_id = config.organization_id.map(|o| o.value()),
301 "ParseRecordingError::Message"
302 )
303 },
304 );
305 }
306 _ => (),
307 };
308 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayRecordingEvent)
309 })
310}
311
312#[derive(Debug, Deserialize, Serialize)]
315struct ReplayVideoEvent {
316 replay_event: Bytes,
317 replay_recording: Bytes,
318 replay_video: Bytes,
319}
320
321fn handle_replay_video_item(
322 payload: Bytes,
323 scrubber: Option<&mut RecordingScrubber>,
324 config: &ReplayProcessingConfig<'_>,
325) -> Result<Bytes, ProcessingError> {
326 let ReplayVideoEvent {
327 replay_event,
328 replay_recording,
329 replay_video,
330 } = rmp_serde::from_slice(&payload)
331 .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?;
332
333 let replay_event = handle_replay_event_item(replay_event, config)?;
335
336 let replay_recording = handle_replay_recording_item(replay_recording, scrubber, config)?;
338
339 if replay_video.is_empty() {
341 return Err(ProcessingError::InvalidReplay(
342 DiscardReason::InvalidReplayVideoEvent,
343 ));
344 }
345
346 match rmp_serde::to_vec_named(&ReplayVideoEvent {
347 replay_event,
348 replay_recording,
349 replay_video,
350 }) {
351 Ok(payload) => Ok(payload.into()),
352 Err(_) => Err(ProcessingError::InvalidReplay(
353 DiscardReason::InvalidReplayVideoEvent,
354 )),
355 }
356}
357
358fn count_replay_video_events(managed_envelope: &mut TypedEnvelope<ReplayGroup>) -> usize {
361 managed_envelope
362 .envelope()
363 .items()
364 .filter(|item| item.ty() == &ItemType::ReplayVideo)
365 .count()
366}