1use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
14#[derive(Debug, thiserror::Error)]
18pub enum ReplayError {
19 #[error("invalid json")]
21 CouldNotParse(#[from] serde_json::Error),
22
23 #[error("no data found")]
25 NoContent,
26
27 #[error("invalid payload {0}")]
31 InvalidPayload(String),
32
33 #[error("failed to scrub PII: {0}")]
37 CouldNotScrub(String),
38}
39
40pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
45 replay
46 .replay_id
47 .value()
48 .ok_or_else(|| ReplayError::InvalidPayload("missing replay_id".to_owned()))?;
49
50 let segment_id = *replay
51 .segment_id
52 .value()
53 .ok_or_else(|| ReplayError::InvalidPayload("missing segment_id".to_owned()))?;
54
55 if segment_id > u16::MAX as u64 {
56 return Err(ReplayError::InvalidPayload(
57 "segment_id exceeded u16 limit".to_owned(),
58 ));
59 }
60
61 if replay
62 .error_ids
63 .value()
64 .into_iter()
65 .flat_map(|v| v.iter())
66 .any(|v| v.meta().has_errors())
67 {
68 return Err(ReplayError::InvalidPayload(
69 "Invalid error-id specified.".to_owned(),
70 ));
71 }
72
73 if replay
74 .trace_ids
75 .value()
76 .into_iter()
77 .flat_map(|v| v.iter())
78 .any(|v| v.meta().has_errors())
79 {
80 return Err(ReplayError::InvalidPayload(
81 "Invalid trace-id specified.".to_owned(),
82 ));
83 }
84
85 Ok(())
86}
87
88pub fn normalize(
90 replay: &mut Annotated<Replay>,
91 client_ip: Option<StdIpAddr>,
92 user_agent: &RawUserAgentInfo<&str>,
93 geoip_lookup: &GeoIpLookup,
94) {
95 let _ = processor::apply(replay, |replay_value, meta| {
96 normalize_platform(replay_value);
97 normalize_ip_address(replay_value, client_ip);
98 normalize_user_geoinfo(
99 geoip_lookup,
100 &mut replay_value.user,
101 client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
102 );
103 normalize_user_agent(replay_value, user_agent);
104 normalize_type(replay_value);
105 normalize_array_fields(replay_value);
106 let _ = trimming::TrimmingProcessor::new().process_replay(
107 replay_value,
108 meta,
109 ProcessingState::root(),
110 );
111 Ok(())
112 });
113}
114
115fn normalize_array_fields(replay: &mut Replay) {
116 if let Some(items) = replay.error_ids.value_mut() {
119 items.truncate(100);
120 }
121
122 if let Some(items) = replay.trace_ids.value_mut() {
123 items.truncate(100);
124 }
125
126 if let Some(items) = replay.urls.value_mut() {
127 items.truncate(100);
128 }
129}
130
131fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
132 crate::event::normalize_ip_addresses(
133 &mut replay.request,
134 &mut replay.user,
135 replay.platform.as_str(),
136 ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
137 replay.sdk.value(),
138 );
139}
140
141fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) {
142 let headers = match replay
143 .request
144 .value()
145 .and_then(|request| request.headers.value())
146 {
147 Some(headers) => headers,
148 None => return,
149 };
150
151 let user_agent_info = RawUserAgentInfo::from_headers(headers);
152 let user_agent_info = if user_agent_info.is_empty() {
153 default_user_agent
154 } else {
155 &user_agent_info
156 };
157
158 let contexts = replay.contexts.get_or_insert_with(Contexts::new);
159 user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info);
160}
161
162fn normalize_platform(replay: &mut Replay) {
163 let platform = replay.platform.get_or_insert_with(|| "other".to_owned());
165
166 if !crate::is_valid_platform(platform) {
168 replay.platform = Annotated::from("other".to_owned());
169 }
170}
171
172fn normalize_type(replay: &mut Replay) {
173 replay.ty = Annotated::from("replay_event".to_owned());
174}
175
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use chrono::{TimeZone, Utc};
    use insta::assert_json_snapshot;
    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
    use uuid::Uuid;

    use relay_event_schema::protocol::{
        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
    };

    use super::*;

    /// Identifier reused by the tests wherever an event, replay, error or trace id is needed.
    const ID: &str = "52df9022835246eeb317dbd739ccd059";

    /// Deserializes a replay from JSON, panicking if the JSON itself is malformed.
    fn parse(payload: &str) -> Annotated<Replay> {
        Annotated::from_json(payload).unwrap()
    }

    /// Runs `normalize` with an empty default user agent and an empty GeoIP database.
    fn normalize_with_defaults(replay: &mut Annotated<Replay>, client_ip: Option<IpAddr>) {
        normalize(
            replay,
            client_ip,
            &RawUserAgentInfo::default(),
            &GeoIpLookup::empty(),
        );
    }

    /// A fully populated replay deserializes into the expected structure.
    #[test]
    fn test_event_roundtrip() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let event_id = || EventId(ID.parse().unwrap());
        let uuid = || Uuid::parse_str(ID).unwrap();
        let start_of_2000 = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();

        let tag = TagEntry(
            Annotated::new("tag".to_owned()),
            Annotated::new("value".to_owned()),
        );

        let expected = Annotated::new(Replay {
            event_id: Annotated::new(event_id()),
            replay_id: Annotated::new(event_id()),
            replay_type: Annotated::new("session".to_owned()),
            segment_id: Annotated::new(0),
            timestamp: Annotated::new(start_of_2000.into()),
            replay_start_timestamp: Annotated::new(start_of_2000.into()),
            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_owned())]),
            error_ids: Annotated::new(vec![Annotated::new(uuid())]),
            trace_ids: Annotated::new(vec![Annotated::new(uuid())]),
            platform: Annotated::new("myplatform".to_owned()),
            release: Annotated::new("myrelease".to_owned().into()),
            dist: Annotated::new("mydist".to_owned()),
            environment: Annotated::new("myenv".to_owned()),
            tags: Annotated::new(Tags(vec![Annotated::new(tag)].into())),
            ..Default::default()
        });

        assert_eq!(expected, parse(json));
    }

    /// A numeric release is accepted and serialized back as a string.
    #[test]
    fn test_lenient_release() {
        let input = r#"{"release":42}"#;
        let output = r#"{"release":"42"}"#;

        let expected = Annotated::new(Replay {
            release: Annotated::new("42".to_owned().into()),
            ..Default::default()
        });

        assert_eq!(expected, parse(input));
        assert_eq!(output, expected.to_json().unwrap());
    }

    /// Normalization derives browser, OS and device contexts from the fixture's headers.
    #[test]
    fn test_set_user_agent_meta() {
        let mut replay = parse(include_str!("../../tests/fixtures/replay.json"));
        normalize_with_defaults(&mut replay, None);

        let contexts = get_value!(replay.contexts!);

        let expected_browser = BrowserContext {
            name: Annotated::new("Safari".to_owned()),
            version: Annotated::new("15.5".to_owned()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<BrowserContext>(), Some(&expected_browser));

        let expected_os = Context::Os(Box::new(OsContext {
            name: Annotated::new("Mac OS X".to_owned()),
            version: Annotated::new(">=10.15.7".to_owned()),
            ..Default::default()
        }));
        assert_eq!(contexts.get_key("client_os"), Some(&expected_os));

        let expected_device = DeviceContext {
            family: Annotated::new("Mac".to_owned()),
            brand: Annotated::new("Apple".to_owned()),
            model: Annotated::new("Mac".to_owned()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<DeviceContext>(), Some(&expected_device));
    }

    /// Without a client IP no geo data appears; with one, the user's IP address is set.
    #[test]
    fn test_missing_user() {
        let mut replay = parse(include_str!("../../tests/fixtures/replay_missing_user.json"));

        // No client IP: the user must not gain any geo information.
        normalize_with_defaults(&mut replay, None);
        assert_eq!(get_value!(replay.user.geo), None);

        // With a client IP: the address is recorded on the user.
        let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        normalize_with_defaults(&mut replay, Some(localhost));

        let user_ip = get_value!(replay.user!).ip_address.as_str();
        assert_eq!(Some("127.0.0.1"), user_ip);
    }

    /// The client IP fills in the user's IP address and geo data from the test database.
    #[test]
    fn test_set_ip_address_missing_user_ip_address_and_geo() {
        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
        let client_ip = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));

        let mut replay = parse(include_str!(
            "../../tests/fixtures/replay_missing_user_ip_address.json"
        ));
        normalize(
            &mut replay,
            Some(client_ip),
            &RawUserAgentInfo::default(),
            &lookup,
        );

        let user = &replay.value().unwrap().user;
        assert_json_snapshot!(SerializableAnnotated(user), @r###"
        {
          "id": "123",
          "email": "user@site.com",
          "ip_address": "2.125.160.216",
          "username": "user",
          "geo": {
            "country_code": "GB",
            "city": "Boxford",
            "subdivision": "England",
            "region": "United Kingdom"
          }
        }
        "###);
    }

    /// Checks the user fields extracted from the `replay_failure_22_08_31.json` fixture.
    #[test]
    fn test_loose_type_requirements() {
        let mut replay = parse(include_str!(
            "../../tests/fixtures/replay_failure_22_08_31.json"
        ));
        normalize_with_defaults(&mut replay, None);

        let user = get_value!(replay.user!);
        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
        assert_eq!(user.username.value(), None);
        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
        assert_eq!(user.id.as_str(), Some("1"));
    }

    /// Lists holding 101 entries are cut down to 100.
    #[test]
    fn test_capped_values() {
        const OVER_LIMIT: usize = 101;

        let many_ids = || -> Vec<Annotated<Uuid>> {
            std::iter::repeat_with(|| Annotated::new(Uuid::parse_str(ID).unwrap()))
                .take(OVER_LIMIT)
                .collect()
        };
        let many_urls: Vec<Annotated<String>> =
            std::iter::repeat_with(|| Annotated::new("localhost:9000".to_owned()))
                .take(OVER_LIMIT)
                .collect();

        let mut replay = Replay {
            urls: Annotated::new(many_urls),
            error_ids: Annotated::new(many_ids()),
            trace_ids: Annotated::new(many_ids()),
            ..Default::default()
        };

        normalize_array_fields(&mut replay);

        assert_eq!(replay.error_ids.value().unwrap().len(), 100);
        assert_eq!(replay.trace_ids.value().unwrap().len(), 100);
        assert_eq!(replay.urls.value().unwrap().len(), 100);
    }

    /// Lists below the limit are left as they are.
    #[test]
    fn test_truncated_list_less_than_limit() {
        let mut replay = Replay {
            urls: Annotated::new(Vec::new()),
            error_ids: Annotated::new(Vec::new()),
            trace_ids: Annotated::new(Vec::new()),
            ..Default::default()
        };

        normalize_array_fields(&mut replay);

        assert!(replay.error_ids.value().unwrap().is_empty());
        assert!(replay.trace_ids.value().unwrap().is_empty());
        assert!(replay.urls.value().unwrap().is_empty());
    }

    /// A malformed entry in `error_ids` fails validation.
    #[test]
    fn test_error_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["test"],
  "trace_ids": [],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = parse(json);
        assert!(validate(replay.value().unwrap()).is_err());
    }

    /// A malformed entry in `trace_ids` fails validation.
    #[test]
    fn test_trace_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": [],
  "trace_ids": ["123"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = parse(json);
        assert!(validate(replay.value().unwrap()).is_err());
    }

    /// An overlong `dist` is trimmed during normalization and the removal is recorded in meta.
    #[test]
    fn test_maxchars_trimming() {
        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
        let mut replay = parse(json.as_str());

        normalize_with_defaults(&mut replay, None);

        assert_annotated_snapshot!(replay, @r###"
        {
          "platform": "other",
          "dist": "0000000000000000000000000000000000000000000000000000000000000...",
          "type": "replay_event",
          "_meta": {
            "dist": {
              "": {
                "rem": [
                  [
                    "!limit",
                    "s",
                    61,
                    64
                  ]
                ],
                "len": 100
              }
            }
          }
        }
        "###);
    }

    /// `segment_id` is accepted up to `u16::MAX` and rejected above it.
    #[test]
    fn test_validate_u16_segment_id() {
        let replay_with_segment = |segment_id: u64| Replay {
            replay_id: Annotated::new(EventId(ID.parse().unwrap())),
            segment_id: Annotated::new(segment_id),
            ..Default::default()
        };

        let limit = u64::from(u16::MAX);

        assert!(validate(&replay_with_segment(limit + 1)).is_err());
        assert!(validate(&replay_with_segment(limit)).is_ok());
    }
}