relay_event_normalization/
replay.rs

1//! Validation and normalization of [`Replay`] events.
2
3use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
/// Replay validation or normalization error.
///
/// Within this module, [`validate`] is the only function that returns this error, and it
/// always uses the [`InvalidPayload`](Self::InvalidPayload) variant. [`normalize`] does not
/// return an error; the remaining variants are not constructed in this module.
#[derive(Debug, thiserror::Error)]
pub enum ReplayError {
    /// The Replay event could not be parsed from JSON.
    ///
    /// Wraps the underlying [`serde_json::Error`]; the `#[from]` attribute allows conversion
    /// from that error type.
    #[error("invalid json")]
    CouldNotParse(#[from] serde_json::Error),

    /// The Replay event was parsed but did not match the schema.
    #[error("no data found")]
    NoContent,

    /// The Replay contains invalid data or is missing a required field.
    ///
    /// This is returned from [`validate`]. The contained string describes the failed check.
    #[error("invalid payload {0}")]
    InvalidPayload(String),

    /// An error occurred during PII scrubbing of the Replay.
    ///
    /// This error is usually returned when the PII configuration fails to parse.
    #[error("failed to scrub PII: {0}")]
    CouldNotScrub(String),
}
39
40/// Checks if the Replay event is structurally valid.
41///
42/// Returns `Ok(())`, if the Replay is valid and can be normalized. Otherwise, returns
43/// `Err(ReplayError::InvalidPayload)` describing the missing or invalid data.
44pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
45    replay
46        .replay_id
47        .value()
48        .ok_or_else(|| ReplayError::InvalidPayload("missing replay_id".to_string()))?;
49
50    let segment_id = *replay
51        .segment_id
52        .value()
53        .ok_or_else(|| ReplayError::InvalidPayload("missing segment_id".to_string()))?;
54
55    if segment_id > u16::MAX as u64 {
56        return Err(ReplayError::InvalidPayload(
57            "segment_id exceeded u16 limit".to_string(),
58        ));
59    }
60
61    if replay
62        .error_ids
63        .value()
64        .into_iter()
65        .flat_map(|v| v.iter())
66        .any(|v| v.meta().has_errors())
67    {
68        return Err(ReplayError::InvalidPayload(
69            "Invalid error-id specified.".to_string(),
70        ));
71    }
72
73    if replay
74        .trace_ids
75        .value()
76        .into_iter()
77        .flat_map(|v| v.iter())
78        .any(|v| v.meta().has_errors())
79    {
80        return Err(ReplayError::InvalidPayload(
81            "Invalid trace-id specified.".to_string(),
82        ));
83    }
84
85    Ok(())
86}
87
88/// Adds default fields and normalizes all values in to their standard representation.
89pub fn normalize(
90    replay: &mut Annotated<Replay>,
91    client_ip: Option<StdIpAddr>,
92    user_agent: RawUserAgentInfo<&str>,
93    geoip_lookup: Option<&GeoIpLookup>,
94) {
95    let _ = processor::apply(replay, |replay_value, meta| {
96        normalize_platform(replay_value);
97        normalize_ip_address(replay_value, client_ip);
98        if let Some(geoip_lookup) = geoip_lookup {
99            normalize_user_geoinfo(
100                geoip_lookup,
101                &mut replay_value.user,
102                client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
103            );
104        }
105        normalize_user_agent(replay_value, user_agent);
106        normalize_type(replay_value);
107        normalize_array_fields(replay_value);
108        let _ = trimming::TrimmingProcessor::new().process_replay(
109            replay_value,
110            meta,
111            ProcessingState::root(),
112        );
113        Ok(())
114    });
115}
116
117fn normalize_array_fields(replay: &mut Replay) {
118    // TODO: This should be replaced by the TrimmingProcessor.
119    // https://github.com/getsentry/relay/pull/1910#pullrequestreview-1337188206
120    if let Some(items) = replay.error_ids.value_mut() {
121        items.truncate(100);
122    }
123
124    if let Some(items) = replay.trace_ids.value_mut() {
125        items.truncate(100);
126    }
127
128    if let Some(items) = replay.urls.value_mut() {
129        items.truncate(100);
130    }
131}
132
133fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
134    crate::event::normalize_ip_addresses(
135        &mut replay.request,
136        &mut replay.user,
137        replay.platform.as_str(),
138        ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
139        replay.sdk.value(),
140    );
141}
142
143fn normalize_user_agent(replay: &mut Replay, default_user_agent: RawUserAgentInfo<&str>) {
144    let headers = match replay
145        .request
146        .value()
147        .and_then(|request| request.headers.value())
148    {
149        Some(headers) => headers,
150        None => return,
151    };
152
153    let user_agent_info = RawUserAgentInfo::from_headers(headers);
154
155    let user_agent_info = if user_agent_info.is_empty() {
156        default_user_agent
157    } else {
158        user_agent_info
159    };
160
161    let contexts = replay.contexts.get_or_insert_with(Contexts::new);
162    user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, &user_agent_info);
163}
164
165fn normalize_platform(replay: &mut Replay) {
166    // Null platforms are permitted but must be defaulted before continuing.
167    let platform = replay.platform.get_or_insert_with(|| "other".to_string());
168
169    // Normalize bad platforms to "other" type.
170    if !crate::is_valid_platform(platform) {
171        replay.platform = Annotated::from("other".to_string());
172    }
173}
174
175fn normalize_type(replay: &mut Replay) {
176    replay.ty = Annotated::from("replay_event".to_string());
177}
178
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use chrono::{TimeZone, Utc};
    use insta::assert_json_snapshot;
    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
    use uuid::Uuid;

    use relay_event_schema::protocol::{
        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
    };

    use super::*;

    /// Identifier shared by the fixtures that need a well-formed id.
    const SAMPLE_ID: &str = "52df9022835246eeb317dbd739ccd059";

    /// A fully populated payload deserializes into the equivalent hand-built struct.
    #[test]
    fn test_event_roundtrip() {
        // NOTE: Interfaces will be tested separately.
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
        let tag = TagEntry(
            Annotated::new("tag".to_string()),
            Annotated::new("value".to_string()),
        );

        let expected = Annotated::new(Replay {
            event_id: Annotated::new(EventId(SAMPLE_ID.parse().unwrap())),
            replay_id: Annotated::new(EventId(SAMPLE_ID.parse().unwrap())),
            replay_type: Annotated::new("session".to_string()),
            segment_id: Annotated::new(0),
            timestamp: Annotated::new(start.into()),
            replay_start_timestamp: Annotated::new(start.into()),
            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_string())]),
            error_ids: Annotated::new(vec![Annotated::new(
                Uuid::parse_str(SAMPLE_ID).unwrap(),
            )]),
            trace_ids: Annotated::new(vec![Annotated::new(
                Uuid::parse_str(SAMPLE_ID).unwrap(),
            )]),
            platform: Annotated::new("myplatform".to_string()),
            release: Annotated::new("myrelease".to_string().into()),
            dist: Annotated::new("mydist".to_string()),
            environment: Annotated::new("myenv".to_string()),
            tags: Annotated::new(Tags(vec![Annotated::new(tag)].into())),
            ..Default::default()
        });

        let parsed: Annotated<Replay> = Annotated::from_json(json).unwrap();
        assert_eq!(expected, parsed);
    }

    /// A numeric `release` is accepted on input and serialized back as a string.
    #[test]
    fn test_lenient_release() {
        let input = r#"{"release":42}"#;
        let output = r#"{"release":"42"}"#;

        let expected = Annotated::new(Replay {
            release: Annotated::new("42".to_string().into()),
            ..Default::default()
        });

        let parsed: Annotated<Replay> = Annotated::from_json(input).unwrap();
        assert_eq!(expected, parsed);

        let serialized = expected.to_json().unwrap();
        assert_eq!(output, serialized);
    }

    /// Normalization fills the browser, client OS, and device contexts from the fixture.
    #[test]
    fn test_set_user_agent_meta() {
        // Parse user input.
        let payload = include_str!("../../tests/fixtures/replay.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);

        let contexts = get_value!(replay.contexts!);

        let expected_browser = BrowserContext {
            name: Annotated::new("Safari".to_string()),
            version: Annotated::new("15.5".to_string()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<BrowserContext>(), Some(&expected_browser));

        let expected_os = Context::Os(Box::new(OsContext {
            name: Annotated::new("Mac OS X".to_string()),
            version: Annotated::new(">=10.15.7".to_string()),
            ..Default::default()
        }));
        assert_eq!(contexts.get_key("client_os"), Some(&expected_os));

        let expected_device = DeviceContext {
            family: Annotated::new("Mac".to_string()),
            brand: Annotated::new("Apple".to_string()),
            model: Annotated::new("Mac".to_string()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<DeviceContext>(), Some(&expected_device));
    }

    /// A user is only present after normalization when a client IP was supplied.
    #[test]
    fn test_missing_user() {
        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();

        // No user object and no ip-address was provided.
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);
        assert_eq!(get_value!(replay.user), None);

        // No user object but an ip-address was provided.
        let client_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        normalize(
            &mut replay,
            Some(client_ip),
            RawUserAgentInfo::default(),
            None,
        );

        let user = get_value!(replay.user!);
        assert_eq!(Some("127.0.0.1"), user.ip_address.as_str());
    }

    /// With a geo lookup and client IP, the user receives the IP address and geo data.
    #[test]
    fn test_set_ip_address_missing_user_ip_address_and_geo() {
        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
        let client_ip = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));

        // IP-Address set.
        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(
            &mut replay,
            Some(client_ip),
            RawUserAgentInfo::default(),
            Some(&lookup),
        );

        let normalized = replay.value().unwrap();
        assert_json_snapshot!(SerializableAnnotated(&normalized.user), @r###"
        {
          "id": "123",
          "email": "user@site.com",
          "ip_address": "2.125.160.216",
          "username": "user",
          "geo": {
            "country_code": "GB",
            "city": "Boxford",
            "subdivision": "England",
            "region": "United Kingdom"
          }
        }
        "###);
    }

    /// The regression fixture parses and normalizes to the expected user fields.
    #[test]
    fn test_loose_type_requirements() {
        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);

        let normalized_user = get_value!(replay.user!);
        assert_eq!(normalized_user.ip_address.as_str(), Some("127.1.1.1"));
        assert_eq!(normalized_user.username.value(), None);
        assert_eq!(normalized_user.email.as_str(), Some("email@sentry.io"));
        assert_eq!(normalized_user.id.as_str(), Some("1"));
    }

    /// Arrays with 101 entries are cut down to 100.
    #[test]
    fn test_capped_values() {
        let sample_uuid = || Annotated::new(Uuid::parse_str(SAMPLE_ID).unwrap());

        let urls: Vec<Annotated<String>> =
            std::iter::repeat_with(|| Annotated::new("localhost:9000".to_string()))
                .take(101)
                .collect();
        let error_ids: Vec<Annotated<Uuid>> =
            std::iter::repeat_with(sample_uuid).take(101).collect();
        let trace_ids: Vec<Annotated<Uuid>> =
            std::iter::repeat_with(sample_uuid).take(101).collect();

        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(urls),
            error_ids: Annotated::new(error_ids),
            trace_ids: Annotated::new(trace_ids),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        assert_eq!(replay_value.error_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.trace_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.urls.value().unwrap().len(), 100);
    }

    /// Empty arrays stay present and empty after capping.
    #[test]
    fn test_truncated_list_less_than_limit() {
        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(Vec::new()),
            error_ids: Annotated::new(Vec::new()),
            trace_ids: Annotated::new(Vec::new()),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        let error_ids = replay_value.error_ids.value().unwrap();
        let trace_ids = replay_value.trace_ids.value().unwrap();
        let urls = replay_value.urls.value().unwrap();

        assert!(error_ids.is_empty());
        assert!(trace_ids.is_empty());
        assert!(urls.is_empty());
    }

    /// A malformed entry in `error_ids` makes validation fail.
    #[test]
    fn test_error_id_validation() {
        // NOTE: Interfaces will be tested separately.
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["test"],
  "trace_ids": [],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let outcome = validate(replay.value().unwrap());
        assert!(outcome.is_err());
    }

    /// A malformed entry in `trace_ids` makes validation fail.
    #[test]
    fn test_trace_id_validation() {
        // NOTE: Interfaces will be tested separately.
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": [],
  "trace_ids": ["123"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let outcome = validate(replay.value().unwrap());
        assert!(outcome.is_err());
    }

    /// An overlong `dist` is trimmed during normalization and the trim is recorded in `_meta`.
    #[test]
    fn test_maxchars_trimming() {
        let long_dist = "0".repeat(100);
        let json = format!(r#"{{"dist": "{}"}}"#, long_dist);
        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();

        normalize(&mut replay, None, RawUserAgentInfo::default(), None);
        assert_annotated_snapshot!(replay, @r#"{
  "platform": "other",
  "dist": "0000000000000000000000000000000000000000000000000000000000000...",
  "type": "replay_event",
  "_meta": {
    "dist": {
      "": {
        "rem": [
          [
            "!limit",
            "s",
            61,
            64
          ]
        ],
        "len": 100
      }
    }
  }
}"#);
    }

    /// `segment_id` is accepted up to `u16::MAX` and rejected above it.
    #[test]
    fn test_validate_u16_segment_id() {
        let replay_with_segment = |segment_id: u64| Replay {
            replay_id: Annotated::new(EventId(SAMPLE_ID.parse().unwrap())),
            segment_id: Annotated::new(segment_id),
            ..Default::default()
        };
        let limit = u64::from(u16::MAX);

        // Does not fit within a u16.
        assert!(validate(&replay_with_segment(limit + 1)).is_err());

        // Fits within a u16.
        assert!(validate(&replay_with_segment(limit)).is_ok());
    }
}