relay_event_normalization/
replay.rs

1//! Validation and normalization of [`Replay`] events.
2
3use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
14/// Replay validation error.
15///
16/// This error is returned from [`validate`].
17#[derive(Debug, thiserror::Error)]
18pub enum ReplayError {
19    /// The replay event is missing a `replay_id`.
20    #[error("missing replay_id")]
21    MissingReplayId,
22    /// The replay event is missing a `segment_id`.
23    #[error("missing segment_id")]
24    MissingSegmentId,
25    /// The `segment_id` is too large to fit in a `u16`.
26    #[error("segment_id too large")]
27    SegmentIdTooLarge,
28    /// One or more of the `error_ids` have an error.
29    #[error("invalid error_id specified")]
30    InvalidErrorId,
31    /// One or more of the `trace_ids` have an error.
32    #[error("invalid trace_id specified")]
33    InvalidTraceId,
34
35    /// The Replay event could not be parsed from JSON.
36    #[error("invalid json")]
37    CouldNotParse(#[from] serde_json::Error),
38
39    /// The Replay event was parsed but did not match the schema.
40    #[error("no data found")]
41    NoContent,
42
43    /// The Replay contains invalid data or is missing a required field.
44    ///
45    /// NOTE(review): [`validate`] in this file never constructs this variant; confirm which caller does.
46    #[error("invalid payload {0}")]
47    InvalidPayload(String),
48
49    /// An error occurred during PII scrubbing of the Replay.
50    ///
51    /// This error is usually returned when the PII configuration fails to parse.
52    #[error("failed to scrub PII: {0}")]
53    CouldNotScrub(String),
54}
55
56/// Checks if the Replay event is structurally valid.
57///
58/// Returns `Ok(())`, if the Replay is valid and can be normalized. Otherwise, returns
59/// `Err(ReplayError::InvalidPayload)` describing the missing or invalid data.
60pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
61    replay
62        .replay_id
63        .value()
64        .ok_or(ReplayError::MissingReplayId)?;
65
66    let segment_id = *replay
67        .segment_id
68        .value()
69        .ok_or(ReplayError::MissingSegmentId)?;
70
71    if segment_id > u16::MAX as u64 {
72        return Err(ReplayError::SegmentIdTooLarge);
73    }
74
75    if replay
76        .error_ids
77        .value()
78        .into_iter()
79        .flat_map(|v| v.iter())
80        .any(|v| v.meta().has_errors())
81    {
82        return Err(ReplayError::InvalidErrorId);
83    }
84
85    if replay
86        .trace_ids
87        .value()
88        .into_iter()
89        .flat_map(|v| v.iter())
90        .any(|v| v.meta().has_errors())
91    {
92        return Err(ReplayError::InvalidTraceId);
93    }
94
95    Ok(())
96}
97
98/// Adds default fields and normalizes all values in to their standard representation.
99pub fn normalize(
100    replay: &mut Annotated<Replay>,
101    client_ip: Option<StdIpAddr>,
102    user_agent: &RawUserAgentInfo<&str>,
103    geoip_lookup: &GeoIpLookup,
104) {
105    let _ = processor::apply(replay, |replay_value, meta| {
106        normalize_platform(replay_value);
107        normalize_ip_address(replay_value, client_ip);
108        normalize_user_geoinfo(
109            geoip_lookup,
110            &mut replay_value.user,
111            client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
112        );
113        normalize_user_agent(replay_value, user_agent);
114        normalize_type(replay_value);
115        normalize_array_fields(replay_value);
116        let _ = trimming::TrimmingProcessor::new().process_replay(
117            replay_value,
118            meta,
119            ProcessingState::root(),
120        );
121        Ok(())
122    });
123}
124
125fn normalize_array_fields(replay: &mut Replay) {
126    // TODO: This should be replaced by the TrimmingProcessor.
127    // https://github.com/getsentry/relay/pull/1910#pullrequestreview-1337188206
128    if let Some(items) = replay.error_ids.value_mut() {
129        items.truncate(100);
130    }
131
132    if let Some(items) = replay.trace_ids.value_mut() {
133        items.truncate(100);
134    }
135
136    if let Some(items) = replay.urls.value_mut() {
137        items.truncate(100);
138    }
139}
140
141fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
142    crate::event::normalize_ip_addresses(
143        &mut replay.request,
144        &mut replay.user,
145        replay.platform.as_str(),
146        ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
147        replay.sdk.value(),
148    );
149}
150
151fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) {
152    let headers = match replay
153        .request
154        .value()
155        .and_then(|request| request.headers.value())
156    {
157        Some(headers) => headers,
158        None => return,
159    };
160
161    let user_agent_info = RawUserAgentInfo::from_headers(headers);
162    let user_agent_info = if user_agent_info.is_empty() {
163        default_user_agent
164    } else {
165        &user_agent_info
166    };
167
168    let contexts = replay.contexts.get_or_insert_with(Contexts::new);
169    user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info);
170}
171
172fn normalize_platform(replay: &mut Replay) {
173    // Null platforms are permitted but must be defaulted before continuing.
174    let platform = replay.platform.get_or_insert_with(|| "other".to_owned());
175
176    // Normalize bad platforms to "other" type.
177    if !crate::is_valid_platform(platform) {
178        replay.platform = Annotated::from("other".to_owned());
179    }
180}
181
182fn normalize_type(replay: &mut Replay) {
183    replay.ty = Annotated::from("replay_event".to_owned());
184}
185
186#[cfg(test)]
187mod tests {
188    use std::net::{IpAddr, Ipv4Addr};
189
190    use chrono::{TimeZone, Utc};
191    use insta::assert_json_snapshot;
192    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
193    use uuid::Uuid;
194
195    use relay_event_schema::protocol::{
196        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
197    };
198
199    use super::*;
200
201    #[test]
202    fn test_event_roundtrip() {
203        // NOTE: Interfaces will be tested separately.
204        let json = r#"{
205  "event_id": "52df9022835246eeb317dbd739ccd059",
206  "replay_id": "52df9022835246eeb317dbd739ccd059",
207  "segment_id": 0,
208  "replay_type": "session",
209  "error_sample_rate": 0.5,
210  "session_sample_rate": 0.5,
211  "timestamp": 946684800.0,
212  "replay_start_timestamp": 946684800.0,
213  "urls": ["localhost:9000"],
214  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
215  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
216  "platform": "myplatform",
217  "release": "myrelease",
218  "dist": "mydist",
219  "environment": "myenv",
220  "tags": [
221    [
222      "tag",
223      "value"
224    ]
225  ]
226}"#;
227
228        let replay = Annotated::new(Replay {
229            event_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
230            replay_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
231            replay_type: Annotated::new("session".to_owned()),
232            segment_id: Annotated::new(0),
233            timestamp: Annotated::new(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into()),
234            replay_start_timestamp: Annotated::new(
235                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
236            ),
237            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_owned())]),
238            error_ids: Annotated::new(vec![Annotated::new(
239                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
240            )]),
241            trace_ids: Annotated::new(vec![Annotated::new(
242                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
243            )]),
244            platform: Annotated::new("myplatform".to_owned()),
245            release: Annotated::new("myrelease".to_owned().into()),
246            dist: Annotated::new("mydist".to_owned()),
247            environment: Annotated::new("myenv".to_owned()),
248            tags: {
249                let items = vec![Annotated::new(TagEntry(
250                    Annotated::new("tag".to_owned()),
251                    Annotated::new("value".to_owned()),
252                ))];
253                Annotated::new(Tags(items.into()))
254            },
255            ..Default::default()
256        });
257
258        assert_eq!(replay, Annotated::from_json(json).unwrap());
259    }
260
261    #[test]
262    fn test_lenient_release() {
263        let input = r#"{"release":42}"#;
264        let output = r#"{"release":"42"}"#;
265        let event = Annotated::new(Replay {
266            release: Annotated::new("42".to_owned().into()),
267            ..Default::default()
268        });
269
270        assert_eq!(event, Annotated::from_json(input).unwrap());
271        assert_eq!(output, event.to_json().unwrap());
272    }
273
274    #[test]
275    fn test_set_user_agent_meta() {
276        // Parse user input.
277        let payload = include_str!("../../tests/fixtures/replay.json");
278
279        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
280        normalize(
281            &mut replay,
282            None,
283            &RawUserAgentInfo::default(),
284            &GeoIpLookup::empty(),
285        );
286
287        let contexts = get_value!(replay.contexts!);
288        assert_eq!(
289            contexts.get::<BrowserContext>(),
290            Some(&BrowserContext {
291                name: Annotated::new("Safari".to_owned()),
292                version: Annotated::new("15.5".to_owned()),
293                ..Default::default()
294            })
295        );
296        assert_eq!(
297            contexts.get_key("client_os"),
298            Some(&Context::Os(Box::new(OsContext {
299                name: Annotated::new("Mac OS X".to_owned()),
300                version: Annotated::new(">=10.15.7".to_owned()),
301                ..Default::default()
302            })))
303        );
304        assert_eq!(
305            contexts.get::<DeviceContext>(),
306            Some(&DeviceContext {
307                family: Annotated::new("Mac".to_owned()),
308                brand: Annotated::new("Apple".to_owned()),
309                model: Annotated::new("Mac".to_owned()),
310                ..Default::default()
311            })
312        );
313    }
314
315    #[test]
316    fn test_missing_user() {
317        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");
318
319        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
320
321        // No user object and no ip-address was provided.
322        normalize(
323            &mut replay,
324            None,
325            &RawUserAgentInfo::default(),
326            &GeoIpLookup::empty(),
327        );
328        assert_eq!(get_value!(replay.user.geo), None);
329
330        // No user object but an ip-address was provided.
331        let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
332        normalize(
333            &mut replay,
334            Some(ip_address),
335            &RawUserAgentInfo::default(),
336            &GeoIpLookup::empty(),
337        );
338
339        let ipaddr = get_value!(replay.user!).ip_address.as_str();
340        assert_eq!(Some("127.0.0.1"), ipaddr);
341    }
342
343    #[test]
344    fn test_set_ip_address_missing_user_ip_address_and_geo() {
345        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
346        let ip_address = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));
347
348        // IP-Address set.
349        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");
350
351        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
352        normalize(
353            &mut replay,
354            Some(ip_address),
355            &RawUserAgentInfo::default(),
356            &lookup,
357        );
358
359        let user = &replay.value().unwrap().user;
360        assert_json_snapshot!(SerializableAnnotated(user), @r###"
361        {
362          "id": "123",
363          "email": "user@site.com",
364          "ip_address": "2.125.160.216",
365          "username": "user",
366          "geo": {
367            "country_code": "GB",
368            "city": "Boxford",
369            "subdivision": "England",
370            "region": "United Kingdom"
371          }
372        }
373        "###);
374    }
375
376    #[test]
377    fn test_loose_type_requirements() {
378        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");
379
380        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
381        normalize(
382            &mut replay,
383            None,
384            &RawUserAgentInfo::default(),
385            &GeoIpLookup::empty(),
386        );
387
388        let user = get_value!(replay.user!);
389        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
390        assert_eq!(user.username.value(), None);
391        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
392        assert_eq!(user.id.as_str(), Some("1"));
393    }
394
395    #[test]
396    fn test_capped_values() {
397        let urls: Vec<Annotated<String>> = (0..101)
398            .map(|_| Annotated::new("localhost:9000".to_owned()))
399            .collect();
400
401        let error_ids: Vec<Annotated<Uuid>> = (0..101)
402            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
403            .collect();
404
405        let trace_ids: Vec<Annotated<Uuid>> = (0..101)
406            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
407            .collect();
408
409        let mut replay = Annotated::new(Replay {
410            urls: Annotated::new(urls),
411            error_ids: Annotated::new(error_ids),
412            trace_ids: Annotated::new(trace_ids),
413            ..Default::default()
414        });
415
416        let replay_value = replay.value_mut().as_mut().unwrap();
417        normalize_array_fields(replay_value);
418
419        assert!(replay_value.error_ids.value().unwrap().len() == 100);
420        assert!(replay_value.trace_ids.value().unwrap().len() == 100);
421        assert!(replay_value.urls.value().unwrap().len() == 100);
422    }
423
424    #[test]
425    fn test_truncated_list_less_than_limit() {
426        let mut replay = Annotated::new(Replay {
427            urls: Annotated::new(Vec::new()),
428            error_ids: Annotated::new(Vec::new()),
429            trace_ids: Annotated::new(Vec::new()),
430            ..Default::default()
431        });
432
433        let replay_value = replay.value_mut().as_mut().unwrap();
434        normalize_array_fields(replay_value);
435
436        assert!(replay_value.error_ids.value().unwrap().is_empty());
437        assert!(replay_value.trace_ids.value().unwrap().is_empty());
438        assert!(replay_value.urls.value().unwrap().is_empty());
439    }
440
441    #[test]
442    fn test_error_id_validation() {
443        // NOTE: Interfaces will be tested separately.
444        let json = r#"{
445  "event_id": "52df9022835246eeb317dbd739ccd059",
446  "replay_id": "52df9022835246eeb317dbd739ccd059",
447  "segment_id": 0,
448  "replay_type": "session",
449  "error_sample_rate": 0.5,
450  "session_sample_rate": 0.5,
451  "timestamp": 946684800.0,
452  "replay_start_timestamp": 946684800.0,
453  "urls": ["localhost:9000"],
454  "error_ids": ["test"],
455  "trace_ids": [],
456  "platform": "myplatform",
457  "release": "myrelease",
458  "dist": "mydist",
459  "environment": "myenv",
460  "tags": [
461    [
462      "tag",
463      "value"
464    ]
465  ]
466}"#;
467
468        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
469        let validation_result = validate(replay.value_mut().as_mut().unwrap());
470        assert!(validation_result.is_err());
471    }
472
473    #[test]
474    fn test_trace_id_validation() {
475        // NOTE: Interfaces will be tested separately.
476        let json = r#"{
477  "event_id": "52df9022835246eeb317dbd739ccd059",
478  "replay_id": "52df9022835246eeb317dbd739ccd059",
479  "segment_id": 0,
480  "replay_type": "session",
481  "error_sample_rate": 0.5,
482  "session_sample_rate": 0.5,
483  "timestamp": 946684800.0,
484  "replay_start_timestamp": 946684800.0,
485  "urls": ["localhost:9000"],
486  "error_ids": [],
487  "trace_ids": ["123"],
488  "platform": "myplatform",
489  "release": "myrelease",
490  "dist": "mydist",
491  "environment": "myenv",
492  "tags": [
493    [
494      "tag",
495      "value"
496    ]
497  ]
498}"#;
499
500        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
501        let validation_result = validate(replay.value_mut().as_mut().unwrap());
502        assert!(validation_result.is_err());
503    }
504
505    #[test]
506    fn test_maxchars_trimming() {
507        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
508        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();
509
510        normalize(
511            &mut replay,
512            None,
513            &RawUserAgentInfo::default(),
514            &GeoIpLookup::empty(),
515        );
516        assert_annotated_snapshot!(replay, @r###"
517        {
518          "platform": "other",
519          "dist": "0000000000000000000000000000000000000000000000000000000000000...",
520          "type": "replay_event",
521          "_meta": {
522            "dist": {
523              "": {
524                "rem": [
525                  [
526                    "!limit",
527                    "s",
528                    61,
529                    64
530                  ]
531                ],
532                "len": 100
533              }
534            }
535          }
536        }
537        "###);
538    }
539
540    #[test]
541    fn test_validate_u16_segment_id() {
542        // Does not fit within a u16.
543        let replay_id =
544            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
545        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64 + 1);
546        let mut replay = Annotated::new(Replay {
547            replay_id,
548            segment_id,
549            ..Default::default()
550        });
551        assert!(validate(replay.value_mut().as_mut().unwrap()).is_err());
552
553        // Fits within a u16.
554        let replay_id =
555            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
556        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64);
557        let mut replay = Annotated::new(Replay {
558            replay_id,
559            segment_id,
560            ..Default::default()
561        });
562        assert!(validate(replay.value_mut().as_mut().unwrap()).is_ok());
563    }
564}