1use std::borrow::Cow;
15use std::cell::RefCell;
16use std::fmt;
17use std::io::Read;
18use std::rc::Rc;
19
20use flate2::Compression;
21use flate2::bufread::ZlibDecoder;
22use flate2::write::ZlibEncoder;
23use once_cell::sync::Lazy;
24use relay_event_schema::processor::{FieldAttrs, Pii, ProcessingState, Processor, ValueType};
25use relay_pii::{PiiConfig, PiiProcessor};
26use relay_protocol::Meta;
27use serde::{Deserializer, de, ser};
28use serde_json::value::RawValue;
29
30use relay_pii::transform::Transform;
31
32static PII_FIELDS: Lazy<[Vec<&str>; 2]> = Lazy::new(|| {
36 [
37 vec!["data", "payload", "description"],
38 vec!["data", "payload", "data"],
39 ]
40});
41
42fn scrub_at_path(path: &Vec<String>) -> bool {
44 PII_FIELDS.iter().any(|pii_path| {
45 path.len() >= pii_path.len() && pii_path.iter().zip(path).all(|(k1, k2)| k1 == k2)
46 })
47}
48
49const FIELD_ATTRS_PII_TRUE: FieldAttrs = FieldAttrs::new().pii(Pii::True);
51
52const FIELD_ATTRS_PII_FALSE: FieldAttrs = FieldAttrs::new().pii(Pii::False);
54
55#[derive(Debug)]
57pub enum ParseRecordingError {
58 Parse(serde_json::Error),
60 Compression(std::io::Error),
62 Message(&'static str),
66}
67
68impl fmt::Display for ParseRecordingError {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match self {
71 ParseRecordingError::Parse(serde_error) => write!(f, "{serde_error}"),
72 ParseRecordingError::Compression(error) => write!(f, "{error}"),
73 ParseRecordingError::Message(message) => write!(f, "{message}"),
74 }
75 }
76}
77
78impl std::error::Error for ParseRecordingError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 match self {
81 ParseRecordingError::Parse(e) => Some(e),
82 ParseRecordingError::Compression(e) => Some(e),
83 ParseRecordingError::Message(_) => None,
84 }
85 }
86}
87
88impl From<serde_json::Error> for ParseRecordingError {
89 fn from(err: serde_json::Error) -> Self {
90 ParseRecordingError::Parse(err)
91 }
92}
93
94struct ScrubberTransform<'a> {
98 processor1: Option<PiiProcessor<'a>>,
100 processor2: Option<PiiProcessor<'a>>,
101 state: ProcessingState<'a>,
103 path: Vec<String>,
106}
107
108impl ScrubberTransform<'_> {
109 fn ensure_empty(&mut self) {
110 if !self.path.is_empty() || self.state.depth() > 0 {
111 debug_assert!(false, "ScrubberTransform not empty");
112 relay_log::error!("ScrubberTransform not empty");
113 }
114 self.state = ProcessingState::new_root(None, None);
115 self.path.clear();
116 }
117}
118
119impl<'de> Transform<'de> for &'_ mut ScrubberTransform<'_> {
120 fn push_path(&mut self, key: &'de str) {
121 self.path.push(key.to_owned());
122 let field_attrs = if scrub_at_path(&self.path) {
123 &FIELD_ATTRS_PII_TRUE
124 } else {
125 &FIELD_ATTRS_PII_FALSE
126 };
127
128 self.state = std::mem::take(&mut self.state).enter_owned(
129 key.to_owned(),
130 Some(Cow::Borrowed(field_attrs)),
131 Some(ValueType::String), )
133 }
134
135 fn pop_path(&mut self) {
136 if let Ok(Some(parent)) = std::mem::take(&mut self.state).try_into_parent() {
137 self.state = parent;
138 }
139 let popped = self.path.pop();
140 debug_assert!(popped.is_some()); }
142
143 fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> {
144 self.transform_string(v.to_owned())
145 }
146
147 fn transform_string(&mut self, mut value: String) -> Cow<'static, str> {
148 if let Some(ref mut processor) = self.processor1
149 && processor
150 .process_string(&mut value, &mut Meta::default(), &self.state)
151 .is_err()
152 {
153 return Cow::Borrowed("");
154 }
155
156 if let Some(ref mut processor) = self.processor2
157 && processor
158 .process_string(&mut value, &mut Meta::default(), &self.state)
159 .is_err()
160 {
161 return Cow::Borrowed("");
162 }
163
164 Cow::Owned(value)
165 }
166}
167
168struct ScrubbedValue<'a, 'b>(&'a RawValue, Rc<RefCell<ScrubberTransform<'b>>>);
173
174impl serde::Serialize for ScrubbedValue<'_, '_> {
175 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
176 where
177 S: serde::Serializer,
178 {
179 let mut transform = self.1.borrow_mut();
180 let mut deserializer = serde_json::Deserializer::from_str(self.0.get());
181 let scrubber = relay_pii::transform::Deserializer::new(&mut deserializer, &mut *transform);
182 serde_transcode::transcode(scrubber, serializer)
183 }
184}
185
186struct EventStreamVisitor<'a, S> {
188 serializer: S,
189 scrubber: Rc<RefCell<ScrubberTransform<'a>>>,
190}
191
192impl<'a, S> EventStreamVisitor<'a, S> {
193 const SENTRY_EVENT_TYPE: u8 = 5;
197
198 fn new(serializer: S, scrubber: Rc<RefCell<ScrubberTransform<'a>>>) -> Self {
200 Self {
201 serializer,
202 scrubber,
203 }
204 }
205}
206
207impl<'de, S> de::Visitor<'de> for EventStreamVisitor<'_, S>
208where
209 S: ser::Serializer,
210{
211 type Value = S::Ok;
212
213 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
214 write!(formatter, "a replay recording event stream")
215 }
216
217 fn visit_seq<A>(self, mut v: A) -> Result<Self::Value, A::Error>
218 where
219 A: de::SeqAccess<'de>,
220 {
221 #[derive(Clone, Copy, serde::Deserialize)]
223 struct TypeHelper {
224 #[serde(rename = "type")]
225 ty: u8,
226 }
227
228 use serde::ser::SerializeSeq;
229 let mut seq = self.serializer.serialize_seq(v.size_hint()).map_err(s2d)?;
230
231 while let Some(raw) = v.next_element::<&'de RawValue>()? {
246 let helper = serde_json::from_str::<TypeHelper>(raw.get()).map_err(s2d)?;
247 if helper.ty == Self::SENTRY_EVENT_TYPE {
249 seq.serialize_element(&ScrubbedValue(raw, self.scrubber.clone()))
250 .map_err(s2d)?;
251 self.scrubber.borrow_mut().ensure_empty();
254 } else {
255 seq.serialize_element(raw).map_err(s2d)?;
256 }
257 }
258
259 seq.end().map_err(s2d)
260 }
261}
262
263fn s2d<S, D>(s: S) -> D
265where
266 S: ser::Error,
267 D: de::Error,
268{
269 D::custom(s.to_string())
270}
271
272pub struct RecordingScrubber<'a> {
288 limit: usize,
289 transform: Rc<RefCell<ScrubberTransform<'a>>>,
290}
291
292impl<'a> RecordingScrubber<'a> {
293 pub fn new(
305 limit: usize,
306 config1: Option<&'a PiiConfig>,
307 config2: Option<&'a PiiConfig>,
308 ) -> Self {
309 Self {
310 limit,
311 transform: Rc::new(RefCell::new(ScrubberTransform {
312 processor1: config1.map(|c| PiiProcessor::new(c.compiled())),
313 processor2: config2.map(|c| PiiProcessor::new(c.compiled())),
314 state: ProcessingState::new_root(None, None),
315 path: vec![],
316 })),
317 }
318 }
319
320 pub fn is_empty(&self) -> bool {
322 let tmp = self.transform.borrow();
323 tmp.processor1.is_none() && tmp.processor2.is_none()
324 }
325
326 fn scrub_replay<W>(&mut self, json: &[u8], write: W) -> Result<(), ParseRecordingError>
327 where
328 W: std::io::Write,
329 {
330 let mut deserializer = serde_json::Deserializer::from_slice(json);
331 let mut serializer = serde_json::Serializer::new(write);
332
333 deserializer.deserialize_seq(EventStreamVisitor::new(
334 &mut serializer,
335 self.transform.clone(),
336 ))?;
337
338 Ok(())
339 }
340
341 #[doc(hidden)] pub fn transcode_replay(
343 &mut self,
344 body: &[u8],
345 output: &mut Vec<u8>,
346 ) -> Result<(), ParseRecordingError> {
347 let encoder = ZlibEncoder::new(output, Compression::default());
348
349 if body.first() == Some(&b'[') {
350 self.scrub_replay(body, encoder)
351 } else {
352 let mut decompressed = Vec::with_capacity(8 * 1024);
353 let mut decoder = ZlibDecoder::new(body).take(self.limit as u64);
354 decoder
355 .read_to_end(&mut decompressed)
356 .map_err(ParseRecordingError::Compression)?;
357
358 self.scrub_replay(&decompressed, encoder)
359 }
360 }
361
362 pub fn process_recording(&mut self, bytes: &[u8]) -> Result<Vec<u8>, ParseRecordingError> {
381 if bytes.is_empty() {
383 return Err(ParseRecordingError::Message("no data found"));
384 }
385
386 let mut split = bytes.splitn(2, |b| b == &b'\n');
387 let header = split
388 .next()
389 .ok_or(ParseRecordingError::Message("no headers found"))?;
390
391 let body = match split.next() {
392 Some(b"") | None => return Err(ParseRecordingError::Message("no body found")),
393 Some(body) => body,
394 };
395
396 let mut output = header.to_owned();
397 output.push(b'\n');
398 output.reserve(body.len());
402 self.transcode_replay(body, &mut output)?;
403
404 Ok(output)
405 }
406}
407
#[cfg(test)]
mod tests {
    use relay_pii::{DataScrubbingConfig, PiiConfig};

    use crate::recording::scrub_at_path;

    use super::{ParseRecordingError, RecordingScrubber};

    /// Builds a PII config with data, default and IP address scrubbing switched on.
    fn default_pii_config() -> PiiConfig {
        let mut scrubbing_config = DataScrubbingConfig::default();
        scrubbing_config.scrub_data = true;
        scrubbing_config.scrub_defaults = true;
        scrubbing_config.scrub_ip_addresses = true;
        scrubbing_config.pii_config_uncached().unwrap().unwrap()
    }

    /// Creates a scrubber without a size limit that uses `config` as its only PII config.
    fn scrubber(config: &PiiConfig) -> RecordingScrubber<'_> {
        RecordingScrubber::new(usize::MAX, Some(config), None)
    }

    /// Scrubs `payload` with the default config and returns the uncompressed output as text.
    fn scrub_to_string(payload: &[u8]) -> String {
        let config = default_pii_config();
        let mut transcoded = Vec::new();
        scrubber(&config)
            .scrub_replay(payload, &mut transcoded)
            .unwrap();
        String::from_utf8(transcoded).unwrap()
    }

    #[test]
    fn test_process_recording_end_to_end() {
        // Header line followed by a zlib-compressed body.
        let payload: &[u8] = &[
            123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120,
            156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38,
            102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108,
            59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54,
            159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1,
            119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232,
            43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88,
            217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61,
            207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238,
            196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35,
            234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215,
            182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19,
            146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16,
        ];

        let config = default_pii_config();
        let output = scrubber(&config).process_recording(payload).unwrap();
        assert!(!output.is_empty());
    }

    #[test]
    fn test_process_recording_no_body_data() {
        // Header line with its newline, but nothing after it.
        let payload: &[u8] = b"{\"segment_id\":3}\n";

        let config = default_pii_config();
        let error = scrubber(&config).process_recording(payload).unwrap_err();
        assert!(matches!(error, ParseRecordingError::Message("no body found")));
    }

    #[test]
    fn test_process_recording_bad_body_data() {
        // Header line followed by a single byte that is not valid zlib data.
        let payload: &[u8] = b"{\"segment_id\":3}\n\x16";

        let config = default_pii_config();
        let error = scrubber(&config).process_recording(payload).unwrap_err();
        assert!(matches!(error, ParseRecordingError::Compression(_)));
    }

    #[test]
    fn test_process_recording_no_headers() {
        // No newline at all, so there is nothing after the first segment.
        let payload: &[u8] = b"{\"segment_id\":3}";

        let config = default_pii_config();
        let error = scrubber(&config).process_recording(payload).unwrap_err();
        assert!(matches!(error, ParseRecordingError::Message("no body found")));
    }

    #[test]
    fn test_process_recording_no_contents() {
        let payload: &[u8] = b"";

        let config = default_pii_config();
        let error = scrubber(&config).process_recording(payload).unwrap_err();
        assert!(matches!(error, ParseRecordingError::Message("no data found")));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_credit_card_removal() {
        let payload = include_bytes!("../tests/fixtures/rrweb-pii.json");

        let parsed = scrub_to_string(payload);
        assert!(parsed.contains(r#"{"type":3,"textContent":"[Filtered]","id":284}"#));
    }

    #[test]
    fn test_scrub_pii_navigation() {
        let payload = include_bytes!("../tests/fixtures/rrweb-performance-navigation.json");

        let parsed = scrub_to_string(payload);
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[test]
    fn test_scrub_pii_resource() {
        let payload = include_bytes!("../tests/fixtures/rrweb-performance-resource.json");

        let parsed = scrub_to_string(payload);
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_ip_address_removal() {
        let payload = include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json");

        let parsed = scrub_to_string(payload);
        assert!(parsed.contains("\"value\":\"[ip]\""));
        assert!(parsed.contains("\"textContent\":\"[ip]\""));
    }

    #[ignore = "type 2 nodes are not supported"]
    #[test]
    fn test_scrub_pii_full_snapshot_event() {
        let payload = include_bytes!("../tests/fixtures/rrweb-event-2.json");

        let scrubbed_result = scrub_to_string(payload);
        assert!(scrubbed_result.contains("\"textContent\":\"my ssn is [Filtered]\""));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_scrub_pii_incremental_snapshot_event() {
        let payload = include_bytes!("../tests/fixtures/rrweb-event-3.json");

        let scrubbed_result = scrub_to_string(payload);
        assert!(scrubbed_result.contains("\"textContent\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"value\":\"[Filtered]\""));
    }

    #[test]
    fn test_scrub_pii_custom_event() {
        let payload = include_bytes!("../tests/fixtures/rrweb-event-5.json");

        let scrubbed_result = scrub_to_string(payload);
        assert!(scrubbed_result.contains("\"description\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\""));
    }

    #[test]
    fn test_scrub_pii_key_based() {
        let payload = include_bytes!("../tests/fixtures/rrweb-request.json");

        let scrubbed_result = scrub_to_string(payload);
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();
        let request_data = &scrubbed[0]["data"]["payload"]["data"];

        // Values under unremarkable keys are left alone.
        assert_eq!(request_data["method"], "POST");

        // Values under sensitive keys are replaced.
        assert_eq!(request_data["request"]["body"]["api_key"], "[Filtered]");
    }

    #[test]
    fn test_scrub_pii_key_based_edge_cases() {
        let payload = include_bytes!("../tests/fixtures/rrweb-request-edge-cases.json");

        let scrubbed_result = scrub_to_string(payload);
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();

        insta::assert_ron_snapshot!(scrubbed);
    }

    #[test]
    fn test_scrub_at_path() {
        let cases: [(bool, &[&str]); 8] = [
            (false, &[]),
            (false, &["data"]),
            (false, &["data", "payload"]),
            (false, &["data", "payload", "foo"]),
            (false, &["foo", "payload", "data"]),
            (true, &["data", "payload", "data"]),
            (true, &["data", "payload", "data", "request"]),
            (true, &["data", "payload", "data", "request", "body"]),
        ];

        for (should_scrub, keys) in cases {
            let path: Vec<String> = keys.iter().map(|key| key.to_string()).collect();
            assert_eq!(should_scrub, scrub_at_path(&path));
        }
    }
}