// relay_server/services/processor/replay.rs

1//! Replay related processor code.
2use std::error::Error;
3use std::net::IpAddr;
4
5use crate::envelope::{ContentType, ItemType};
6use crate::managed::TypedEnvelope;
7use crate::services::outcome::DiscardReason;
8use crate::services::processor::{ProcessingError, ReplayGroup, should_filter};
9use crate::services::projects::project::ProjectInfo;
10use crate::statsd::{RelayCounters, RelayTimers};
11use crate::utils::sample;
12
13use bytes::Bytes;
14use relay_base_schema::organization::OrganizationId;
15use relay_base_schema::project::ProjectId;
16use relay_config::Config;
17use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
18use relay_event_normalization::replay::{self, ReplayError};
19use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
20use relay_event_schema::processor::{self, ProcessingState};
21use relay_event_schema::protocol::{EventId, Replay};
22use relay_pii::PiiProcessor;
23use relay_protocol::Annotated;
24use relay_replays::recording::RecordingScrubber;
25use relay_statsd::metric;
26use serde::{Deserialize, Serialize};
27
28/// Removes replays if the feature flag is not enabled.
29pub fn process(
30    managed_envelope: &mut TypedEnvelope<ReplayGroup>,
31    global_config: &GlobalConfig,
32    config: &Config,
33    project_info: &ProjectInfo,
34    geoip_lookup: Option<&GeoIpLookup>,
35) -> Result<(), ProcessingError> {
36    // If the replay feature is not enabled drop the items silently.
37    if should_filter(config, project_info, Feature::SessionReplay) {
38        managed_envelope.drop_items_silently();
39        return Ok(());
40    }
41
42    // If the replay video feature is not enabled check the envelope items for a
43    // replay video event.
44    if project_info.has_feature(Feature::SessionReplayVideoDisabled)
45        && count_replay_video_events(managed_envelope) > 0
46    {
47        managed_envelope.drop_items_silently();
48        return Ok(());
49    }
50
51    let rpc = {
52        let meta = managed_envelope.envelope().meta();
53
54        ReplayProcessingConfig {
55            config: &project_info.config,
56            global_config,
57            geoip_lookup,
58            event_id: managed_envelope.envelope().event_id(),
59            project_id: project_info.project_id,
60            organization_id: project_info.organization_id,
61            client_addr: meta.client_addr(),
62            user_agent: RawUserAgentInfo {
63                user_agent: meta.user_agent().map(|s| s.to_owned()),
64                client_hints: meta.client_hints().clone(),
65            },
66        }
67    };
68
69    let mut scrubber = if project_info.has_feature(Feature::SessionReplayRecordingScrubbing) {
70        let datascrubbing_config = rpc
71            .config
72            .datascrubbing_settings
73            .pii_config()
74            .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
75            .as_ref();
76
77        Some(RecordingScrubber::new(
78            config.max_replay_uncompressed_size(),
79            rpc.config.pii_config.as_ref(),
80            datascrubbing_config,
81        ))
82    } else {
83        None
84    };
85
86    for item in managed_envelope.envelope_mut().items_mut() {
87        match item.ty() {
88            ItemType::ReplayEvent => {
89                let replay_event = handle_replay_event_item(item.payload(), &rpc)?;
90                item.set_payload(ContentType::Json, replay_event);
91            }
92            ItemType::ReplayRecording => {
93                let replay_recording =
94                    handle_replay_recording_item(item.payload(), scrubber.as_mut(), &rpc)?;
95                item.set_payload(ContentType::OctetStream, replay_recording);
96            }
97            ItemType::ReplayVideo => {
98                let replay_video =
99                    handle_replay_video_item(item.payload(), scrubber.as_mut(), &rpc)?;
100                item.set_payload(ContentType::OctetStream, replay_video);
101            }
102            _ => {}
103        }
104    }
105
106    Ok(())
107}
108
/// Shared context passed to every replay item handler.
#[derive(Debug)]
struct ReplayProcessingConfig<'a> {
    /// Per-project configuration (filter settings, PII and data-scrubbing configs).
    pub config: &'a ProjectConfig,
    /// Global Relay configuration; used for inbound filters.
    pub global_config: &'a GlobalConfig,
    /// Optional GeoIP database handed to replay normalization.
    pub geoip_lookup: Option<&'a GeoIpLookup>,
    /// Event id of the envelope, if present; used for log correlation.
    pub event_id: Option<EventId>,
    /// Project id, included in diagnostic logs.
    pub project_id: Option<ProjectId>,
    /// Organization id, included in diagnostic logs.
    pub organization_id: Option<OrganizationId>,
    /// Client IP taken from the request meta; used for filtering and normalization.
    pub client_addr: Option<IpAddr>,
    /// User agent and client hints taken from the request meta.
    pub user_agent: RawUserAgentInfo<String>,
}
120
121// Replay Event Processing.
122
123fn handle_replay_event_item(
124    payload: Bytes,
125    config: &ReplayProcessingConfig<'_>,
126) -> Result<Bytes, ProcessingError> {
127    match process_replay_event(&payload, config) {
128        Ok(replay) => {
129            if let Some(replay_type) = replay.value() {
130                relay_filter::should_filter(
131                    replay_type,
132                    config.client_addr,
133                    &config.config.filter_settings,
134                    config.global_config.filters(),
135                )
136                .map_err(ProcessingError::ReplayFiltered)?;
137
138                // Log segments that exceed the hour limit so we can diagnose errant SDKs
139                // or exotic customer implementations.
140                if let Some(segment_id) = replay_type.segment_id.value() {
141                    if *segment_id > 720 {
142                        metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);
143
144                        relay_log::debug!(
145                            event_id = ?config.event_id,
146                            project_id = config.project_id.map(|v| v.value()),
147                            organization_id = config.organization_id.map(|o| o.value()),
148                            segment_id = segment_id,
149                            "replay segment-exceeded-limit"
150                        );
151                    }
152                }
153            }
154
155            match replay.to_json() {
156                Ok(json) => Ok(json.into_bytes().into()),
157                Err(error) => {
158                    relay_log::error!(
159                        error = &error as &dyn Error,
160                        event_id = ?config.event_id,
161                        "failed to serialize replay"
162                    );
163                    Ok(payload)
164                }
165            }
166        }
167        Err(error) => {
168            relay_log::debug!(
169                error = &error as &dyn Error,
170                event_id = ?config.event_id,
171                project_id = config.project_id.map(|v| v.value()),
172                organization_id = config.organization_id.map(|o| o.value()),
173                "invalid replay event"
174            );
175            Err(match error {
176                ReplayError::NoContent => {
177                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventNoPayload)
178                }
179                ReplayError::CouldNotScrub(_) => {
180                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventPii)
181                }
182                ReplayError::CouldNotParse(_) => {
183                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
184                }
185                ReplayError::InvalidPayload(_) => {
186                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
187                }
188            })
189        }
190    }
191}
192
193/// Validates, normalizes, and scrubs PII from a replay event.
194fn process_replay_event(
195    payload: &[u8],
196    config: &ReplayProcessingConfig<'_>,
197) -> Result<Annotated<Replay>, ReplayError> {
198    let mut replay =
199        Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
200
201    let Some(replay_value) = replay.value_mut() else {
202        return Err(ReplayError::NoContent);
203    };
204
205    replay::validate(replay_value)?;
206    replay::normalize(
207        &mut replay,
208        config.client_addr,
209        config.user_agent.as_deref(),
210        config.geoip_lookup,
211    );
212
213    if let Some(ref config) = config.config.pii_config {
214        let mut processor = PiiProcessor::new(config.compiled());
215        processor::process_value(&mut replay, &mut processor, ProcessingState::root())
216            .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
217    }
218
219    let pii_config = config
220        .config
221        .datascrubbing_settings
222        .pii_config()
223        .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
224
225    if let Some(config) = pii_config {
226        let mut processor = PiiProcessor::new(config.compiled());
227        processor::process_value(&mut replay, &mut processor, ProcessingState::root())
228            .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
229    }
230
231    Ok(replay)
232}
233
234// Replay Recording Processing
235
/// Scrubs PII from a replay-recording payload.
///
/// When no scrubber is configured, or the configured scrubber has no rules
/// to apply, the payload passes through untouched. Failed recordings are
/// logged (sampled for compression errors) and rejected as invalid replays.
fn handle_replay_recording_item(
    payload: Bytes,
    scrubber: Option<&mut RecordingScrubber>,
    config: &ReplayProcessingConfig<'_>,
) -> Result<Bytes, ProcessingError> {
    // XXX: Processing is there just for data scrubbing. Skip the entire expensive
    // processing step if we do not need to scrub.
    let Some(scrubber) = scrubber else {
        return Ok(payload);
    };
    if scrubber.is_empty() {
        return Ok(payload);
    }

    // Limit expansion of recordings to the max replay size. The payload is
    // decompressed temporarily and then immediately re-compressed. However, to
    // limit memory pressure, we use the replay limit as a good overall limit for
    // allocations.
    metric!(timer(RelayTimers::ReplayRecordingProcessing), {
        scrubber.process_recording(&payload)
    })
    .map(Into::into)
    .map_err(|error| {
        match &error {
            relay_replays::recording::ParseRecordingError::Compression(e) => {
                // 20k errors per day at 0.1% sample rate == 20 logs per day
                if sample(0.001).is_keep() {
                    // Attach the raw payload so the sampled failure can be diagnosed.
                    relay_log::with_scope(
                        move |scope| {
                            scope.add_attachment(relay_log::protocol::Attachment {
                                buffer: payload.into(),
                                filename: "payload".to_owned(),
                                content_type: Some("application/octet-stream".to_owned()),
                                ty: None,
                            });
                        },
                        || {
                            relay_log::error!(
                                error = e as &dyn Error,
                                event_id = ?config.event_id,
                                project_id = config.project_id.map(|v| v.value()),
                                organization_id = config.organization_id.map(|o| o.value()),
                                "ParseRecordingError::Compression"
                            )
                        },
                    );
                }
            }
            relay_replays::recording::ParseRecordingError::Message(e) => {
                // Only 118 errors in the past 30 days. We log everything.
                relay_log::with_scope(
                    move |scope| {
                        scope.add_attachment(relay_log::protocol::Attachment {
                            buffer: payload.into(),
                            filename: "payload".to_owned(),
                            content_type: Some("application/octet-stream".to_owned()),
                            ty: None,
                        });
                    },
                    || {
                        relay_log::error!(
                            error = e,
                            event_id = ?config.event_id,
                            project_id = config.project_id.map(|v| v.value()),
                            organization_id = config.organization_id.map(|o| o.value()),
                            "ParseRecordingError::Message"
                        )
                    },
                );
            }
            _ => (),
        };
        ProcessingError::InvalidReplay(DiscardReason::InvalidReplayRecordingEvent)
    })
}
311
312// Replay Video Processing
313
/// Container format of a replay-video item: a msgpack map bundling the
/// replay event, the recording, and the raw video bytes (see the
/// `rmp_serde` round-trip in `handle_replay_video_item`).
#[derive(Debug, Deserialize, Serialize)]
struct ReplayVideoEvent {
    // JSON replay-event payload; processed like a stand-alone replay-event item.
    replay_event: Bytes,
    // Recording payload; processed like a stand-alone replay-recording item.
    replay_recording: Bytes,
    // Raw video bytes; passed through unprocessed but must be non-empty.
    replay_video: Bytes,
}
320
321fn handle_replay_video_item(
322    payload: Bytes,
323    scrubber: Option<&mut RecordingScrubber>,
324    config: &ReplayProcessingConfig<'_>,
325) -> Result<Bytes, ProcessingError> {
326    let ReplayVideoEvent {
327        replay_event,
328        replay_recording,
329        replay_video,
330    } = rmp_serde::from_slice(&payload)
331        .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?;
332
333    // Process as a replay-event envelope item.
334    let replay_event = handle_replay_event_item(replay_event, config)?;
335
336    // Process as a replay-recording envelope item.
337    let replay_recording = handle_replay_recording_item(replay_recording, scrubber, config)?;
338
339    // Verify the replay-video payload is not empty.
340    if replay_video.is_empty() {
341        return Err(ProcessingError::InvalidReplay(
342            DiscardReason::InvalidReplayVideoEvent,
343        ));
344    }
345
346    match rmp_serde::to_vec_named(&ReplayVideoEvent {
347        replay_event,
348        replay_recording,
349        replay_video,
350    }) {
351        Ok(payload) => Ok(payload.into()),
352        Err(_) => Err(ProcessingError::InvalidReplay(
353            DiscardReason::InvalidReplayVideoEvent,
354        )),
355    }
356}
357
358// Pre-processors
359
360fn count_replay_video_events(managed_envelope: &mut TypedEnvelope<ReplayGroup>) -> usize {
361    managed_envelope
362        .envelope()
363        .items()
364        .filter(|item| item.ty() == &ItemType::ReplayVideo)
365        .count()
366}