relay_server/services/processor/
replay.rs1use std::error::Error;
3use std::net::IpAddr;
4
5use crate::envelope::{ContentType, ItemType};
6use crate::managed::TypedEnvelope;
7use crate::services::outcome::DiscardReason;
8use crate::services::processor::{ProcessingError, ReplayGroup, should_filter};
9use crate::services::projects::project::ProjectInfo;
10use crate::statsd::{RelayCounters, RelayTimers};
11use crate::utils::sample;
12
13use bytes::Bytes;
14use relay_base_schema::organization::OrganizationId;
15use relay_base_schema::project::ProjectId;
16use relay_config::Config;
17use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
18use relay_event_normalization::replay::{self, ReplayError};
19use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
20use relay_event_schema::processor::{self, ProcessingState};
21use relay_event_schema::protocol::{EventId, Replay};
22use relay_pii::PiiProcessor;
23use relay_protocol::Annotated;
24use relay_replays::recording::RecordingScrubber;
25use relay_statsd::metric;
26use serde::{Deserialize, Serialize};
27
28pub fn process(
30 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
31 global_config: &GlobalConfig,
32 config: &Config,
33 project_info: &ProjectInfo,
34 geoip_lookup: &GeoIpLookup,
35) -> Result<(), ProcessingError> {
36 if should_filter(config, project_info, Feature::SessionReplay) {
38 managed_envelope.drop_items_silently();
39 return Ok(());
40 }
41
42 if project_info.has_feature(Feature::SessionReplayVideoDisabled)
45 && count_replay_video_events(managed_envelope) > 0
46 {
47 managed_envelope.drop_items_silently();
48 return Ok(());
49 }
50
51 let meta = managed_envelope.envelope().meta();
52 let user_agent = RawUserAgentInfo {
53 user_agent: meta.user_agent().map(String::from),
54 client_hints: meta.client_hints().to_owned(),
55 };
56
57 let rpc = ReplayProcessingConfig {
58 config: &project_info.config,
59 global_config,
60 geoip_lookup,
61 event_id: managed_envelope.envelope().event_id(),
62 project_id: project_info.project_id,
63 organization_id: project_info.organization_id,
64 client_addr: meta.client_addr(),
65 user_agent: user_agent.as_deref(),
66 };
67
68 let mut scrubber = if project_info.has_feature(Feature::SessionReplayRecordingScrubbing) {
69 let datascrubbing_config = rpc
70 .config
71 .datascrubbing_settings
72 .pii_config()
73 .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
74 .as_ref();
75
76 Some(RecordingScrubber::new(
77 config.max_replay_uncompressed_size(),
78 rpc.config.pii_config.as_ref(),
79 datascrubbing_config,
80 ))
81 } else {
82 None
83 };
84
85 for item in managed_envelope.envelope_mut().items_mut() {
86 match item.ty() {
87 ItemType::ReplayEvent => {
88 let replay_event = handle_replay_event_item(item.payload(), &rpc)?;
89 item.set_payload(ContentType::Json, replay_event);
90 }
91 ItemType::ReplayRecording => {
92 let replay_recording =
93 handle_replay_recording_item(item.payload(), scrubber.as_mut(), &rpc)?;
94 item.set_payload(ContentType::OctetStream, replay_recording);
95 }
96 ItemType::ReplayVideo => {
97 let replay_video =
98 handle_replay_video_item(item.payload(), scrubber.as_mut(), &rpc)?;
99 item.set_payload(ContentType::OctetStream, replay_video);
100 }
101 _ => {}
102 }
103 }
104
105 Ok(())
106}
107
108#[derive(Debug)]
109struct ReplayProcessingConfig<'a> {
110 pub config: &'a ProjectConfig,
111 pub global_config: &'a GlobalConfig,
112 pub geoip_lookup: &'a GeoIpLookup,
113 pub event_id: Option<EventId>,
114 pub project_id: Option<ProjectId>,
115 pub organization_id: Option<OrganizationId>,
116 pub client_addr: Option<IpAddr>,
117 pub user_agent: RawUserAgentInfo<&'a str>,
118}
119
120fn handle_replay_event_item(
123 payload: Bytes,
124 config: &ReplayProcessingConfig<'_>,
125) -> Result<Bytes, ProcessingError> {
126 match process_replay_event(&payload, config) {
127 Ok(replay) => {
128 if let Some(replay_type) = replay.value() {
129 relay_filter::should_filter(
130 replay_type,
131 config.client_addr,
132 &config.config.filter_settings,
133 config.global_config.filters(),
134 )
135 .map_err(ProcessingError::ReplayFiltered)?;
136
137 if let Some(segment_id) = replay_type.segment_id.value()
140 && *segment_id > 720
141 {
142 metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);
143
144 relay_log::debug!(
145 event_id = ?config.event_id,
146 project_id = config.project_id.map(|v| v.value()),
147 organization_id = config.organization_id.map(|o| o.value()),
148 segment_id = segment_id,
149 "replay segment-exceeded-limit"
150 );
151 }
152 }
153
154 match replay.to_json() {
155 Ok(json) => Ok(json.into_bytes().into()),
156 Err(error) => {
157 relay_log::error!(
158 error = &error as &dyn Error,
159 event_id = ?config.event_id,
160 "failed to serialize replay"
161 );
162 Ok(payload)
163 }
164 }
165 }
166 Err(error) => {
167 relay_log::debug!(
168 error = &error as &dyn Error,
169 event_id = ?config.event_id,
170 project_id = config.project_id.map(|v| v.value()),
171 organization_id = config.organization_id.map(|o| o.value()),
172 "invalid replay event"
173 );
174 Err(match error {
175 ReplayError::NoContent => {
176 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventNoPayload)
177 }
178 ReplayError::CouldNotScrub(_) => {
179 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventPii)
180 }
181 ReplayError::CouldNotParse(_) => {
182 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
183 }
184 ReplayError::InvalidPayload(_) => {
185 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
186 }
187 })
188 }
189 }
190}
191
192fn process_replay_event(
194 payload: &[u8],
195 config: &ReplayProcessingConfig<'_>,
196) -> Result<Annotated<Replay>, ReplayError> {
197 let mut replay =
198 Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
199
200 let Some(replay_value) = replay.value_mut() else {
201 return Err(ReplayError::NoContent);
202 };
203
204 replay::validate(replay_value)?;
205 replay::normalize(
206 &mut replay,
207 config.client_addr,
208 &config.user_agent,
209 config.geoip_lookup,
210 );
211
212 if let Some(ref config) = config.config.pii_config {
213 let mut processor = PiiProcessor::new(config.compiled());
214 processor::process_value(&mut replay, &mut processor, ProcessingState::root())
215 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
216 }
217
218 let pii_config = config
219 .config
220 .datascrubbing_settings
221 .pii_config()
222 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
223
224 if let Some(config) = pii_config {
225 let mut processor = PiiProcessor::new(config.compiled());
226 processor::process_value(&mut replay, &mut processor, ProcessingState::root())
227 .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
228 }
229
230 Ok(replay)
231}
232
233fn handle_replay_recording_item(
236 payload: Bytes,
237 scrubber: Option<&mut RecordingScrubber>,
238 config: &ReplayProcessingConfig<'_>,
239) -> Result<Bytes, ProcessingError> {
240 let Some(scrubber) = scrubber else {
243 return Ok(payload);
244 };
245 if scrubber.is_empty() {
246 return Ok(payload);
247 }
248
249 metric!(timer(RelayTimers::ReplayRecordingProcessing), {
254 scrubber.process_recording(&payload)
255 })
256 .map(Into::into)
257 .map_err(|error| {
258 match &error {
259 relay_replays::recording::ParseRecordingError::Compression(e) => {
260 if sample(0.001).is_keep() {
262 relay_log::with_scope(
263 move |scope| {
264 scope.add_attachment(relay_log::protocol::Attachment {
265 buffer: payload.into(),
266 filename: "payload".to_owned(),
267 content_type: Some("application/octet-stream".to_owned()),
268 ty: None,
269 });
270 },
271 || {
272 relay_log::error!(
273 error = e as &dyn Error,
274 event_id = ?config.event_id,
275 project_id = config.project_id.map(|v| v.value()),
276 organization_id = config.organization_id.map(|o| o.value()),
277 "ParseRecordingError::Compression"
278 )
279 },
280 );
281 }
282 }
283 relay_replays::recording::ParseRecordingError::Message(e) => {
284 relay_log::with_scope(
286 move |scope| {
287 scope.add_attachment(relay_log::protocol::Attachment {
288 buffer: payload.into(),
289 filename: "payload".to_owned(),
290 content_type: Some("application/octet-stream".to_owned()),
291 ty: None,
292 });
293 },
294 || {
295 relay_log::error!(
296 error = e,
297 event_id = ?config.event_id,
298 project_id = config.project_id.map(|v| v.value()),
299 organization_id = config.organization_id.map(|o| o.value()),
300 "ParseRecordingError::Message"
301 )
302 },
303 );
304 }
305 _ => (),
306 };
307 ProcessingError::InvalidReplay(DiscardReason::InvalidReplayRecordingEvent)
308 })
309}
310
311#[derive(Debug, Deserialize, Serialize)]
314struct ReplayVideoEvent {
315 replay_event: Bytes,
316 replay_recording: Bytes,
317 replay_video: Bytes,
318}
319
320fn handle_replay_video_item(
321 payload: Bytes,
322 scrubber: Option<&mut RecordingScrubber>,
323 config: &ReplayProcessingConfig<'_>,
324) -> Result<Bytes, ProcessingError> {
325 let ReplayVideoEvent {
326 replay_event,
327 replay_recording,
328 replay_video,
329 } = rmp_serde::from_slice(&payload)
330 .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?;
331
332 let replay_event = handle_replay_event_item(replay_event, config)?;
334
335 let replay_recording = handle_replay_recording_item(replay_recording, scrubber, config)?;
337
338 if replay_video.is_empty() {
340 return Err(ProcessingError::InvalidReplay(
341 DiscardReason::InvalidReplayVideoEvent,
342 ));
343 }
344
345 match rmp_serde::to_vec_named(&ReplayVideoEvent {
346 replay_event,
347 replay_recording,
348 replay_video,
349 }) {
350 Ok(payload) => Ok(payload.into()),
351 Err(_) => Err(ProcessingError::InvalidReplay(
352 DiscardReason::InvalidReplayVideoEvent,
353 )),
354 }
355}
356
357fn count_replay_video_events(managed_envelope: &mut TypedEnvelope<ReplayGroup>) -> usize {
360 managed_envelope
361 .envelope()
362 .items()
363 .filter(|item| item.ty() == &ItemType::ReplayVideo)
364 .count()
365}