relay_event_normalization/
replay.rs

1//! Validation and normalization of [`Replay`] events.
2
3use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
/// Replay validation error.
///
/// This error is returned from [`validate`].
#[derive(Debug, thiserror::Error)]
pub enum ReplayError {
    /// The replay event is missing a `replay_id`.
    #[error("missing replay_id")]
    MissingReplayId,
    /// The replay event is missing a `segment_id`.
    #[error("missing segment_id")]
    MissingSegmentId,
    /// The `segment_id` is too large to fit in a `u16`.
    #[error("segment_id too large")]
    SegmentIdTooLarge,
    /// One or more of the `error_ids` carry an error in their metadata.
    #[error("invalid error_id specified")]
    InvalidErrorId,
    /// One or more of the `trace_ids` carry an error in their metadata.
    #[error("invalid trace_id specified")]
    InvalidTraceId,
}
35
36/// Checks if the Replay event is structurally valid.
37///
38/// Returns `Ok(())`, if the Replay is valid and can be normalized. Otherwise, returns
39/// `Err(ReplayError::InvalidPayload)` describing the missing or invalid data.
40pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
41    replay
42        .replay_id
43        .value()
44        .ok_or(ReplayError::MissingReplayId)?;
45
46    let segment_id = *replay
47        .segment_id
48        .value()
49        .ok_or(ReplayError::MissingSegmentId)?;
50
51    if segment_id > u16::MAX as u64 {
52        return Err(ReplayError::SegmentIdTooLarge);
53    }
54
55    if replay
56        .error_ids
57        .value()
58        .into_iter()
59        .flat_map(|v| v.iter())
60        .any(|v| v.meta().has_errors())
61    {
62        return Err(ReplayError::InvalidErrorId);
63    }
64
65    if replay
66        .trace_ids
67        .value()
68        .into_iter()
69        .flat_map(|v| v.iter())
70        .any(|v| v.meta().has_errors())
71    {
72        return Err(ReplayError::InvalidTraceId);
73    }
74
75    Ok(())
76}
77
78/// Adds default fields and normalizes all values in to their standard representation.
79pub fn normalize(
80    replay: &mut Annotated<Replay>,
81    client_ip: Option<StdIpAddr>,
82    user_agent: &RawUserAgentInfo<&str>,
83    geoip_lookup: &GeoIpLookup,
84) {
85    let _ = processor::apply(replay, |replay_value, meta| {
86        normalize_platform(replay_value);
87        normalize_ip_address(replay_value, client_ip);
88        normalize_user_geoinfo(
89            geoip_lookup,
90            &mut replay_value.user,
91            client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
92        );
93        normalize_user_agent(replay_value, user_agent);
94        normalize_type(replay_value);
95        normalize_array_fields(replay_value);
96        let _ = trimming::TrimmingProcessor::new().process_replay(
97            replay_value,
98            meta,
99            ProcessingState::root(),
100        );
101        Ok(())
102    });
103}
104
105fn normalize_array_fields(replay: &mut Replay) {
106    // TODO: This should be replaced by the TrimmingProcessor.
107    // https://github.com/getsentry/relay/pull/1910#pullrequestreview-1337188206
108    if let Some(items) = replay.error_ids.value_mut() {
109        items.truncate(100);
110    }
111
112    if let Some(items) = replay.trace_ids.value_mut() {
113        items.truncate(100);
114    }
115
116    if let Some(items) = replay.urls.value_mut() {
117        items.truncate(100);
118    }
119}
120
121fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
122    crate::event::normalize_ip_addresses(
123        &mut replay.request,
124        &mut replay.user,
125        replay.platform.as_str(),
126        ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
127        replay.sdk.value(),
128    );
129}
130
131fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) {
132    let headers = match replay
133        .request
134        .value()
135        .and_then(|request| request.headers.value())
136    {
137        Some(headers) => headers,
138        None => return,
139    };
140
141    let user_agent_info = RawUserAgentInfo::from_headers(headers);
142    let user_agent_info = if user_agent_info.is_empty() {
143        default_user_agent
144    } else {
145        &user_agent_info
146    };
147
148    let contexts = replay.contexts.get_or_insert_with(Contexts::new);
149    user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info);
150}
151
152fn normalize_platform(replay: &mut Replay) {
153    // Null platforms are permitted but must be defaulted before continuing.
154    let platform = replay.platform.get_or_insert_with(|| "other".to_owned());
155
156    // Normalize bad platforms to "other" type.
157    if !crate::is_valid_platform(platform) {
158        replay.platform = Annotated::from("other".to_owned());
159    }
160}
161
162fn normalize_type(replay: &mut Replay) {
163    replay.ty = Annotated::from("replay_event".to_owned());
164}
165
166#[cfg(test)]
167mod tests {
168    use std::net::{IpAddr, Ipv4Addr};
169
170    use chrono::{TimeZone, Utc};
171    use insta::assert_json_snapshot;
172    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
173    use uuid::Uuid;
174
175    use relay_event_schema::protocol::{
176        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
177    };
178
179    use super::*;
180
181    #[test]
182    fn test_event_roundtrip() {
183        // NOTE: Interfaces will be tested separately.
184        let json = r#"{
185  "event_id": "52df9022835246eeb317dbd739ccd059",
186  "replay_id": "52df9022835246eeb317dbd739ccd059",
187  "segment_id": 0,
188  "replay_type": "session",
189  "error_sample_rate": 0.5,
190  "session_sample_rate": 0.5,
191  "timestamp": 946684800.0,
192  "replay_start_timestamp": 946684800.0,
193  "urls": ["localhost:9000"],
194  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
195  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
196  "platform": "myplatform",
197  "release": "myrelease",
198  "dist": "mydist",
199  "environment": "myenv",
200  "tags": [
201    [
202      "tag",
203      "value"
204    ]
205  ]
206}"#;
207
208        let replay = Annotated::new(Replay {
209            event_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
210            replay_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
211            replay_type: Annotated::new("session".to_owned()),
212            segment_id: Annotated::new(0),
213            timestamp: Annotated::new(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into()),
214            replay_start_timestamp: Annotated::new(
215                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
216            ),
217            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_owned())]),
218            error_ids: Annotated::new(vec![Annotated::new(
219                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
220            )]),
221            trace_ids: Annotated::new(vec![Annotated::new(
222                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
223            )]),
224            platform: Annotated::new("myplatform".to_owned()),
225            release: Annotated::new("myrelease".to_owned().into()),
226            dist: Annotated::new("mydist".to_owned()),
227            environment: Annotated::new("myenv".to_owned()),
228            tags: {
229                let items = vec![Annotated::new(TagEntry(
230                    Annotated::new("tag".to_owned()),
231                    Annotated::new("value".to_owned()),
232                ))];
233                Annotated::new(Tags(items.into()))
234            },
235            ..Default::default()
236        });
237
238        assert_eq!(replay, Annotated::from_json(json).unwrap());
239    }
240
241    #[test]
242    fn test_lenient_release() {
243        let input = r#"{"release":42}"#;
244        let output = r#"{"release":"42"}"#;
245        let event = Annotated::new(Replay {
246            release: Annotated::new("42".to_owned().into()),
247            ..Default::default()
248        });
249
250        assert_eq!(event, Annotated::from_json(input).unwrap());
251        assert_eq!(output, event.to_json().unwrap());
252    }
253
254    #[test]
255    fn test_set_user_agent_meta() {
256        // Parse user input.
257        let payload = include_str!("../../tests/fixtures/replay.json");
258
259        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
260        normalize(
261            &mut replay,
262            None,
263            &RawUserAgentInfo::default(),
264            &GeoIpLookup::empty(),
265        );
266
267        let contexts = get_value!(replay.contexts!);
268        assert_eq!(
269            contexts.get::<BrowserContext>(),
270            Some(&BrowserContext {
271                name: Annotated::new("Safari".to_owned()),
272                version: Annotated::new("15.5".to_owned()),
273                ..Default::default()
274            })
275        );
276        assert_eq!(
277            contexts.get_key("client_os"),
278            Some(&Context::Os(Box::new(OsContext {
279                name: Annotated::new("Mac OS X".to_owned()),
280                version: Annotated::new(">=10.15.7".to_owned()),
281                ..Default::default()
282            })))
283        );
284        assert_eq!(
285            contexts.get::<DeviceContext>(),
286            Some(&DeviceContext {
287                family: Annotated::new("Mac".to_owned()),
288                brand: Annotated::new("Apple".to_owned()),
289                model: Annotated::new("Mac".to_owned()),
290                ..Default::default()
291            })
292        );
293    }
294
295    #[test]
296    fn test_missing_user() {
297        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");
298
299        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
300
301        // No user object and no ip-address was provided.
302        normalize(
303            &mut replay,
304            None,
305            &RawUserAgentInfo::default(),
306            &GeoIpLookup::empty(),
307        );
308        assert_eq!(get_value!(replay.user.geo), None);
309
310        // No user object but an ip-address was provided.
311        let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
312        normalize(
313            &mut replay,
314            Some(ip_address),
315            &RawUserAgentInfo::default(),
316            &GeoIpLookup::empty(),
317        );
318
319        let ipaddr = get_value!(replay.user!).ip_address.as_str();
320        assert_eq!(Some("127.0.0.1"), ipaddr);
321    }
322
323    #[test]
324    fn test_set_ip_address_missing_user_ip_address_and_geo() {
325        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
326        let ip_address = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));
327
328        // IP-Address set.
329        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");
330
331        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
332        normalize(
333            &mut replay,
334            Some(ip_address),
335            &RawUserAgentInfo::default(),
336            &lookup,
337        );
338
339        let user = &replay.value().unwrap().user;
340        assert_json_snapshot!(SerializableAnnotated(user), @r###"
341        {
342          "id": "123",
343          "email": "user@site.com",
344          "ip_address": "2.125.160.216",
345          "username": "user",
346          "geo": {
347            "country_code": "GB",
348            "city": "Boxford",
349            "subdivision": "England",
350            "region": "United Kingdom"
351          }
352        }
353        "###);
354    }
355
356    #[test]
357    fn test_loose_type_requirements() {
358        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");
359
360        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
361        normalize(
362            &mut replay,
363            None,
364            &RawUserAgentInfo::default(),
365            &GeoIpLookup::empty(),
366        );
367
368        let user = get_value!(replay.user!);
369        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
370        assert_eq!(user.username.value(), None);
371        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
372        assert_eq!(user.id.as_str(), Some("1"));
373    }
374
375    #[test]
376    fn test_capped_values() {
377        let urls: Vec<Annotated<String>> = (0..101)
378            .map(|_| Annotated::new("localhost:9000".to_owned()))
379            .collect();
380
381        let error_ids: Vec<Annotated<Uuid>> = (0..101)
382            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
383            .collect();
384
385        let trace_ids: Vec<Annotated<Uuid>> = (0..101)
386            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
387            .collect();
388
389        let mut replay = Annotated::new(Replay {
390            urls: Annotated::new(urls),
391            error_ids: Annotated::new(error_ids),
392            trace_ids: Annotated::new(trace_ids),
393            ..Default::default()
394        });
395
396        let replay_value = replay.value_mut().as_mut().unwrap();
397        normalize_array_fields(replay_value);
398
399        assert!(replay_value.error_ids.value().unwrap().len() == 100);
400        assert!(replay_value.trace_ids.value().unwrap().len() == 100);
401        assert!(replay_value.urls.value().unwrap().len() == 100);
402    }
403
404    #[test]
405    fn test_truncated_list_less_than_limit() {
406        let mut replay = Annotated::new(Replay {
407            urls: Annotated::new(Vec::new()),
408            error_ids: Annotated::new(Vec::new()),
409            trace_ids: Annotated::new(Vec::new()),
410            ..Default::default()
411        });
412
413        let replay_value = replay.value_mut().as_mut().unwrap();
414        normalize_array_fields(replay_value);
415
416        assert!(replay_value.error_ids.value().unwrap().is_empty());
417        assert!(replay_value.trace_ids.value().unwrap().is_empty());
418        assert!(replay_value.urls.value().unwrap().is_empty());
419    }
420
421    #[test]
422    fn test_error_id_validation() {
423        // NOTE: Interfaces will be tested separately.
424        let json = r#"{
425  "event_id": "52df9022835246eeb317dbd739ccd059",
426  "replay_id": "52df9022835246eeb317dbd739ccd059",
427  "segment_id": 0,
428  "replay_type": "session",
429  "error_sample_rate": 0.5,
430  "session_sample_rate": 0.5,
431  "timestamp": 946684800.0,
432  "replay_start_timestamp": 946684800.0,
433  "urls": ["localhost:9000"],
434  "error_ids": ["test"],
435  "trace_ids": [],
436  "platform": "myplatform",
437  "release": "myrelease",
438  "dist": "mydist",
439  "environment": "myenv",
440  "tags": [
441    [
442      "tag",
443      "value"
444    ]
445  ]
446}"#;
447
448        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
449        let validation_result = validate(replay.value_mut().as_mut().unwrap());
450        assert!(validation_result.is_err());
451    }
452
453    #[test]
454    fn test_trace_id_validation() {
455        // NOTE: Interfaces will be tested separately.
456        let json = r#"{
457  "event_id": "52df9022835246eeb317dbd739ccd059",
458  "replay_id": "52df9022835246eeb317dbd739ccd059",
459  "segment_id": 0,
460  "replay_type": "session",
461  "error_sample_rate": 0.5,
462  "session_sample_rate": 0.5,
463  "timestamp": 946684800.0,
464  "replay_start_timestamp": 946684800.0,
465  "urls": ["localhost:9000"],
466  "error_ids": [],
467  "trace_ids": ["123"],
468  "platform": "myplatform",
469  "release": "myrelease",
470  "dist": "mydist",
471  "environment": "myenv",
472  "tags": [
473    [
474      "tag",
475      "value"
476    ]
477  ]
478}"#;
479
480        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
481        let validation_result = validate(replay.value_mut().as_mut().unwrap());
482        assert!(validation_result.is_err());
483    }
484
485    #[test]
486    fn test_maxchars_trimming() {
487        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
488        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();
489
490        normalize(
491            &mut replay,
492            None,
493            &RawUserAgentInfo::default(),
494            &GeoIpLookup::empty(),
495        );
496        assert_annotated_snapshot!(replay, @r###"
497        {
498          "platform": "other",
499          "dist": "0000000000000000000000000000000000000000000000000000000000000...",
500          "type": "replay_event",
501          "_meta": {
502            "dist": {
503              "": {
504                "rem": [
505                  [
506                    "!limit",
507                    "s",
508                    61,
509                    64
510                  ]
511                ],
512                "len": 100
513              }
514            }
515          }
516        }
517        "###);
518    }
519
520    #[test]
521    fn test_validate_u16_segment_id() {
522        // Does not fit within a u16.
523        let replay_id =
524            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
525        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64 + 1);
526        let mut replay = Annotated::new(Replay {
527            replay_id,
528            segment_id,
529            ..Default::default()
530        });
531        assert!(validate(replay.value_mut().as_mut().unwrap()).is_err());
532
533        // Fits within a u16.
534        let replay_id =
535            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
536        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64);
537        let mut replay = Annotated::new(Replay {
538            replay_id,
539            segment_id,
540            ..Default::default()
541        });
542        assert!(validate(replay.value_mut().as_mut().unwrap()).is_ok());
543    }
544}