1use std::net::IpAddr as StdIpAddr;
4
5use relay_event_schema::processor::{self, ProcessingState, Processor};
6use relay_event_schema::protocol::{Contexts, IpAddr, Replay};
7use relay_protocol::Annotated;
8
9use crate::event::normalize_user_geoinfo;
10use crate::normalize::user_agent;
11use crate::user_agent::RawUserAgentInfo;
12use crate::{GeoIpLookup, trimming};
13
14#[derive(Debug, thiserror::Error)]
18pub enum ReplayError {
19 #[error("invalid json")]
21 CouldNotParse(#[from] serde_json::Error),
22
23 #[error("no data found")]
25 NoContent,
26
27 #[error("invalid payload {0}")]
31 InvalidPayload(String),
32
33 #[error("failed to scrub PII: {0}")]
37 CouldNotScrub(String),
38}
39
40pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
45 replay
46 .replay_id
47 .value()
48 .ok_or_else(|| ReplayError::InvalidPayload("missing replay_id".to_string()))?;
49
50 let segment_id = *replay
51 .segment_id
52 .value()
53 .ok_or_else(|| ReplayError::InvalidPayload("missing segment_id".to_string()))?;
54
55 if segment_id > u16::MAX as u64 {
56 return Err(ReplayError::InvalidPayload(
57 "segment_id exceeded u16 limit".to_string(),
58 ));
59 }
60
61 if replay
62 .error_ids
63 .value()
64 .into_iter()
65 .flat_map(|v| v.iter())
66 .any(|v| v.meta().has_errors())
67 {
68 return Err(ReplayError::InvalidPayload(
69 "Invalid error-id specified.".to_string(),
70 ));
71 }
72
73 if replay
74 .trace_ids
75 .value()
76 .into_iter()
77 .flat_map(|v| v.iter())
78 .any(|v| v.meta().has_errors())
79 {
80 return Err(ReplayError::InvalidPayload(
81 "Invalid trace-id specified.".to_string(),
82 ));
83 }
84
85 Ok(())
86}
87
88pub fn normalize(
90 replay: &mut Annotated<Replay>,
91 client_ip: Option<StdIpAddr>,
92 user_agent: RawUserAgentInfo<&str>,
93 geoip_lookup: Option<&GeoIpLookup>,
94) {
95 let _ = processor::apply(replay, |replay_value, meta| {
96 normalize_platform(replay_value);
97 normalize_ip_address(replay_value, client_ip);
98 if let Some(geoip_lookup) = geoip_lookup {
99 normalize_user_geoinfo(
100 geoip_lookup,
101 &mut replay_value.user,
102 client_ip.map(|ip| IpAddr(ip.to_string())).as_ref(),
103 );
104 }
105 normalize_user_agent(replay_value, user_agent);
106 normalize_type(replay_value);
107 normalize_array_fields(replay_value);
108 let _ = trimming::TrimmingProcessor::new().process_replay(
109 replay_value,
110 meta,
111 ProcessingState::root(),
112 );
113 Ok(())
114 });
115}
116
117fn normalize_array_fields(replay: &mut Replay) {
118 if let Some(items) = replay.error_ids.value_mut() {
121 items.truncate(100);
122 }
123
124 if let Some(items) = replay.trace_ids.value_mut() {
125 items.truncate(100);
126 }
127
128 if let Some(items) = replay.urls.value_mut() {
129 items.truncate(100);
130 }
131}
132
133fn normalize_ip_address(replay: &mut Replay, ip_address: Option<StdIpAddr>) {
134 crate::event::normalize_ip_addresses(
135 &mut replay.request,
136 &mut replay.user,
137 replay.platform.as_str(),
138 ip_address.map(|ip| IpAddr(ip.to_string())).as_ref(),
139 replay.sdk.value(),
140 );
141}
142
143fn normalize_user_agent(replay: &mut Replay, default_user_agent: RawUserAgentInfo<&str>) {
144 let headers = match replay
145 .request
146 .value()
147 .and_then(|request| request.headers.value())
148 {
149 Some(headers) => headers,
150 None => return,
151 };
152
153 let user_agent_info = RawUserAgentInfo::from_headers(headers);
154
155 let user_agent_info = if user_agent_info.is_empty() {
156 default_user_agent
157 } else {
158 user_agent_info
159 };
160
161 let contexts = replay.contexts.get_or_insert_with(Contexts::new);
162 user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, &user_agent_info);
163}
164
165fn normalize_platform(replay: &mut Replay) {
166 let platform = replay.platform.get_or_insert_with(|| "other".to_string());
168
169 if !crate::is_valid_platform(platform) {
171 replay.platform = Annotated::from("other".to_string());
172 }
173}
174
175fn normalize_type(replay: &mut Replay) {
176 replay.ty = Annotated::from("replay_event".to_string());
177}
178
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use chrono::{TimeZone, Utc};
    use insta::assert_json_snapshot;
    use relay_protocol::{SerializableAnnotated, assert_annotated_snapshot, get_value};
    use uuid::Uuid;

    use relay_event_schema::protocol::{
        BrowserContext, Context, DeviceContext, EventId, OsContext, TagEntry, Tags,
    };

    use super::*;

    /// Identifier shared by the hand-built replays in these tests.
    const SAMPLE_ID: &str = "52df9022835246eeb317dbd739ccd059";

    /// Returns [`SAMPLE_ID`] as an [`EventId`].
    fn sample_event_id() -> EventId {
        EventId(SAMPLE_ID.parse().unwrap())
    }

    /// Returns [`SAMPLE_ID`] as a [`Uuid`].
    fn sample_uuid() -> Uuid {
        Uuid::parse_str(SAMPLE_ID).unwrap()
    }

    /// Parsing the JSON payload yields the same replay as building it by hand.
    #[test]
    fn test_event_roundtrip() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["52df9022835246eeb317dbd739ccd059"],
  "trace_ids": ["52df9022835246eeb317dbd739ccd059"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();

        let replay = Annotated::new(Replay {
            event_id: Annotated::new(sample_event_id()),
            replay_id: Annotated::new(sample_event_id()),
            replay_type: Annotated::new("session".to_string()),
            segment_id: Annotated::new(0),
            timestamp: Annotated::new(start.into()),
            replay_start_timestamp: Annotated::new(start.into()),
            urls: Annotated::new(vec![Annotated::new("localhost:9000".to_string())]),
            error_ids: Annotated::new(vec![Annotated::new(sample_uuid())]),
            trace_ids: Annotated::new(vec![Annotated::new(sample_uuid())]),
            platform: Annotated::new("myplatform".to_string()),
            release: Annotated::new("myrelease".to_string().into()),
            dist: Annotated::new("mydist".to_string()),
            environment: Annotated::new("myenv".to_string()),
            tags: {
                let entry = TagEntry(
                    Annotated::new("tag".to_string()),
                    Annotated::new("value".to_string()),
                );
                Annotated::new(Tags(vec![Annotated::new(entry)].into()))
            },
            ..Default::default()
        });

        assert_eq!(replay, Annotated::from_json(json).unwrap());
    }

    /// A numeric `release` is accepted and serialized back as a string.
    #[test]
    fn test_lenient_release() {
        let input = r#"{"release":42}"#;
        let output = r#"{"release":"42"}"#;

        let event = Annotated::new(Replay {
            release: Annotated::new("42".to_string().into()),
            ..Default::default()
        });

        assert_eq!(event, Annotated::from_json(input).unwrap());
        assert_eq!(output, event.to_json().unwrap());
    }

    /// Normalization fills the browser, OS and device contexts from the fixture.
    #[test]
    fn test_set_user_agent_meta() {
        let payload = include_str!("../../tests/fixtures/replay.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);

        let contexts = get_value!(replay.contexts!);

        let expected_browser = BrowserContext {
            name: Annotated::new("Safari".to_string()),
            version: Annotated::new("15.5".to_string()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<BrowserContext>(), Some(&expected_browser));

        let expected_os = Context::Os(Box::new(OsContext {
            name: Annotated::new("Mac OS X".to_string()),
            version: Annotated::new(">=10.15.7".to_string()),
            ..Default::default()
        }));
        assert_eq!(contexts.get_key("client_os"), Some(&expected_os));

        let expected_device = DeviceContext {
            family: Annotated::new("Mac".to_string()),
            brand: Annotated::new("Apple".to_string()),
            model: Annotated::new("Mac".to_string()),
            ..Default::default()
        };
        assert_eq!(contexts.get::<DeviceContext>(), Some(&expected_device));
    }

    /// The user stays unset without a client IP and is populated once one is supplied.
    #[test]
    fn test_missing_user() {
        let payload = include_str!("../../tests/fixtures/replay_missing_user.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();

        // Without a client IP there is nothing to build a user from.
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);
        assert_eq!(get_value!(replay.user), None);

        // With a client IP the user is created and carries that address.
        let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        normalize(
            &mut replay,
            Some(ip_address),
            RawUserAgentInfo::default(),
            None,
        );

        let ipaddr = get_value!(replay.user!).ip_address.as_str();
        assert_eq!(Some("127.0.0.1"), ipaddr);
    }

    /// The client IP and its geo information are filled in on the user.
    #[test]
    fn test_set_ip_address_missing_user_ip_address_and_geo() {
        let lookup = GeoIpLookup::open("tests/fixtures/GeoIP2-Enterprise-Test.mmdb").unwrap();
        let ip_address = IpAddr::V4(Ipv4Addr::new(2, 125, 160, 216));

        let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(
            &mut replay,
            Some(ip_address),
            RawUserAgentInfo::default(),
            Some(&lookup),
        );

        let user = &replay.value().unwrap().user;
        assert_json_snapshot!(SerializableAnnotated(user), @r###"
        {
          "id": "123",
          "email": "user@site.com",
          "ip_address": "2.125.160.216",
          "username": "user",
          "geo": {
            "country_code": "GB",
            "city": "Boxford",
            "subdivision": "England",
            "region": "United Kingdom"
          }
        }
        "###);
    }

    /// User fields from the regression fixture survive normalization.
    #[test]
    fn test_loose_type_requirements() {
        let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json");

        let mut replay: Annotated<Replay> = Annotated::from_json(payload).unwrap();
        normalize(&mut replay, None, RawUserAgentInfo::default(), None);

        let user = get_value!(replay.user!);
        assert_eq!(user.ip_address.as_str(), Some("127.1.1.1"));
        assert!(user.username.value().is_none());
        assert_eq!(user.email.as_str(), Some("email@sentry.io"));
        assert_eq!(user.id.as_str(), Some("1"));
    }

    /// Lists with 101 entries are cut down to 100.
    #[test]
    fn test_capped_values() {
        let urls: Vec<Annotated<String>> =
            std::iter::repeat_with(|| Annotated::new("localhost:9000".to_string()))
                .take(101)
                .collect();

        let error_ids: Vec<Annotated<Uuid>> =
            std::iter::repeat_with(|| Annotated::new(sample_uuid()))
                .take(101)
                .collect();

        let trace_ids: Vec<Annotated<Uuid>> =
            std::iter::repeat_with(|| Annotated::new(sample_uuid()))
                .take(101)
                .collect();

        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(urls),
            error_ids: Annotated::new(error_ids),
            trace_ids: Annotated::new(trace_ids),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        assert_eq!(replay_value.error_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.trace_ids.value().unwrap().len(), 100);
        assert_eq!(replay_value.urls.value().unwrap().len(), 100);
    }

    /// Empty lists are left empty by the capping step.
    #[test]
    fn test_truncated_list_less_than_limit() {
        let mut replay = Annotated::new(Replay {
            urls: Annotated::new(Vec::new()),
            error_ids: Annotated::new(Vec::new()),
            trace_ids: Annotated::new(Vec::new()),
            ..Default::default()
        });

        let replay_value = replay.value_mut().as_mut().unwrap();
        normalize_array_fields(replay_value);

        assert!(replay_value.error_ids.value().unwrap().is_empty());
        assert!(replay_value.trace_ids.value().unwrap().is_empty());
        assert!(replay_value.urls.value().unwrap().is_empty());
    }

    /// An error id that is not a valid UUID fails validation.
    #[test]
    fn test_error_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": ["test"],
  "trace_ids": [],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let validation_result = validate(replay.value().unwrap());
        assert!(validation_result.is_err());
    }

    /// A trace id that is not a valid UUID fails validation.
    #[test]
    fn test_trace_id_validation() {
        let json = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "replay_id": "52df9022835246eeb317dbd739ccd059",
  "segment_id": 0,
  "replay_type": "session",
  "error_sample_rate": 0.5,
  "session_sample_rate": 0.5,
  "timestamp": 946684800.0,
  "replay_start_timestamp": 946684800.0,
  "urls": ["localhost:9000"],
  "error_ids": [],
  "trace_ids": ["123"],
  "platform": "myplatform",
  "release": "myrelease",
  "dist": "mydist",
  "environment": "myenv",
  "tags": [
    [
      "tag",
      "value"
    ]
  ]
}"#;

        let replay = Annotated::<Replay>::from_json(json).unwrap();
        let validation_result = validate(replay.value().unwrap());
        assert!(validation_result.is_err());
    }

    /// An over-long `dist` is trimmed during normalization and the removal is recorded in meta.
    #[test]
    fn test_maxchars_trimming() {
        let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100));
        let mut replay = Annotated::<Replay>::from_json(json.as_str()).unwrap();

        normalize(&mut replay, None, RawUserAgentInfo::default(), None);
        assert_annotated_snapshot!(replay, @r#"{
  "platform": "other",
  "dist": "0000000000000000000000000000000000000000000000000000000000000...",
  "type": "replay_event",
  "_meta": {
    "dist": {
      "": {
        "rem": [
          [
            "!limit",
            "s",
            61,
            64
          ]
        ],
        "len": 100
      }
    }
  }
}"#);
    }

    /// `segment_id` is rejected above `u16::MAX` and accepted at exactly `u16::MAX`.
    #[test]
    fn test_validate_u16_segment_id() {
        let replay_with_segment = |segment_id: u64| Replay {
            replay_id: Annotated::new(sample_event_id()),
            segment_id: Annotated::new(segment_id),
            ..Default::default()
        };

        let too_large = replay_with_segment(u64::from(u16::MAX) + 1);
        assert!(validate(&too_large).is_err());

        let at_limit = replay_with_segment(u64::from(u16::MAX));
        assert!(validate(&at_limit).is_ok());
    }
}