1use std::cell::RefCell;
15use std::fmt;
16use std::io::Read;
17use std::rc::Rc;
18use std::{borrow::Cow, sync::LazyLock};
19
20use flate2::Compression;
21use flate2::bufread::ZlibDecoder;
22use flate2::write::ZlibEncoder;
23use relay_event_schema::processor::{FieldAttrs, Pii, ProcessingState, Processor, ValueType};
24use relay_pii::{PiiConfig, PiiProcessor};
25use relay_protocol::Meta;
26use serde::{Deserializer, de, ser};
27use serde_json::value::RawValue;
28
29use relay_pii::transform::Transform;
30
/// Key paths whose values, and everything nested below them, are flagged as PII.
///
/// Entries are matched as path prefixes by [`scrub_at_path`].
static PII_FIELDS: LazyLock<[Vec<&str>; 2]> = LazyLock::new(|| {
    let description = vec!["data", "payload", "description"];
    let data = vec!["data", "payload", "data"];
    [description, data]
});
40
41fn scrub_at_path(path: &Vec<String>) -> bool {
43 PII_FIELDS.iter().any(|pii_path| {
44 path.len() >= pii_path.len() && pii_path.iter().zip(path).all(|(k1, k2)| k1 == k2)
45 })
46}
47
/// Field attributes with `Pii::True`, attached to keys for which `scrub_at_path` matches.
const FIELD_ATTRS_PII_TRUE: FieldAttrs = FieldAttrs::new().pii(Pii::True);

/// Field attributes with `Pii::False`, attached to every other key.
const FIELD_ATTRS_PII_FALSE: FieldAttrs = FieldAttrs::new().pii(Pii::False);
53
/// Error returned when a replay recording cannot be processed.
#[derive(Debug)]
pub enum ParseRecordingError {
    /// A `serde_json` error raised while transcoding the recording JSON.
    Parse(serde_json::Error),
    /// An I/O error raised while decompressing the recording body.
    Compression(std::io::Error),
    /// A static description of a structurally invalid payload, such as a missing body.
    Message(&'static str),
}
66
67impl fmt::Display for ParseRecordingError {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 match self {
70 ParseRecordingError::Parse(serde_error) => write!(f, "{serde_error}"),
71 ParseRecordingError::Compression(error) => write!(f, "{error}"),
72 ParseRecordingError::Message(message) => write!(f, "{message}"),
73 }
74 }
75}
76
77impl std::error::Error for ParseRecordingError {
78 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
79 match self {
80 ParseRecordingError::Parse(e) => Some(e),
81 ParseRecordingError::Compression(e) => Some(e),
82 ParseRecordingError::Message(_) => None,
83 }
84 }
85}
86
87impl From<serde_json::Error> for ParseRecordingError {
88 fn from(err: serde_json::Error) -> Self {
89 ParseRecordingError::Parse(err)
90 }
91}
92
/// Transform that runs up to two PII processors over the strings of a JSON document.
struct ScrubberTransform<'a> {
    /// First PII processor applied to each string, if configured.
    processor1: Option<PiiProcessor<'a>>,
    /// Second PII processor, applied after `processor1`, if configured.
    processor2: Option<PiiProcessor<'a>>,
    /// Processing state for the current position; entered and left alongside `path`.
    state: ProcessingState<'a>,
    /// Keys pushed so far, from the root of the value down to the current position.
    path: Vec<String>,
}
106
107impl ScrubberTransform<'_> {
108 fn ensure_empty(&mut self) {
109 if !self.path.is_empty() || self.state.depth() > 0 {
110 debug_assert!(false, "ScrubberTransform not empty");
111 relay_log::error!("ScrubberTransform not empty");
112 }
113 self.state = ProcessingState::new_root(None, None);
114 self.path.clear();
115 }
116}
117
118impl<'de> Transform<'de> for &'_ mut ScrubberTransform<'_> {
119 fn push_path(&mut self, key: &'de str) {
120 self.path.push(key.to_owned());
121 let field_attrs = if scrub_at_path(&self.path) {
122 &FIELD_ATTRS_PII_TRUE
123 } else {
124 &FIELD_ATTRS_PII_FALSE
125 };
126
127 self.state = std::mem::take(&mut self.state).enter_owned(
128 key.to_owned(),
129 Some(Cow::Borrowed(field_attrs)),
130 Some(ValueType::String), )
132 }
133
134 fn pop_path(&mut self) {
135 if let Ok(Some(parent)) = std::mem::take(&mut self.state).try_into_parent() {
136 self.state = parent;
137 }
138 let popped = self.path.pop();
139 debug_assert!(popped.is_some()); }
141
142 fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> {
143 self.transform_string(v.to_owned())
144 }
145
146 fn transform_string(&mut self, mut value: String) -> Cow<'static, str> {
147 if let Some(ref mut processor) = self.processor1
148 && processor
149 .process_string(&mut value, &mut Meta::default(), &self.state)
150 .is_err()
151 {
152 return Cow::Borrowed("");
153 }
154
155 if let Some(ref mut processor) = self.processor2
156 && processor
157 .process_string(&mut value, &mut Meta::default(), &self.state)
158 .is_err()
159 {
160 return Cow::Borrowed("");
161 }
162
163 Cow::Owned(value)
164 }
165}
166
/// A raw JSON value paired with the shared transform that is applied to it on serialization.
struct ScrubbedValue<'a, 'b>(&'a RawValue, Rc<RefCell<ScrubberTransform<'b>>>);
172
173impl serde::Serialize for ScrubbedValue<'_, '_> {
174 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
175 where
176 S: serde::Serializer,
177 {
178 let mut transform = self.1.borrow_mut();
179 let mut deserializer = serde_json::Deserializer::from_str(self.0.get());
180 let scrubber = relay_pii::transform::Deserializer::new(&mut deserializer, &mut *transform);
181 serde_transcode::transcode(scrubber, serializer)
182 }
183}
184
/// Visitor that copies a sequence of events into a serializer, scrubbing selected events.
struct EventStreamVisitor<'a, S> {
    /// Serializer that receives the output sequence.
    serializer: S,
    /// Shared transform applied to events that are scrubbed.
    scrubber: Rc<RefCell<ScrubberTransform<'a>>>,
}
190
191impl<'a, S> EventStreamVisitor<'a, S> {
192 const SENTRY_EVENT_TYPE: u8 = 5;
196
197 fn new(serializer: S, scrubber: Rc<RefCell<ScrubberTransform<'a>>>) -> Self {
199 Self {
200 serializer,
201 scrubber,
202 }
203 }
204}
205
206impl<'de, S> de::Visitor<'de> for EventStreamVisitor<'_, S>
207where
208 S: ser::Serializer,
209{
210 type Value = S::Ok;
211
212 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213 write!(formatter, "a replay recording event stream")
214 }
215
216 fn visit_seq<A>(self, mut v: A) -> Result<Self::Value, A::Error>
217 where
218 A: de::SeqAccess<'de>,
219 {
220 #[derive(Clone, Copy, serde::Deserialize)]
222 struct TypeHelper {
223 #[serde(rename = "type")]
224 ty: u8,
225 }
226
227 use serde::ser::SerializeSeq;
228 let mut seq = self.serializer.serialize_seq(v.size_hint()).map_err(s2d)?;
229
230 while let Some(raw) = v.next_element::<&'de RawValue>()? {
245 let helper = serde_json::from_str::<TypeHelper>(raw.get()).map_err(s2d)?;
246 if helper.ty == Self::SENTRY_EVENT_TYPE {
248 seq.serialize_element(&ScrubbedValue(raw, self.scrubber.clone()))
249 .map_err(s2d)?;
250 self.scrubber.borrow_mut().ensure_empty();
253 } else {
254 seq.serialize_element(raw).map_err(s2d)?;
255 }
256 }
257
258 seq.end().map_err(s2d)
259 }
260}
261
262fn s2d<S, D>(s: S) -> D
264where
265 S: ser::Error,
266 D: de::Error,
267{
268 D::custom(s.to_string())
269}
270
/// Scrubs PII from replay recordings using up to two PII configurations.
pub struct RecordingScrubber<'a> {
    /// Maximum number of bytes read from the decompressor for a compressed body.
    limit: usize,
    /// Shared transform holding the PII processors.
    transform: Rc<RefCell<ScrubberTransform<'a>>>,
}
290
291impl<'a> RecordingScrubber<'a> {
292 pub fn new(
304 limit: usize,
305 config1: Option<&'a PiiConfig>,
306 config2: Option<&'a PiiConfig>,
307 ) -> Self {
308 Self {
309 limit,
310 transform: Rc::new(RefCell::new(ScrubberTransform {
311 processor1: config1.map(|c| PiiProcessor::new(c.compiled())),
312 processor2: config2.map(|c| PiiProcessor::new(c.compiled())),
313 state: ProcessingState::new_root(None, None),
314 path: vec![],
315 })),
316 }
317 }
318
319 pub fn is_empty(&self) -> bool {
321 let tmp = self.transform.borrow();
322 tmp.processor1.is_none() && tmp.processor2.is_none()
323 }
324
325 fn scrub_replay<W>(&mut self, json: &[u8], write: W) -> Result<(), ParseRecordingError>
326 where
327 W: std::io::Write,
328 {
329 let mut deserializer = serde_json::Deserializer::from_slice(json);
330 let mut serializer = serde_json::Serializer::new(write);
331
332 deserializer.deserialize_seq(EventStreamVisitor::new(
333 &mut serializer,
334 self.transform.clone(),
335 ))?;
336
337 Ok(())
338 }
339
340 #[doc(hidden)] pub fn transcode_replay(
342 &mut self,
343 body: &[u8],
344 output: &mut Vec<u8>,
345 ) -> Result<(), ParseRecordingError> {
346 let encoder = ZlibEncoder::new(output, Compression::default());
347
348 if body.first() == Some(&b'[') {
349 self.scrub_replay(body, encoder)
350 } else {
351 let mut decompressed = Vec::with_capacity(8 * 1024);
352 let mut decoder = ZlibDecoder::new(body).take(self.limit as u64);
353 decoder
354 .read_to_end(&mut decompressed)
355 .map_err(ParseRecordingError::Compression)?;
356
357 self.scrub_replay(&decompressed, encoder)
358 }
359 }
360
361 pub fn process_recording(&mut self, bytes: &[u8]) -> Result<Vec<u8>, ParseRecordingError> {
380 if bytes.is_empty() {
382 return Err(ParseRecordingError::Message("no data found"));
383 }
384
385 let mut split = bytes.splitn(2, |b| b == &b'\n');
386 let header = split
387 .next()
388 .ok_or(ParseRecordingError::Message("no headers found"))?;
389
390 let body = match split.next() {
391 Some(b"") | None => return Err(ParseRecordingError::Message("no body found")),
392 Some(body) => body,
393 };
394
395 let mut output = header.to_owned();
396 output.push(b'\n');
397 output.reserve(body.len());
401 self.transcode_replay(body, &mut output)?;
402
403 Ok(output)
404 }
405}
406
#[cfg(test)]
mod tests {
    use relay_pii::{DataScrubbingConfig, PiiConfig};

    use super::{ParseRecordingError, RecordingScrubber, scrub_at_path};

    /// Builds a PII config with data, default and IP address scrubbing enabled.
    fn default_pii_config() -> PiiConfig {
        let mut scrubbing_config = DataScrubbingConfig::default();
        scrubbing_config.scrub_data = true;
        scrubbing_config.scrub_defaults = true;
        scrubbing_config.scrub_ip_addresses = true;
        scrubbing_config.pii_config_uncached().unwrap().unwrap()
    }

    /// Creates a scrubber without a decompression limit that uses only `config`.
    fn scrubber(config: &PiiConfig) -> RecordingScrubber<'_> {
        RecordingScrubber::new(usize::MAX, Some(config), None)
    }

    /// Runs `process_recording` on `payload` with the default PII config.
    fn process(payload: &[u8]) -> Result<Vec<u8>, ParseRecordingError> {
        let config = default_pii_config();
        let mut scrubber = scrubber(&config);
        scrubber.process_recording(payload)
    }

    /// Scrubs the uncompressed JSON `payload` with the default PII config and returns the
    /// transcoded JSON as a string.
    fn scrub(payload: &[u8]) -> String {
        let config = default_pii_config();
        let mut transcoded = Vec::new();
        scrubber(&config)
            .scrub_replay(payload, &mut transcoded)
            .unwrap();
        String::from_utf8(transcoded).unwrap()
    }

    #[test]
    fn test_process_recording_end_to_end() {
        // The header `{"segment_id":3}`, a newline, and a compressed body.
        let payload: &[u8] = &[
            123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120,
            156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38,
            102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108,
            59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54,
            159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1,
            119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232,
            43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88,
            217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61,
            207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238,
            196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35,
            234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215,
            182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19,
            146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16,
        ];

        let result = process(payload);
        assert!(!result.unwrap().is_empty());
    }

    #[test]
    fn test_process_recording_no_body_data() {
        // Header line with a trailing newline but nothing after it.
        let result = process(b"{\"segment_id\":3}\n");
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no body found"),
        ));
    }

    #[test]
    fn test_process_recording_bad_body_data() {
        // The body neither starts with `[` nor is valid compressed data.
        let result = process(b"{\"segment_id\":3}\n\x16{");
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Compression(_),
        ));
    }

    #[test]
    fn test_process_recording_no_headers() {
        // Without a newline the whole payload is taken as the header, so the body is missing.
        let result = process(b"{\"segment_id\":3}");
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no body found"),
        ));
    }

    #[test]
    fn test_process_recording_no_contents() {
        let result = process(&[]);
        assert!(matches!(
            result.unwrap_err(),
            ParseRecordingError::Message("no data found"),
        ));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_credit_card_removal() {
        let parsed = scrub(include_bytes!("../tests/fixtures/rrweb-pii.json"));
        assert!(parsed.contains(r#"{"type":3,"textContent":"[Filtered]","id":284}"#));
    }

    #[test]
    fn test_scrub_pii_navigation() {
        let parsed = scrub(include_bytes!(
            "../tests/fixtures/rrweb-performance-navigation.json"
        ));
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[test]
    fn test_scrub_pii_resource() {
        let parsed = scrub(include_bytes!(
            "../tests/fixtures/rrweb-performance-resource.json"
        ));
        assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_pii_ip_address_removal() {
        let parsed = scrub(include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json"));
        assert!(parsed.contains("\"value\":\"[ip]\""));
        assert!(parsed.contains("\"textContent\":\"[ip]\""));
    }

    #[ignore = "type 2 nodes are not supported"]
    #[test]
    fn test_scrub_pii_full_snapshot_event() {
        let scrubbed_result = scrub(include_bytes!("../tests/fixtures/rrweb-event-2.json"));
        assert!(scrubbed_result.contains("\"textContent\":\"my ssn is [Filtered]\""));
    }

    #[ignore = "type 3 nodes are not supported"]
    #[test]
    fn test_scrub_pii_incremental_snapshot_event() {
        let scrubbed_result = scrub(include_bytes!("../tests/fixtures/rrweb-event-3.json"));
        assert!(scrubbed_result.contains("\"textContent\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"value\":\"[Filtered]\""));
    }

    #[test]
    fn test_scrub_pii_custom_event() {
        let scrubbed_result = scrub(include_bytes!("../tests/fixtures/rrweb-event-5.json"));
        assert!(scrubbed_result.contains("\"description\":\"[Filtered]\""));
        assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\""));
    }

    #[test]
    fn test_scrub_pii_key_based() {
        let scrubbed_result = scrub(include_bytes!("../tests/fixtures/rrweb-request.json"));
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();

        // Values that are not sensitive are left untouched.
        assert_eq!(scrubbed[0]["data"]["payload"]["data"]["method"], "POST");

        assert_eq!(
            scrubbed[0]["data"]["payload"]["data"]["request"]["body"]["api_key"],
            "[Filtered]"
        );
    }

    #[test]
    fn test_scrub_pii_key_based_edge_cases() {
        let scrubbed_result = scrub(include_bytes!(
            "../tests/fixtures/rrweb-request-edge-cases.json"
        ));
        let scrubbed: serde_json::Value = serde_json::from_str(&scrubbed_result).unwrap();

        insta::assert_ron_snapshot!(scrubbed);
    }

    #[test]
    fn test_scrub_at_path() {
        for (should_scrub, path) in [
            (false, vec![]),
            (false, vec!["data"]),
            (false, vec!["data", "payload"]),
            (false, vec!["data", "payload", "foo"]),
            (false, vec!["foo", "payload", "data"]),
            (true, vec!["data", "payload", "data"]),
            (true, vec!["data", "payload", "data", "request"]),
            (true, vec!["data", "payload", "data", "request", "body"]),
        ] {
            let path = path.into_iter().map(|p| p.to_owned()).collect::<Vec<_>>();
            assert_eq!(should_scrub, scrub_at_path(&path));
        }
    }
}