1use std::borrow::Cow;
15use std::cell::RefCell;
16use std::fmt;
17use std::io::Read;
18use std::rc::Rc;
19
20use flate2::Compression;
21use flate2::bufread::ZlibDecoder;
22use flate2::write::ZlibEncoder;
23use once_cell::sync::Lazy;
24use relay_event_schema::processor::{FieldAttrs, Pii, ProcessingState, Processor, ValueType};
25use relay_pii::{PiiConfig, PiiProcessor};
26use relay_protocol::Meta;
27use serde::{Deserializer, de, ser};
28use serde_json::value::RawValue;
29
30use relay_pii::transform::Transform;
31
32static PII_FIELDS: Lazy<[Vec<&str>; 2]> = Lazy::new(|| {
36 [
37 vec!["data", "payload", "description"],
38 vec!["data", "payload", "data"],
39 ]
40});
41
42fn scrub_at_path(path: &Vec<String>) -> bool {
44 PII_FIELDS.iter().any(|pii_path| {
45 path.len() >= pii_path.len() && pii_path.iter().zip(path).all(|(k1, k2)| k1 == k2)
46 })
47}
48
49const FIELD_ATTRS_PII_TRUE: FieldAttrs = FieldAttrs::new().pii(Pii::True);
51
52const FIELD_ATTRS_PII_FALSE: FieldAttrs = FieldAttrs::new().pii(Pii::False);
54
55#[derive(Debug)]
57pub enum ParseRecordingError {
58 Parse(serde_json::Error),
60 Compression(std::io::Error),
62 Message(&'static str),
66}
67
68impl fmt::Display for ParseRecordingError {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match self {
71 ParseRecordingError::Parse(serde_error) => write!(f, "{serde_error}"),
72 ParseRecordingError::Compression(error) => write!(f, "{error}"),
73 ParseRecordingError::Message(message) => write!(f, "{message}"),
74 }
75 }
76}
77
78impl std::error::Error for ParseRecordingError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 match self {
81 ParseRecordingError::Parse(e) => Some(e),
82 ParseRecordingError::Compression(e) => Some(e),
83 ParseRecordingError::Message(_) => None,
84 }
85 }
86}
87
88impl From<serde_json::Error> for ParseRecordingError {
89 fn from(err: serde_json::Error) -> Self {
90 ParseRecordingError::Parse(err)
91 }
92}
93
94struct ScrubberTransform<'a> {
98 processor1: Option<PiiProcessor<'a>>,
100 processor2: Option<PiiProcessor<'a>>,
101 state: ProcessingState<'a>,
103 path: Vec<String>,
106}
107
108impl ScrubberTransform<'_> {
109 fn ensure_empty(&mut self) {
110 if !self.path.is_empty() || self.state.depth() > 0 {
111 debug_assert!(false, "ScrubberTransform not empty");
112 relay_log::error!("ScrubberTransform not empty");
113 }
114 self.state = ProcessingState::new_root(None, None);
115 self.path.clear();
116 }
117}
118
119impl<'de> Transform<'de> for &'_ mut ScrubberTransform<'_> {
120 fn push_path(&mut self, key: &'de str) {
121 self.path.push(key.to_owned());
122 let field_attrs = if scrub_at_path(&self.path) {
123 &FIELD_ATTRS_PII_TRUE
124 } else {
125 &FIELD_ATTRS_PII_FALSE
126 };
127
128 self.state = std::mem::take(&mut self.state).enter_owned(
129 key.to_owned(),
130 Some(Cow::Borrowed(field_attrs)),
131 Some(ValueType::String), )
133 }
134
135 fn pop_path(&mut self) {
136 if let Ok(Some(parent)) = std::mem::take(&mut self.state).try_into_parent() {
137 self.state = parent;
138 }
139 let popped = self.path.pop();
140 debug_assert!(popped.is_some()); }
142
143 fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> {
144 self.transform_string(v.to_owned())
145 }
146
147 fn transform_string(&mut self, mut value: String) -> Cow<'static, str> {
148 if let Some(ref mut processor) = self.processor1 {
149 if processor
150 .process_string(&mut value, &mut Meta::default(), &self.state)
151 .is_err()
152 {
153 return Cow::Borrowed("");
154 }
155 }
156
157 if let Some(ref mut processor) = self.processor2 {
158 if processor
159 .process_string(&mut value, &mut Meta::default(), &self.state)
160 .is_err()
161 {
162 return Cow::Borrowed("");
163 }
164 }
165
166 Cow::Owned(value)
167 }
168}
169
170struct ScrubbedValue<'a, 'b>(&'a RawValue, Rc<RefCell<ScrubberTransform<'b>>>);
175
176impl serde::Serialize for ScrubbedValue<'_, '_> {
177 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
178 where
179 S: serde::Serializer,
180 {
181 let mut transform = self.1.borrow_mut();
182 let mut deserializer = serde_json::Deserializer::from_str(self.0.get());
183 let scrubber = relay_pii::transform::Deserializer::new(&mut deserializer, &mut *transform);
184 serde_transcode::transcode(scrubber, serializer)
185 }
186}
187
188struct EventStreamVisitor<'a, S> {
190 serializer: S,
191 scrubber: Rc<RefCell<ScrubberTransform<'a>>>,
192}
193
194impl<'a, S> EventStreamVisitor<'a, S> {
195 const SENTRY_EVENT_TYPE: u8 = 5;
199
200 fn new(serializer: S, scrubber: Rc<RefCell<ScrubberTransform<'a>>>) -> Self {
202 Self {
203 serializer,
204 scrubber,
205 }
206 }
207}
208
209impl<'de, S> de::Visitor<'de> for EventStreamVisitor<'_, S>
210where
211 S: ser::Serializer,
212{
213 type Value = S::Ok;
214
215 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
216 write!(formatter, "a replay recording event stream")
217 }
218
219 fn visit_seq<A>(self, mut v: A) -> Result<Self::Value, A::Error>
220 where
221 A: de::SeqAccess<'de>,
222 {
223 #[derive(Clone, Copy, serde::Deserialize)]
225 struct TypeHelper {
226 #[serde(rename = "type")]
227 ty: u8,
228 }
229
230 use serde::ser::SerializeSeq;
231 let mut seq = self.serializer.serialize_seq(v.size_hint()).map_err(s2d)?;
232
233 while let Some(raw) = v.next_element::<&'de RawValue>()? {
248 let helper = serde_json::from_str::<TypeHelper>(raw.get()).map_err(s2d)?;
249 if helper.ty == Self::SENTRY_EVENT_TYPE {
251 seq.serialize_element(&ScrubbedValue(raw, self.scrubber.clone()))
252 .map_err(s2d)?;
253 self.scrubber.borrow_mut().ensure_empty();
256 } else {
257 seq.serialize_element(raw).map_err(s2d)?;
258 }
259 }
260
261 seq.end().map_err(s2d)
262 }
263}
264
265fn s2d<S, D>(s: S) -> D
267where
268 S: ser::Error,
269 D: de::Error,
270{
271 D::custom(s.to_string())
272}
273
274pub struct RecordingScrubber<'a> {
290 limit: usize,
291 transform: Rc<RefCell<ScrubberTransform<'a>>>,
292}
293
294impl<'a> RecordingScrubber<'a> {
295 pub fn new(
307 limit: usize,
308 config1: Option<&'a PiiConfig>,
309 config2: Option<&'a PiiConfig>,
310 ) -> Self {
311 Self {
312 limit,
313 transform: Rc::new(RefCell::new(ScrubberTransform {
314 processor1: config1.map(|c| PiiProcessor::new(c.compiled())),
315 processor2: config2.map(|c| PiiProcessor::new(c.compiled())),
316 state: ProcessingState::new_root(None, None),
317 path: vec![],
318 })),
319 }
320 }
321
322 pub fn is_empty(&self) -> bool {
324 let tmp = self.transform.borrow();
325 tmp.processor1.is_none() && tmp.processor2.is_none()
326 }
327
328 fn scrub_replay<W>(&mut self, json: &[u8], write: W) -> Result<(), ParseRecordingError>
329 where
330 W: std::io::Write,
331 {
332 let mut deserializer = serde_json::Deserializer::from_slice(json);
333 let mut serializer = serde_json::Serializer::new(write);
334
335 deserializer.deserialize_seq(EventStreamVisitor::new(
336 &mut serializer,
337 self.transform.clone(),
338 ))?;
339
340 Ok(())
341 }
342
343 #[doc(hidden)] pub fn transcode_replay(
345 &mut self,
346 body: &[u8],
347 output: &mut Vec<u8>,
348 ) -> Result<(), ParseRecordingError> {
349 let encoder = ZlibEncoder::new(output, Compression::default());
350
351 if body.first() == Some(&b'[') {
352 self.scrub_replay(body, encoder)
353 } else {
354 let mut decompressed = Vec::with_capacity(8 * 1024);
355 let mut decoder = ZlibDecoder::new(body).take(self.limit as u64);
356 decoder
357 .read_to_end(&mut decompressed)
358 .map_err(ParseRecordingError::Compression)?;
359
360 self.scrub_replay(&decompressed, encoder)
361 }
362 }
363
364 pub fn process_recording(&mut self, bytes: &[u8]) -> Result<Vec<u8>, ParseRecordingError> {
383 if bytes.is_empty() {
385 return Err(ParseRecordingError::Message("no data found"));
386 }
387
388 let mut split = bytes.splitn(2, |b| b == &b'\n');
389 let header = split
390 .next()
391 .ok_or(ParseRecordingError::Message("no headers found"))?;
392
393 let body = match split.next() {
394 Some(b"") | None => return Err(ParseRecordingError::Message("no body found")),
395 Some(body) => body,
396 };
397
398 let mut output = header.to_owned();
399 output.push(b'\n');
400 output.reserve(body.len());
404 self.transcode_replay(body, &mut output)?;
405
406 Ok(output)
407 }
408}
409
#[cfg(test)]
mod tests {
    use relay_pii::{DataScrubbingConfig, PiiConfig};

    use super::{ParseRecordingError, RecordingScrubber, scrub_at_path};

    /// Builds a PII config with data, default and IP address scrubbing enabled.
    fn default_pii_config() -> PiiConfig {
        let mut scrubbing_config = DataScrubbingConfig::default();
        scrubbing_config.scrub_data = true;
        scrubbing_config.scrub_defaults = true;
        scrubbing_config.scrub_ip_addresses = true;
        scrubbing_config.pii_config_uncached().unwrap().unwrap()
    }

    /// Creates a scrubber without a decompression limit and with one config.
    fn scrubber(config: &PiiConfig) -> RecordingScrubber<'_> {
        RecordingScrubber::new(usize::MAX, Some(config), None)
    }

    /// Scrubs an uncompressed JSON fixture and returns the result as text.
    fn scrub_fixture(payload: &[u8]) -> String {
        let config = default_pii_config();
        let mut transcoded = Vec::new();
        scrubber(&config)
            .scrub_replay(payload, &mut transcoded)
            .unwrap();
        String::from_utf8(transcoded).unwrap()
    }

    #[test]
    fn test_process_recording_end_to_end() {
        // The header `{"segment_id":3}`, a newline, and a zlib-compressed body.
        let payload: &[u8] = &[
            123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120,
            156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38,
            102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108,
            59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54,
            159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1,
            119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232,
            43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88,
            217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61,
            207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238,
            196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35,
            234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215,
            182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19,
            146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16,
        ];

        let config = default_pii_config();
        let result = scrubber(&config).process_recording(payload);
        assert!(!result.unwrap().is_empty());
    }

    #[test]
    fn test_process_recording_no_body_data() {
        // Header and separator only; nothing follows the newline.
        let payload: &[u8] = b"{\"segment_id\":3}\n";

        let config = default_pii_config();
        let result = scrubber(&config).process_recording(payload);
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no body found"),
        ));
    }

    #[test]
    fn test_process_recording_bad_body_data() {
        // A single byte that is neither JSON nor a valid zlib stream.
        let payload: &[u8] = b"{\"segment_id\":3}\n\x16";

        let config = default_pii_config();
        let result = scrubber(&config).process_recording(payload);
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Compression(_),
        ));
    }

    #[test]
    fn test_process_recording_no_headers() {
        // No newline at all, so there is no body after the first line.
        let payload: &[u8] = b"{\"segment_id\":3}";

        let config = default_pii_config();
        let result = scrubber(&config).process_recording(payload);
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no body found"),
        ));
    }

    #[test]
    fn test_process_recording_no_contents() {
        let payload: &[u8] = b"";

        let config = default_pii_config();
        let result = scrubber(&config).process_recording(payload);
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no data found"),
        ));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_credit_card_removal() {
        let parsed = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-pii.json"));
        assert!(parsed.contains(r#"{"type":3,"textContent":"[Filtered]","id":284}"#));
    }

    #[test]
    fn test_scrub_pii_navigation() {
        let parsed = scrub_fixture(include_bytes!(
            "../tests/fixtures/rrweb-performance-navigation.json"
        ));
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[test]
    fn test_scrub_pii_resource() {
        let parsed = scrub_fixture(include_bytes!(
            "../tests/fixtures/rrweb-performance-resource.json"
        ));
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_ip_address_removal() {
        let parsed = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json"));
        assert!(parsed.contains("\"value\":\"[ip]\""));
        assert!(parsed.contains("\"textContent\":\"[ip]\""));
    }

    #[ignore = "type 2 nodes are not supported"]
    #[test]
    fn test_scrub_pii_full_snapshot_event() {
        let scrubbed_result = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-event-2.json"));
        assert!(scrubbed_result.contains("\"textContent\":\"my ssn is [Filtered]\""));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_scrub_pii_incremental_snapshot_event() {
        let scrubbed_result = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-event-3.json"));
        assert!(scrubbed_result.contains("\"textContent\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"value\":\"[Filtered]\""));
    }

    #[test]
    fn test_scrub_pii_custom_event() {
        let scrubbed_result = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-event-5.json"));
        assert!(scrubbed_result.contains("\"description\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\""));
    }

    #[test]
    fn test_scrub_pii_key_based() {
        let scrubbed_result = scrub_fixture(include_bytes!("../tests/fixtures/rrweb-request.json"));
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();

        // Values that are not sensitive are left untouched.
        assert_eq!(scrubbed[0]["data"]["payload"]["data"]["method"], "POST");

        // Values under a sensitive key are filtered.
        assert_eq!(
            scrubbed[0]["data"]["payload"]["data"]["request"]["body"]["api_key"],
            "[Filtered]"
        );
    }

    #[test]
    fn test_scrub_pii_key_based_edge_cases() {
        let scrubbed_result = scrub_fixture(include_bytes!(
            "../tests/fixtures/rrweb-request-edge-cases.json"
        ));
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();

        insta::assert_ron_snapshot!(scrubbed);
    }

    #[test]
    fn test_scrub_at_path() {
        let cases = [
            (false, vec![]),
            (false, vec!["data"]),
            (false, vec!["data", "payload"]),
            (false, vec!["data", "payload", "foo"]),
            (false, vec!["foo", "payload", "data"]),
            (true, vec!["data", "payload", "data"]),
            (true, vec!["data", "payload", "data", "request"]),
            (true, vec!["data", "payload", "data", "request", "body"]),
        ];

        for (should_scrub, segments) in cases {
            let path: Vec<String> = segments.into_iter().map(str::to_owned).collect();
            assert_eq!(should_scrub, scrub_at_path(&path));
        }
    }
}