relay_event_normalization/
replay.rs

1//! Validation and normalization of [`Replay`] events.
2
3use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
/// Replay validation or normalization error.
///
/// Within this module, only [`validate`] returns this error, and it only ever produces
/// [`ReplayError::InvalidPayload`]. [`normalize`] has no return value and therefore never
/// surfaces a `ReplayError` itself.
///
/// NOTE(review): the remaining variants are presumably constructed by callers outside of this
/// file (parsing and PII scrubbing) -- confirm against the call sites.
#[derive(Debug, thiserror::Error)]
pub enum ReplayError {
    /// The Replay event could not be parsed from JSON.
    ///
    /// Wraps the underlying [`serde_json::Error`]. Because of `#[from]`, a `serde_json::Error`
    /// converts into this variant automatically, for example through the `?` operator.
    #[error("invalid json")]
    CouldNotParse(#[from] serde_json::Error),

    /// The Replay event was parsed but did not match the schema.
    #[error("no data found")]
    NoContent,

    /// The Replay contains invalid data or is missing a required field.
    ///
    /// This is returned from [`validate`]. The contained string describes which check failed
    /// and is appended to the error message.
    #[error("invalid payload {0}")]
    InvalidPayload(String),

    /// An error occurred during PII scrubbing of the Replay.
    ///
    /// This error is usually returned when the PII configuration fails to parse. The contained
    /// string is appended to the error message.
    #[error("failed to scrub PII: {0}")]
    CouldNotScrub(String),
}
39
40/// Checks if the Replay event is structurally valid.
41///
42/// Returns `Ok(())`, if the Replay is valid and can be normalized. Otherwise, returns
43/// `Err(ReplayError::InvalidPayload)` describing the missing or invalid data.
44pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
45    replay
46        .replay_id
47        .value()
48        .ok_or_else(|| ReplayError::InvalidPayload("missing replay_id".to_owned()))?;
49
50    let segment_id = *replay
51        .segment_id
52        .value()
53        .ok_or_else(|| ReplayError::InvalidPayload("missing segment_id".to_owned()))?;
54
55    if segment_id > u16::MAX as u64 {
56        return Err(ReplayError::InvalidPayload(
57            "segment_id exceeded u16 limit".to_owned(),
58        ));
59    }
60
61    if replay
62        .error_ids
63        .value()
64        .into_iter()
65        .flat_map(|v| v.iter())
66        .any(|v| v.meta().has_errors())
67    {
68        return Err(ReplayError::InvalidPayload(
69            "Invalid error-id specified.".to_owned(),
70        ));
71    }
72
73    if replay
74        .trace_ids
75        .value()
76        .into_iter()
77        .flat_map(|v| v.iter())
78        .any(|v| v.meta().has_errors())
79    {
80        return Err(ReplayError::InvalidPayload(
81            "Invalid trace-id specified.".to_owned(),
82        ));
83    }
84
85    Ok(())
86}
87
88/// Adds default fields and normalizes all values in to their standard representation.
89pub fn normalize(
90    replay: &mut Annotated<Replay>,
91    client_ip: Option<StdIpAddr>,
92    user_agent: &RawUserAgentInfo<&str>,
93    geoip_lookup: &GeoIpLookup,
94) {
95    let _ = processor::apply(replay, |replay_value, meta| {
96        normalize_platform(replay_value);
97        normalize_ip_address(replay_value, client_ip);
98        normalize_user_geoinfo(
99            geoip_lookup,
100            &mut replay_value.user,
101            client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
102        );
103        normalize_user_agent(replay_value, user_agent);
104        normalize_type(replay_value);
105        normalize_array_fields(replay_value);
106        let _ = trimming::TrimmingProcessor::new().process_replay(
107            replay_value,
108            meta,
109            ProcessingState::root(),
110        );
111        Ok(())
112    });
113}
114
115fn normalize_array_fields(replay: &mut Replay) {
116    // TODO: This should be replaced by the TrimmingProcessor.
117    // https://github.com/getsentry/relay/pull/1910#pullrequestreview-1337188206
118    if let Some(items) = replay.error_ids.value_mut() {
119        items.truncate(100);
120    }
121
122    if let Some(items) = replay.trace_ids.value_mut() {
123        items.truncate(100);
124    }
125
126    if let Some(items) = replay.urls.value_mut() {
127        items.truncate(100);
128    }
129}
130
131fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
132    crate::event::normalize_ip_addresses(
133        &mut replay.request,
134        &mut replay.user,
135        replay.platform.as_str(),
136        ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
137        replay.sdk.value(),
138    );
139}
140
141fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) {
142    let headers = match replay
143        .request
144        .value()
145        .and_then(|request| request.headers.value())
146    {
147        Some(headers) => headers,
148        None => return,
149    };
150
151    let user_agent_info = RawUserAgentInfo::from_headers(headers);
152    let user_agent_info = if user_agent_info.is_empty() {
153        default_user_agent
154    } else {
155        &user_agent_info
156    };
157
158    let contexts = replay.contexts.get_or_insert_with(Contexts::new);
159    user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info);
160}
161
162fn normalize_platform(replay: &mut Replay) {
163    // Null platforms are permitted but must be defaulted before continuing.
164    let platform = replay.platform.get_or_insert_with(|| "other".to_owned());
165
166    // Normalize bad platforms to "other" type.
167    if !crate::is_valid_platform(platform) {
168        replay.platform = Annotated::from("other".to_owned());
169    }
170}
171
172fn normalize_type(replay: &mut Replay) {
173    replay.ty = Annotated::from("replay_event".to_owned());
174}
175
176#[cfg(test)]
177mod tests {
178    use std::net::{IpAddr, Ipv4Addr};
179
180    use chrono::{TimeZone, Utc};
181    use insta::assert_json_snapshot;
182    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
183    use uuid::Uuid;
184
185    use relay_event_schema::protocol::{
186        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
187    };
188
189    use super::*;
190
191    #[test]
192    fn test_event_roundtrip() {
193        // NOTE: Interfaces will be tested separately.
194        let json = r#"{
195  "event_id": "52df9022835246eeb317dbd739ccd059",
196  "replay_id": "52df9022835246eeb317dbd739ccd059",
197  "segment_id": 0,
198  "replay_type": "session",
199  "error_sample_rate": 0.5,
200  "session_sample_rate": 0.5,
201  "timestamp": 946684800.0,
202  "replay_start_timestamp": 946684800.0,
203  "urls": ["localhost:9000"],
204  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
205  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
206  "platform": "myplatform",
207  "release": "myrelease",
208  "dist": "mydist",
209  "environment": "myenv",
210  "tags": [
211    [
212      "tag",
213      "value"
214    ]
215  ]
216}"#;
217
218        let replay = Annotated::new(Replay {
219            event_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
220            replay_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
221            replay_type: Annotated::new("session".to_owned()),
222            segment_id: Annotated::new(0),
223            timestamp: Annotated::new(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into()),
224            replay_start_timestamp: Annotated::new(
225                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
226            ),
227            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_owned())]),
228            error_ids: Annotated::new(vec![Annotated::new(
229                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
230            )]),
231            trace_ids: Annotated::new(vec![Annotated::new(
232                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
233            )]),
234            platform: Annotated::new("myplatform".to_owned()),
235            release: Annotated::new("myrelease".to_owned().into()),
236            dist: Annotated::new("mydist".to_owned()),
237            environment: Annotated::new("myenv".to_owned()),
238            tags: {
239                let items = vec![Annotated::new(TagEntry(
240                    Annotated::new("tag".to_owned()),
241                    Annotated::new("value".to_owned()),
242                ))];
243                Annotated::new(Tags(items.into()))
244            },
245            ..Default::default()
246        });
247
248        assert_eq!(replay, Annotated::from_json(json).unwrap());
249    }
250
251    #[test]
252    fn test_lenient_release() {
253        let input = r#"{"release":42}"#;
254        let output = r#"{"release":"42"}"#;
255        let event = Annotated::new(Replay {
256            release: Annotated::new("42".to_owned().into()),
257            ..Default::default()
258        });
259
260        assert_eq!(event, Annotated::from_json(input).unwrap());
261        assert_eq!(output, event.to_json().unwrap());
262    }
263
264    #[test]
265    fn test_set_user_agent_meta() {
266        // Parse user input.
267        let payload = include_str!("../../tests/fixtures/replay.json");
268
269        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
270        normalize(
271            &mut replay,
272            None,
273            &RawUserAgentInfo::default(),
274            &GeoIpLookup::empty(),
275        );
276
277        let contexts = get_value!(replay.contexts!);
278        assert_eq!(
279            contexts.get::<BrowserContext>(),
280            Some(&BrowserContext {
281                name: Annotated::new("Safari".to_owned()),
282                version: Annotated::new("15.5".to_owned()),
283                ..Default::default()
284            })
285        );
286        assert_eq!(
287            contexts.get_key("client_os"),
288            Some(&Context::Os(Box::new(OsContext {
289                name: Annotated::new("Mac OS X".to_owned()),
290                version: Annotated::new(">=10.15.7".to_owned()),
291                ..Default::default()
292            })))
293        );
294        assert_eq!(
295            contexts.get::<DeviceContext>(),
296            Some(&DeviceContext {
297                family: Annotated::new("Mac".to_owned()),
298                brand: Annotated::new("Apple".to_owned()),
299                model: Annotated::new("Mac".to_owned()),
300                ..Default::default()
301            })
302        );
303    }
304
305    #[test]
306    fn test_missing_user() {
307        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");
308
309        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
310
311        // No user object and no ip-address was provided.
312        normalize(
313            &mut replay,
314            None,
315            &RawUserAgentInfo::default(),
316            &GeoIpLookup::empty(),
317        );
318        assert_eq!(get_value!(replay.user.geo), None);
319
320        // No user object but an ip-address was provided.
321        let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
322        normalize(
323            &mut replay,
324            Some(ip_address),
325            &RawUserAgentInfo::default(),
326            &GeoIpLookup::empty(),
327        );
328
329        let ipaddr = get_value!(replay.user!).ip_address.as_str();
330        assert_eq!(Some("127.0.0.1"), ipaddr);
331    }
332
333    #[test]
334    fn test_set_ip_address_missing_user_ip_address_and_geo() {
335        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
336        let ip_address = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));
337
338        // IP-Address set.
339        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");
340
341        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
342        normalize(
343            &mut replay,
344            Some(ip_address),
345            &RawUserAgentInfo::default(),
346            &lookup,
347        );
348
349        let user = &replay.value().unwrap().user;
350        assert_json_snapshot!(SerializableAnnotated(user), @r###"
351        {
352          "id": "123",
353          "email": "user@site.com",
354          "ip_address": "2.125.160.216",
355          "username": "user",
356          "geo": {
357            "country_code": "GB",
358            "city": "Boxford",
359            "subdivision": "England",
360            "region": "United Kingdom"
361          }
362        }
363        "###);
364    }
365
366    #[test]
367    fn test_loose_type_requirements() {
368        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");
369
370        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
371        normalize(
372            &mut replay,
373            None,
374            &RawUserAgentInfo::default(),
375            &GeoIpLookup::empty(),
376        );
377
378        let user = get_value!(replay.user!);
379        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
380        assert_eq!(user.username.value(), None);
381        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
382        assert_eq!(user.id.as_str(), Some("1"));
383    }
384
385    #[test]
386    fn test_capped_values() {
387        let urls: Vec<Annotated<String>> = (0..101)
388            .map(|_| Annotated::new("localhost:9000".to_owned()))
389            .collect();
390
391        let error_ids: Vec<Annotated<Uuid>> = (0..101)
392            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
393            .collect();
394
395        let trace_ids: Vec<Annotated<Uuid>> = (0..101)
396            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
397            .collect();
398
399        let mut replay = Annotated::new(Replay {
400            urls: Annotated::new(urls),
401            error_ids: Annotated::new(error_ids),
402            trace_ids: Annotated::new(trace_ids),
403            ..Default::default()
404        });
405
406        let replay_value = replay.value_mut().as_mut().unwrap();
407        normalize_array_fields(replay_value);
408
409        assert!(replay_value.error_ids.value().unwrap().len() == 100);
410        assert!(replay_value.trace_ids.value().unwrap().len() == 100);
411        assert!(replay_value.urls.value().unwrap().len() == 100);
412    }
413
414    #[test]
415    fn test_truncated_list_less_than_limit() {
416        let mut replay = Annotated::new(Replay {
417            urls: Annotated::new(Vec::new()),
418            error_ids: Annotated::new(Vec::new()),
419            trace_ids: Annotated::new(Vec::new()),
420            ..Default::default()
421        });
422
423        let replay_value = replay.value_mut().as_mut().unwrap();
424        normalize_array_fields(replay_value);
425
426        assert!(replay_value.error_ids.value().unwrap().is_empty());
427        assert!(replay_value.trace_ids.value().unwrap().is_empty());
428        assert!(replay_value.urls.value().unwrap().is_empty());
429    }
430
431    #[test]
432    fn test_error_id_validation() {
433        // NOTE: Interfaces will be tested separately.
434        let json = r#"{
435  "event_id": "52df9022835246eeb317dbd739ccd059",
436  "replay_id": "52df9022835246eeb317dbd739ccd059",
437  "segment_id": 0,
438  "replay_type": "session",
439  "error_sample_rate": 0.5,
440  "session_sample_rate": 0.5,
441  "timestamp": 946684800.0,
442  "replay_start_timestamp": 946684800.0,
443  "urls": ["localhost:9000"],
444  "error_ids": ["test"],
445  "trace_ids": [],
446  "platform": "myplatform",
447  "release": "myrelease",
448  "dist": "mydist",
449  "environment": "myenv",
450  "tags": [
451    [
452      "tag",
453      "value"
454    ]
455  ]
456}"#;
457
458        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
459        let validation_result = validate(replay.value_mut().as_mut().unwrap());
460        assert!(validation_result.is_err());
461    }
462
463    #[test]
464    fn test_trace_id_validation() {
465        // NOTE: Interfaces will be tested separately.
466        let json = r#"{
467  "event_id": "52df9022835246eeb317dbd739ccd059",
468  "replay_id": "52df9022835246eeb317dbd739ccd059",
469  "segment_id": 0,
470  "replay_type": "session",
471  "error_sample_rate": 0.5,
472  "session_sample_rate": 0.5,
473  "timestamp": 946684800.0,
474  "replay_start_timestamp": 946684800.0,
475  "urls": ["localhost:9000"],
476  "error_ids": [],
477  "trace_ids": ["123"],
478  "platform": "myplatform",
479  "release": "myrelease",
480  "dist": "mydist",
481  "environment": "myenv",
482  "tags": [
483    [
484      "tag",
485      "value"
486    ]
487  ]
488}"#;
489
490        let mut replay = Annotated::<Replay>::from_json(json).unwrap();
491        let validation_result = validate(replay.value_mut().as_mut().unwrap());
492        assert!(validation_result.is_err());
493    }
494
495    #[test]
496    fn test_maxchars_trimming() {
497        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
498        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();
499
500        normalize(
501            &mut replay,
502            None,
503            &RawUserAgentInfo::default(),
504            &GeoIpLookup::empty(),
505        );
506        assert_annotated_snapshot!(replay, @r###"
507        {
508          "platform": "other",
509          "dist": "0000000000000000000000000000000000000000000000000000000000000...",
510          "type": "replay_event",
511          "_meta": {
512            "dist": {
513              "": {
514                "rem": [
515                  [
516                    "!limit",
517                    "s",
518                    61,
519                    64
520                  ]
521                ],
522                "len": 100
523              }
524            }
525          }
526        }
527        "###);
528    }
529
530    #[test]
531    fn test_validate_u16_segment_id() {
532        // Does not fit within a u16.
533        let replay_id =
534            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
535        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64 + 1);
536        let mut replay = Annotated::new(Replay {
537            replay_id,
538            segment_id,
539            ..Default::default()
540        });
541        assert!(validate(replay.value_mut().as_mut().unwrap()).is_err());
542
543        // Fits within a u16.
544        let replay_id =
545            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
546        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64);
547        let mut replay = Annotated::new(Replay {
548            replay_id,
549            segment_id,
550            ..Default::default()
551        });
552        assert!(validate(replay.value_mut().as_mut().unwrap()).is_ok());
553    }
554}