1use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
14#[derive(Debug, thiserror::Error)]
18pub enum ReplayError {
19 #[error("missing replay_id")]
21 MissingReplayId,
22 #[error("missing segment_id")]
24 MissingSegmentId,
25 #[error("segment_id too large")]
27 SegmentIdTooLarge,
28 #[error("invalid error_id specified")]
30 InvalidErrorId,
31 #[error("invalid trace_id specified")]
33 InvalidTraceId,
34
35 #[error("invalid json")]
37 CouldNotParse(#[from] serde_json::Error),
38
39 #[error("no data found")]
41 NoContent,
42
43 #[error("invalid payload {0}")]
47 InvalidPayload(String),
48
49 #[error("failed to scrub PII: {0}")]
53 CouldNotScrub(String),
54}
55
56pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
61 replay
62 .replay_id
63 .value()
64 .ok_or(ReplayError::MissingReplayId)?;
65
66 let segment_id = *replay
67 .segment_id
68 .value()
69 .ok_or(ReplayError::MissingSegmentId)?;
70
71 if segment_id > u16::MAX as u64 {
72 return Err(ReplayError::SegmentIdTooLarge);
73 }
74
75 if replay
76 .error_ids
77 .value()
78 .into_iter()
79 .flat_map(|v| v.iter())
80 .any(|v| v.meta().has_errors())
81 {
82 return Err(ReplayError::InvalidErrorId);
83 }
84
85 if replay
86 .trace_ids
87 .value()
88 .into_iter()
89 .flat_map(|v| v.iter())
90 .any(|v| v.meta().has_errors())
91 {
92 return Err(ReplayError::InvalidTraceId);
93 }
94
95 Ok(())
96}
97
98pub fn normalize(
100 replay: &mut Annotated<Replay>,
101 client_ip: Option<StdIpAddr>,
102 user_agent: &RawUserAgentInfo<&str>,
103 geoip_lookup: &GeoIpLookup,
104) {
105 let _ = processor::apply(replay, |replay_value, meta| {
106 normalize_platform(replay_value);
107 normalize_ip_address(replay_value, client_ip);
108 normalize_user_geoinfo(
109 geoip_lookup,
110 &mut replay_value.user,
111 client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
112 );
113 normalize_user_agent(replay_value, user_agent);
114 normalize_type(replay_value);
115 normalize_array_fields(replay_value);
116 let _ = trimming::TrimmingProcessor::new().process_replay(
117 replay_value,
118 meta,
119 ProcessingState::root(),
120 );
121 Ok(())
122 });
123}
124
125fn normalize_array_fields(replay: &mut Replay) {
126 if let Some(items) = replay.error_ids.value_mut() {
129 items.truncate(100);
130 }
131
132 if let Some(items) = replay.trace_ids.value_mut() {
133 items.truncate(100);
134 }
135
136 if let Some(items) = replay.urls.value_mut() {
137 items.truncate(100);
138 }
139}
140
141fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
142 crate::event::normalize_ip_addresses(
143 &mut replay.request,
144 &mut replay.user,
145 replay.platform.as_str(),
146 ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
147 replay.sdk.value(),
148 );
149}
150
151fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) {
152 let headers = match replay
153 .request
154 .value()
155 .and_then(|request| request.headers.value())
156 {
157 Some(headers) => headers,
158 None => return,
159 };
160
161 let user_agent_info = RawUserAgentInfo::from_headers(headers);
162 let user_agent_info = if user_agent_info.is_empty() {
163 default_user_agent
164 } else {
165 &user_agent_info
166 };
167
168 let contexts = replay.contexts.get_or_insert_with(Contexts::new);
169 user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info);
170}
171
172fn normalize_platform(replay: &mut Replay) {
173 let platform = replay.platform.get_or_insert_with(|| "other".to_owned());
175
176 if !crate::is_valid_platform(platform) {
178 replay.platform = Annotated::from("other".to_owned());
179 }
180}
181
182fn normalize_type(replay: &mut Replay) {
183 replay.ty = Annotated::from("replay_event".to_owned());
184}
185
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use chrono::{TimeZone, Utc};
    use insta::assert_json_snapshot;
    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
    use uuid::Uuid;

    use relay_event_schema::protocol::{
        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
    };

    use super::*;

    /// Parsing a complete JSON payload yields the equivalent `Replay` struct.
    #[test]
    fn test_event_roundtrip() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::new(Replay {
            event_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
            replay_id: Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap())),
            replay_type: Annotated::new("session".to_owned()),
            segment_id: Annotated::new(0),
            timestamp: Annotated::new(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into()),
            replay_start_timestamp: Annotated::new(
                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
            ),
            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_owned())]),
            error_ids: Annotated::new(vec![Annotated::new(
                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
            )]),
            trace_ids: Annotated::new(vec![Annotated::new(
                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
            )]),
            platform: Annotated::new("myplatform".to_owned()),
            release: Annotated::new("myrelease".to_owned().into()),
            dist: Annotated::new("mydist".to_owned()),
            environment: Annotated::new("myenv".to_owned()),
            tags: {
                let items = vec![Annotated::new(TagEntry(
                    Annotated::new("tag".to_owned()),
                    Annotated::new("value".to_owned()),
                ))];
                Annotated::new(Tags(items.into()))
            },
            ..Default::default()
        });

        assert_eq!(replay, Annotated::from_json(json).unwrap());
    }

    /// A numeric `release` is accepted and serialized back as a string.
    #[test]
    fn test_lenient_release() {
        let input = r#"{"release":42}"#;
        let output = r#"{"release":"42"}"#;
        let event = Annotated::new(Replay {
            release: Annotated::new("42".to_owned().into()),
            ..Default::default()
        });

        assert_eq!(event, Annotated::from_json(input).unwrap());
        assert_eq!(output, event.to_json().unwrap());
    }

    /// Normalization derives browser, OS, and device contexts from the fixture.
    #[test]
    fn test_set_user_agent_meta() {
        let payload = include_str!("../../tests/fixtures/replay.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(
            &mut replay,
            None,
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );

        let contexts = get_value!(replay.contexts!);
        assert_eq!(
            contexts.get::<BrowserContext>(),
            Some(&BrowserContext {
                name: Annotated::new("Safari".to_owned()),
                version: Annotated::new("15.5".to_owned()),
                ..Default::default()
            })
        );
        assert_eq!(
            contexts.get_key("client_os"),
            Some(&Context::Os(Box::new(OsContext {
                name: Annotated::new("Mac OS X".to_owned()),
                version: Annotated::new(">=10.15.7".to_owned()),
                ..Default::default()
            })))
        );
        assert_eq!(
            contexts.get::<DeviceContext>(),
            Some(&DeviceContext {
                family: Annotated::new("Mac".to_owned()),
                brand: Annotated::new("Apple".to_owned()),
                model: Annotated::new("Mac".to_owned()),
                ..Default::default()
            })
        );
    }

    /// Without a client IP no geo data is set; with one, the user IP is filled in.
    #[test]
    fn test_missing_user() {
        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();

        normalize(
            &mut replay,
            None,
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );
        assert_eq!(get_value!(replay.user.geo), None);

        let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        normalize(
            &mut replay,
            Some(ip_address),
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );

        let ipaddr = get_value!(replay.user!).ip_address.as_str();
        assert_eq!(Some("127.0.0.1"), ipaddr);
    }

    /// The client IP and the geo data looked up for it are written onto the user.
    #[test]
    fn test_set_ip_address_missing_user_ip_address_and_geo() {
        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
        let ip_address = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));

        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(
            &mut replay,
            Some(ip_address),
            &RawUserAgentInfo::default(),
            &lookup,
        );

        let user = &replay.value().unwrap().user;
        assert_json_snapshot!(SerializableAnnotated(user), @r###"
        {
          "id": "123",
          "email": "user@site.com",
          "ip_address": "2.125.160.216",
          "username": "user",
          "geo": {
            "country_code": "GB",
            "city": "Boxford",
            "subdivision": "England",
            "region": "United Kingdom"
          }
        }
        "###);
    }

    /// User fields from the regression fixture survive normalization.
    #[test]
    fn test_loose_type_requirements() {
        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(
            &mut replay,
            None,
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );

        let user = get_value!(replay.user!);
        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
        assert_eq!(user.username.value(), None);
        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
        assert_eq!(user.id.as_str(), Some("1"));
    }

    /// Lists with 101 entries are cut down to 100.
    #[test]
    fn test_capped_values() {
        let urls: Vec<Annotated<String>> = (0..101)
            .map(|_| Annotated::new("localhost:9000".to_owned()))
            .collect();

        let error_ids: Vec<Annotated<Uuid>> = (0..101)
            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
            .collect();

        let trace_ids: Vec<Annotated<Uuid>> = (0..101)
            .map(|_| Annotated::new(Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap()))
            .collect();

        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(urls),
            error_ids: Annotated::new(error_ids),
            trace_ids: Annotated::new(trace_ids),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        // `assert_eq!` reports the actual length when the cap is not applied.
        assert_eq!(replay_value.error_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.trace_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.urls.value().unwrap().len(), 100);
    }

    /// Lists below the limit are left as they are.
    #[test]
    fn test_truncated_list_less_than_limit() {
        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(Vec::new()),
            error_ids: Annotated::new(Vec::new()),
            trace_ids: Annotated::new(Vec::new()),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        assert!(replay_value.error_ids.value().unwrap().is_empty());
        assert!(replay_value.trace_ids.value().unwrap().is_empty());
        assert!(replay_value.urls.value().unwrap().is_empty());
    }

    /// An unparsable entry in `error_ids` is rejected with `InvalidErrorId`.
    #[test]
    fn test_error_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["test"],
  "trace_ids": [],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let validation_result = validate(replay.value().unwrap());

        // Pin the variant so an unrelated failure cannot satisfy the test.
        assert!(
            matches!(validation_result, Err(ReplayError::InvalidErrorId)),
            "unexpected validation result: {validation_result:?}"
        );
    }

    /// An unparsable entry in `trace_ids` is rejected with `InvalidTraceId`.
    #[test]
    fn test_trace_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": [],
  "trace_ids": ["123"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let validation_result = validate(replay.value().unwrap());

        // Pin the variant so an unrelated failure cannot satisfy the test.
        assert!(
            matches!(validation_result, Err(ReplayError::InvalidTraceId)),
            "unexpected validation result: {validation_result:?}"
        );
    }

    /// An over-long `dist` is trimmed and the removal is recorded in `_meta`.
    #[test]
    fn test_maxchars_trimming() {
        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();

        normalize(
            &mut replay,
            None,
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );
        assert_annotated_snapshot!(replay, @r###"
        {
          "platform": "other",
          "dist": "0000000000000000000000000000000000000000000000000000000000000...",
          "type": "replay_event",
          "_meta": {
            "dist": {
              "": {
                "rem": [
                  [
                    "!limit",
                    "s",
                    61,
                    64
                  ]
                ],
                "len": 100
              }
            }
          }
        }
        "###);
    }

    /// `segment_id` is accepted up to `u16::MAX` and rejected above it.
    #[test]
    fn test_validate_u16_segment_id() {
        let replay_id =
            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64 + 1);
        let replay = Annotated::new(Replay {
            replay_id,
            segment_id,
            ..Default::default()
        });
        let validation_result = validate(replay.value().unwrap());
        assert!(
            matches!(validation_result, Err(ReplayError::SegmentIdTooLarge)),
            "unexpected validation result: {validation_result:?}"
        );

        let replay_id =
            Annotated::new(EventId("52df9022835246eeb317dbd739ccd059".parse().unwrap()));
        let segment_id: Annotated<u64> = Annotated::new(u16::MAX as u64);
        let replay = Annotated::new(Replay {
            replay_id,
            segment_id,
            ..Default::default()
        });
        assert!(validate(replay.value().unwrap()).is_ok());
    }
}