// relay_server/services/processor/replay.rs

1//! Replay related processor code.
2use std::error::Error;
3use std::net::IpAddr;
4
5use crate::envelope::{ContentType, ItemType};
6use crate::managed::TypedEnvelope;
7use crate::services::outcome::DiscardReason;
8use crate::services::processor::{ProcessingError, ReplayGroup, should_filter};
9use crate::services::projects::project::ProjectInfo;
10use crate::statsd::{RelayCounters, RelayTimers};
11use crate::utils::sample;
12
13use bytes::Bytes;
14use relay_base_schema::organization::OrganizationId;
15use relay_base_schema::project::ProjectId;
16use relay_config::Config;
17use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
18use relay_event_normalization::replay::{self, ReplayError};
19use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
20use relay_event_schema::processor::{self, ProcessingState};
21use relay_event_schema::protocol::{EventId, Replay};
22use relay_pii::PiiProcessor;
23use relay_protocol::Annotated;
24use relay_replays::recording::RecordingScrubber;
25use relay_statsd::metric;
26use serde::{Deserialize, Serialize};
27
28/// Removes replays if the feature flag is not enabled.
29pub fn process(
30    managed_envelope: &mut TypedEnvelope<ReplayGroup>,
31    global_config: &GlobalConfig,
32    config: &Config,
33    project_info: &ProjectInfo,
34    geoip_lookup: &GeoIpLookup,
35) -> Result<(), ProcessingError> {
36    // If the replay feature is not enabled drop the items silently.
37    if should_filter(config, project_info, Feature::SessionReplay) {
38        managed_envelope.drop_items_silently();
39        return Ok(());
40    }
41
42    // If the replay video feature is not enabled check the envelope items for a
43    // replay video event.
44    if project_info.has_feature(Feature::SessionReplayVideoDisabled)
45        && count_replay_video_events(managed_envelope) > 0
46    {
47        managed_envelope.drop_items_silently();
48        return Ok(());
49    }
50
51    let meta = managed_envelope.envelope().meta();
52    let user_agent = RawUserAgentInfo {
53        user_agent: meta.user_agent().map(String::from),
54        client_hints: meta.client_hints().to_owned(),
55    };
56
57    let rpc = ReplayProcessingConfig {
58        config: &project_info.config,
59        global_config,
60        geoip_lookup,
61        event_id: managed_envelope.envelope().event_id(),
62        project_id: project_info.project_id,
63        organization_id: project_info.organization_id,
64        client_addr: meta.client_addr(),
65        user_agent: user_agent.as_deref(),
66    };
67
68    let mut scrubber = if project_info.has_feature(Feature::SessionReplayRecordingScrubbing) {
69        let datascrubbing_config = rpc
70            .config
71            .datascrubbing_settings
72            .pii_config()
73            .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
74            .as_ref();
75
76        Some(RecordingScrubber::new(
77            config.max_replay_uncompressed_size(),
78            rpc.config.pii_config.as_ref(),
79            datascrubbing_config,
80        ))
81    } else {
82        None
83    };
84
85    for item in managed_envelope.envelope_mut().items_mut() {
86        match item.ty() {
87            ItemType::ReplayEvent => {
88                let replay_event = handle_replay_event_item(item.payload(), &rpc)?;
89                item.set_payload(ContentType::Json, replay_event);
90            }
91            ItemType::ReplayRecording => {
92                let replay_recording =
93                    handle_replay_recording_item(item.payload(), scrubber.as_mut(), &rpc)?;
94                item.set_payload(ContentType::OctetStream, replay_recording);
95            }
96            ItemType::ReplayVideo => {
97                let replay_video =
98                    handle_replay_video_item(item.payload(), scrubber.as_mut(), &rpc)?;
99                item.set_payload(ContentType::OctetStream, replay_video);
100            }
101            _ => {}
102        }
103    }
104
105    Ok(())
106}
107
/// Shared, read-only context threaded through the replay item handlers.
#[derive(Debug)]
struct ReplayProcessingConfig<'a> {
    /// Project configuration; supplies filter settings, the PII config, and
    /// data-scrubbing settings used while processing replay payloads.
    pub config: &'a ProjectConfig,
    /// Global Relay configuration; supplies the global inbound filters.
    pub global_config: &'a GlobalConfig,
    /// GeoIP lookup handle passed into replay normalization.
    pub geoip_lookup: &'a GeoIpLookup,
    /// Event id of the envelope, if present; included in diagnostic logs.
    pub event_id: Option<EventId>,
    /// Project id, if known; included in diagnostic logs.
    pub project_id: Option<ProjectId>,
    /// Organization id, if known; included in diagnostic logs.
    pub organization_id: Option<OrganizationId>,
    /// Client IP from request metadata; used for filtering and normalization.
    pub client_addr: Option<IpAddr>,
    /// Borrowed user-agent and client-hint headers from request metadata.
    pub user_agent: RawUserAgentInfo<&'a str>,
}
119
120// Replay Event Processing.
121
122fn handle_replay_event_item(
123    payload: Bytes,
124    config: &ReplayProcessingConfig<'_>,
125) -> Result<Bytes, ProcessingError> {
126    match process_replay_event(&payload, config) {
127        Ok(replay) => {
128            if let Some(replay_type) = replay.value() {
129                relay_filter::should_filter(
130                    replay_type,
131                    config.client_addr,
132                    &config.config.filter_settings,
133                    config.global_config.filters(),
134                )
135                .map_err(ProcessingError::ReplayFiltered)?;
136
137                // Log segments that exceed the hour limit so we can diagnose errant SDKs
138                // or exotic customer implementations.
139                if let Some(segment_id) = replay_type.segment_id.value()
140                    && *segment_id > 720
141                {
142                    metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);
143
144                    relay_log::debug!(
145                        event_id = ?config.event_id,
146                        project_id = config.project_id.map(|v| v.value()),
147                        organization_id = config.organization_id.map(|o| o.value()),
148                        segment_id = segment_id,
149                        "replay segment-exceeded-limit"
150                    );
151                }
152            }
153
154            match replay.to_json() {
155                Ok(json) => Ok(json.into_bytes().into()),
156                Err(error) => {
157                    relay_log::error!(
158                        error = &error as &dyn Error,
159                        event_id = ?config.event_id,
160                        "failed to serialize replay"
161                    );
162                    Ok(payload)
163                }
164            }
165        }
166        Err(error) => {
167            relay_log::debug!(
168                error = &error as &dyn Error,
169                event_id = ?config.event_id,
170                project_id = config.project_id.map(|v| v.value()),
171                organization_id = config.organization_id.map(|o| o.value()),
172                "invalid replay event"
173            );
174            Err(match error {
175                ReplayError::NoContent => {
176                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventNoPayload)
177                }
178                ReplayError::CouldNotScrub(_) => {
179                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEventPii)
180                }
181                ReplayError::CouldNotParse(_) => {
182                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
183                }
184                ReplayError::InvalidPayload(_) => {
185                    ProcessingError::InvalidReplay(DiscardReason::InvalidReplayEvent)
186                }
187            })
188        }
189    }
190}
191
192/// Validates, normalizes, and scrubs PII from a replay event.
193fn process_replay_event(
194    payload: &[u8],
195    config: &ReplayProcessingConfig<'_>,
196) -> Result<Annotated<Replay>, ReplayError> {
197    let mut replay =
198        Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
199
200    let Some(replay_value) = replay.value_mut() else {
201        return Err(ReplayError::NoContent);
202    };
203
204    replay::validate(replay_value)?;
205    replay::normalize(
206        &mut replay,
207        config.client_addr,
208        &config.user_agent,
209        config.geoip_lookup,
210    );
211
212    if let Some(ref config) = config.config.pii_config {
213        let mut processor = PiiProcessor::new(config.compiled());
214        processor::process_value(&mut replay, &mut processor, ProcessingState::root())
215            .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
216    }
217
218    let pii_config = config
219        .config
220        .datascrubbing_settings
221        .pii_config()
222        .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
223
224    if let Some(config) = pii_config {
225        let mut processor = PiiProcessor::new(config.compiled());
226        processor::process_value(&mut replay, &mut processor, ProcessingState::root())
227            .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
228    }
229
230    Ok(replay)
231}
232
233// Replay Recording Processing
234
/// Scrubs PII from a replay-recording payload.
///
/// Returns the payload untouched when no scrubber was constructed or the
/// scrubber has no rules to apply. On parse failure the error is logged
/// (compression errors are sampled) and the item is discarded as an invalid
/// replay recording.
fn handle_replay_recording_item(
    payload: Bytes,
    scrubber: Option<&mut RecordingScrubber>,
    config: &ReplayProcessingConfig<'_>,
) -> Result<Bytes, ProcessingError> {
    // XXX: Processing is there just for data scrubbing. Skip the entire expensive
    // processing step if we do not need to scrub.
    let Some(scrubber) = scrubber else {
        return Ok(payload);
    };
    if scrubber.is_empty() {
        return Ok(payload);
    }

    // Limit expansion of recordings to the max replay size. The payload is
    // decompressed temporarily and then immediately re-compressed. However, to
    // limit memory pressure, we use the replay limit as a good overall limit for
    // allocations.
    metric!(timer(RelayTimers::ReplayRecordingProcessing), {
        scrubber.process_recording(&payload)
    })
    .map(Into::into)
    .map_err(|error| {
        // Attach the raw payload to the error report so failures can be
        // reproduced. Note the payload is moved into whichever branch's
        // closure actually runs.
        match &error {
            relay_replays::recording::ParseRecordingError::Compression(e) => {
                // 20k errors per day at 0.1% sample rate == 20 logs per day
                if sample(0.001).is_keep() {
                    relay_log::with_scope(
                        move |scope| {
                            scope.add_attachment(relay_log::protocol::Attachment {
                                buffer: payload.into(),
                                filename: "payload".to_owned(),
                                content_type: Some("application/octet-stream".to_owned()),
                                ty: None,
                            });
                        },
                        || {
                            relay_log::error!(
                                error = e as &dyn Error,
                                event_id = ?config.event_id,
                                project_id = config.project_id.map(|v| v.value()),
                                organization_id = config.organization_id.map(|o| o.value()),
                                "ParseRecordingError::Compression"
                            )
                        },
                    );
                }
            }
            relay_replays::recording::ParseRecordingError::Message(e) => {
                // Only 118 errors in the past 30 days. We log everything.
                relay_log::with_scope(
                    move |scope| {
                        scope.add_attachment(relay_log::protocol::Attachment {
                            buffer: payload.into(),
                            filename: "payload".to_owned(),
                            content_type: Some("application/octet-stream".to_owned()),
                            ty: None,
                        });
                    },
                    || {
                        relay_log::error!(
                            error = e,
                            event_id = ?config.event_id,
                            project_id = config.project_id.map(|v| v.value()),
                            organization_id = config.organization_id.map(|o| o.value()),
                            "ParseRecordingError::Message"
                        )
                    },
                );
            }
            // Other parse errors are discarded without individual logging.
            _ => (),
        };
        ProcessingError::InvalidReplay(DiscardReason::InvalidReplayRecordingEvent)
    })
}
310
311// Replay Video Processing
312
/// Combined payload of a replay-video envelope item, transported as
/// MessagePack (see `rmp_serde` usage in [`handle_replay_video_item`]).
///
/// NOTE(review): field names are serialized as map keys and are therefore
/// part of the wire format — do not rename.
#[derive(Debug, Deserialize, Serialize)]
struct ReplayVideoEvent {
    /// Replay event payload; processed as a replay-event item (JSON).
    replay_event: Bytes,
    /// Replay recording payload; processed as a replay-recording item.
    replay_recording: Bytes,
    /// Raw video payload; passed through, but must be non-empty.
    replay_video: Bytes,
}
319
320fn handle_replay_video_item(
321    payload: Bytes,
322    scrubber: Option<&mut RecordingScrubber>,
323    config: &ReplayProcessingConfig<'_>,
324) -> Result<Bytes, ProcessingError> {
325    let ReplayVideoEvent {
326        replay_event,
327        replay_recording,
328        replay_video,
329    } = rmp_serde::from_slice(&payload)
330        .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?;
331
332    // Process as a replay-event envelope item.
333    let replay_event = handle_replay_event_item(replay_event, config)?;
334
335    // Process as a replay-recording envelope item.
336    let replay_recording = handle_replay_recording_item(replay_recording, scrubber, config)?;
337
338    // Verify the replay-video payload is not empty.
339    if replay_video.is_empty() {
340        return Err(ProcessingError::InvalidReplay(
341            DiscardReason::InvalidReplayVideoEvent,
342        ));
343    }
344
345    match rmp_serde::to_vec_named(&ReplayVideoEvent {
346        replay_event,
347        replay_recording,
348        replay_video,
349    }) {
350        Ok(payload) => Ok(payload.into()),
351        Err(_) => Err(ProcessingError::InvalidReplay(
352            DiscardReason::InvalidReplayVideoEvent,
353        )),
354    }
355}
356
357// Pre-processors
358
359fn count_replay_video_events(managed_envelope: &mut TypedEnvelope<ReplayGroup>) -> usize {
360    managed_envelope
361        .envelope()
362        .items()
363        .filter(|item| item.ty() == &ItemType::ReplayVideo)
364        .count()
365}