use std::borrow::Cow;
use std::cell::RefCell;
use std::fmt;
use std::io::Read;
use std::rc::Rc;
use flate2::bufread::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use once_cell::sync::Lazy;
use relay_event_schema::processor::{FieldAttrs, Pii, ProcessingState, Processor, ValueType};
use relay_pii::{PiiConfig, PiiProcessor};
use relay_protocol::Meta;
use serde::{de, ser, Deserializer};
use serde_json::value::RawValue;
use crate::transform::Transform;
/// Key paths, relative to the root of a scrubbed event, below which string values are
/// treated as PII.
///
/// `scrub_at_path` matches these as prefixes, so every key nested under one of these paths
/// is covered as well.
static PII_FIELDS: Lazy<[Vec<&str>; 2]> = Lazy::new(|| {
    let description = vec!["data", "payload", "description"];
    let data = vec!["data", "payload", "data"];
    [description, data]
});
/// Returns `true` if `path` starts with one of the key paths listed in `PII_FIELDS`.
///
/// The check is a prefix match: a path that is equal to, or nested below, a PII path
/// matches, while a path that is shorter than every PII path never does.
///
/// Takes a slice rather than `&Vec<String>` so callers are not forced to own a `Vec`;
/// existing `&Vec<String>` arguments still coerce.
fn scrub_at_path(path: &[String]) -> bool {
    PII_FIELDS.iter().any(|pii_path| {
        // `zip` stops at the shorter side, so the length guard is what rejects paths that
        // only cover part of a PII path.
        path.len() >= pii_path.len()
            && pii_path
                .iter()
                .zip(path)
                .all(|(expected, actual)| expected == actual)
    })
}
/// Field attributes with the PII flag set to `Pii::True`; `push_path` attaches these to the
/// processing state when `scrub_at_path` matches the current path.
const FIELD_ATTRS_PII_TRUE: FieldAttrs = FieldAttrs::new().pii(Pii::True);
/// Field attributes with the PII flag set to `Pii::False`; `push_path` uses these for every
/// path that `scrub_at_path` does not match.
const FIELD_ATTRS_PII_FALSE: FieldAttrs = FieldAttrs::new().pii(Pii::False);
/// Error returned when a replay recording cannot be parsed, decompressed, or scrubbed.
#[derive(Debug)]
pub enum ParseRecordingError {
/// A `serde_json` error; created through the `From<serde_json::Error>` conversion.
Parse(serde_json::Error),
/// An I/O error; produced when reading from the zlib decoder fails.
Compression(std::io::Error),
/// A static description of a structurally invalid payload, e.g. `"no body found"`.
Message(&'static str),
}
impl fmt::Display for ParseRecordingError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ParseRecordingError::Parse(serde_error) => write!(f, "{serde_error}"),
ParseRecordingError::Compression(error) => write!(f, "{error}"),
ParseRecordingError::Message(message) => write!(f, "{message}"),
}
}
}
impl std::error::Error for ParseRecordingError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ParseRecordingError::Parse(e) => Some(e),
ParseRecordingError::Compression(e) => Some(e),
ParseRecordingError::Message(_) => None,
}
}
}
impl From<serde_json::Error> for ParseRecordingError {
fn from(err: serde_json::Error) -> Self {
ParseRecordingError::Parse(err)
}
}
/// Transform state used to scrub the strings of a single event.
struct ScrubberTransform<'a> {
/// First optional PII processor; `transform_string` applies it before `processor2`.
processor1: Option<PiiProcessor<'a>>,
/// Second optional PII processor; applied to the output of `processor1`.
processor2: Option<PiiProcessor<'a>>,
/// Processing state for the current position; entered in `push_path`, left in `pop_path`.
state: ProcessingState<'a>,
/// Keys pushed so far, outermost first; consulted by `scrub_at_path`.
path: Vec<String>,
}
impl ScrubberTransform<'_> {
    /// Resets the transform to its root state.
    ///
    /// If keys or nested state are still left over, this panics in debug builds and logs an
    /// error otherwise; the reset is performed either way.
    fn ensure_empty(&mut self) {
        let has_leftovers = !self.path.is_empty() || self.state.depth() > 0;
        if has_leftovers {
            debug_assert!(false, "ScrubberTransform not empty");
            relay_log::error!("ScrubberTransform not empty");
        }

        self.path.clear();
        self.state = ProcessingState::new_root(None, None);
    }
}
impl<'de> Transform<'de> for &'_ mut ScrubberTransform<'_> {
    /// Descends into `key`, recording it on the path and entering a child processing state
    /// whose PII attribute depends on whether the new path matches `scrub_at_path`.
    fn push_path(&mut self, key: &'de str) {
        // The key has to be on the path before the lookup, because the lookup includes it.
        self.path.push(key.to_owned());

        let attrs = if scrub_at_path(&self.path) {
            &FIELD_ATTRS_PII_TRUE
        } else {
            &FIELD_ATTRS_PII_FALSE
        };

        // `enter_owned` consumes the current state, so move it out of `self` first.
        let parent = std::mem::take(&mut self.state);
        self.state = parent.enter_owned(
            key.to_owned(),
            Some(Cow::Borrowed(attrs)),
            Some(ValueType::String),
        );
    }

    /// Leaves the innermost key: restores the parent processing state when there is one and
    /// drops the last path element.
    fn pop_path(&mut self) {
        let current = std::mem::take(&mut self.state);
        if let Ok(Some(parent)) = current.try_into_parent() {
            self.state = parent;
        }

        // Pop unconditionally; only the check itself is limited to debug builds.
        let removed = self.path.pop();
        debug_assert!(removed.is_some());
    }

    /// Scrubs a borrowed string by copying it and delegating to `transform_string`.
    fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> {
        let owned = String::from(v);
        self.transform_string(owned)
    }

    /// Runs the configured processors over `value`, first `processor1` and then `processor2`.
    ///
    /// Returns the processed string, or an empty string as soon as a processor reports an
    /// error.
    fn transform_string(&mut self, mut value: String) -> Cow<'static, str> {
        let state = &self.state;
        let processors = self
            .processor1
            .iter_mut()
            .chain(self.processor2.iter_mut());

        for processor in processors {
            if processor
                .process_string(&mut value, &mut Meta::default(), state)
                .is_err()
            {
                return Cow::Borrowed("");
            }
        }

        Cow::Owned(value)
    }
}
/// Pairs a raw JSON value with the shared scrubber transform; its `Serialize` impl re-reads
/// the raw JSON and transcodes it through that transform.
struct ScrubbedValue<'a, 'b>(&'a RawValue, Rc<RefCell<ScrubberTransform<'b>>>);
impl serde::Serialize for ScrubbedValue<'_, '_> {
    /// Re-reads the raw JSON through the scrubbing deserializer and streams the result
    /// straight into `serializer`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(raw, shared) = self;

        // The transform is borrowed mutably for the whole duration of the transcoding.
        let mut transform = shared.borrow_mut();
        let mut json = serde_json::Deserializer::from_str(raw.get());
        let scrubbing = crate::transform::Deserializer::new(&mut json, &mut *transform);

        serde_transcode::transcode(scrubbing, serializer)
    }
}
/// Visitor that copies a sequence of events into `serializer`, routing selected events
/// through the shared scrubber (see its `visit_seq` implementation).
struct EventStreamVisitor<'a, S> {
/// Destination serializer that receives the output sequence.
serializer: S,
/// Scrubber transform shared with every `ScrubbedValue` created while visiting.
scrubber: Rc<RefCell<ScrubberTransform<'a>>>,
}
impl<'a, S> EventStreamVisitor<'a, S> {
const SENTRY_EVENT_TYPE: u8 = 5;
fn new(serializer: S, scrubber: Rc<RefCell<ScrubberTransform<'a>>>) -> Self {
Self {
serializer,
scrubber,
}
}
}
impl<'de, S> de::Visitor<'de> for EventStreamVisitor<'_, S>
where
S: ser::Serializer,
{
type Value = S::Ok;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a replay recording event stream")
}
fn visit_seq<A>(self, mut v: A) -> Result<Self::Value, A::Error>
where
A: de::SeqAccess<'de>,
{
#[derive(Clone, Copy, serde::Deserialize)]
struct TypeHelper {
#[serde(rename = "type")]
ty: u8,
}
use serde::ser::SerializeSeq;
let mut seq = self.serializer.serialize_seq(v.size_hint()).map_err(s2d)?;
while let Some(raw) = v.next_element::<&'de RawValue>()? {
let helper = serde_json::from_str::<TypeHelper>(raw.get()).map_err(s2d)?;
if helper.ty == Self::SENTRY_EVENT_TYPE {
seq.serialize_element(&ScrubbedValue(raw, self.scrubber.clone()))
.map_err(s2d)?;
self.scrubber.borrow_mut().ensure_empty();
} else {
seq.serialize_element(raw).map_err(s2d)?;
}
}
seq.end().map_err(s2d)
}
}
fn s2d<S, D>(s: S) -> D
where
S: ser::Error,
D: de::Error,
{
D::custom(s.to_string())
}
/// Scrubs replay recordings using up to two PII configurations.
pub struct RecordingScrubber<'a> {
/// Maximum number of bytes read from the zlib decoder in `transcode_replay`.
limit: usize,
/// Scrubbing transform, shared with the visitor created in `scrub_replay`.
transform: Rc<RefCell<ScrubberTransform<'a>>>,
}
impl<'a> RecordingScrubber<'a> {
    /// Creates a scrubber.
    ///
    /// `limit` caps the number of bytes read from the zlib decoder for compressed bodies.
    /// Each PII config that is present is compiled into its own processor; the processor for
    /// `config1` runs before the one for `config2`.
    pub fn new(
        limit: usize,
        config1: Option<&'a PiiConfig>,
        config2: Option<&'a PiiConfig>,
    ) -> Self {
        let processor1 = config1.map(|config| PiiProcessor::new(config.compiled()));
        let processor2 = config2.map(|config| PiiProcessor::new(config.compiled()));

        let transform = ScrubberTransform {
            processor1,
            processor2,
            state: ProcessingState::new_root(None, None),
            path: Vec::new(),
        };

        Self {
            limit,
            transform: Rc::new(RefCell::new(transform)),
        }
    }

    /// Returns `true` if neither PII processor is configured.
    pub fn is_empty(&self) -> bool {
        let transform = self.transform.borrow();
        !(transform.processor1.is_some() || transform.processor2.is_some())
    }

    /// Reads `json` as a sequence of events and writes the scrubbed sequence to `write`.
    fn scrub_replay<W>(&mut self, json: &[u8], write: W) -> Result<(), ParseRecordingError>
    where
        W: std::io::Write,
    {
        let mut deserializer = serde_json::Deserializer::from_slice(json);
        let mut serializer = serde_json::Serializer::new(write);

        let visitor = EventStreamVisitor::new(&mut serializer, Rc::clone(&self.transform));
        deserializer.deserialize_seq(visitor)?;

        Ok(())
    }

    /// Scrubs `body` and appends the zlib-compressed result to `output`.
    ///
    /// A body starting with `[` is treated as uncompressed JSON. Any other body is run
    /// through a zlib decoder first, reading at most `limit` bytes from it.
    #[doc(hidden)]
    pub fn transcode_replay(
        &mut self,
        body: &[u8],
        output: &mut Vec<u8>,
    ) -> Result<(), ParseRecordingError> {
        // The encoder is created before any decompression takes place, as before.
        let encoder = ZlibEncoder::new(output, Compression::default());

        if body.starts_with(b"[") {
            return self.scrub_replay(body, encoder);
        }

        let mut decompressed = Vec::with_capacity(8 * 1024);
        ZlibDecoder::new(body)
            .take(self.limit as u64)
            .read_to_end(&mut decompressed)
            .map_err(ParseRecordingError::Compression)?;

        self.scrub_replay(&decompressed, encoder)
    }

    /// Scrubs a full recording payload consisting of a header line, a newline, and a body.
    ///
    /// The header is copied to the output unchanged, followed by a newline and the
    /// transcoded body.
    ///
    /// # Errors
    ///
    /// Returns `ParseRecordingError::Message` when `bytes` is empty or when there is no
    /// non-empty body after the first newline, and otherwise any error produced by
    /// `transcode_replay`.
    pub fn process_recording(&mut self, bytes: &[u8]) -> Result<Vec<u8>, ParseRecordingError> {
        if bytes.is_empty() {
            return Err(ParseRecordingError::Message("no data found"));
        }

        // Split only once: the body may itself contain newline bytes.
        let mut parts = bytes.splitn(2, |&byte| byte == b'\n');
        let header = parts
            .next()
            .ok_or(ParseRecordingError::Message("no headers found"))?;
        let body = parts
            .next()
            .filter(|body| !body.is_empty())
            .ok_or(ParseRecordingError::Message("no body found"))?;

        let mut output = header.to_vec();
        output.push(b'\n');
        output.reserve(body.len());

        self.transcode_replay(body, &mut output)?;
        Ok(output)
    }
}
// Unit tests for recording payload handling and PII scrubbing of replay events.
#[cfg(test)]
mod tests {
use relay_pii::{DataScrubbingConfig, PiiConfig};
use crate::recording::scrub_at_path;
use super::RecordingScrubber;
/// Builds a PII config with data scrubbing, default scrubbers, and IP address scrubbing
/// enabled.
fn default_pii_config() -> PiiConfig {
let mut scrubbing_config = DataScrubbingConfig::default();
scrubbing_config.scrub_data = true;
scrubbing_config.scrub_defaults = true;
scrubbing_config.scrub_ip_addresses = true;
scrubbing_config.pii_config_uncached().unwrap().unwrap()
}
/// Creates a scrubber that uses only `config` and sets the limit to `usize::MAX`.
fn scrubber(config: &PiiConfig) -> RecordingScrubber {
RecordingScrubber::new(usize::MAX, Some(config), None)
}
/// A payload with a header line and a zlib-compressed body yields non-empty output.
#[test]
fn test_process_recording_end_to_end() {
// The first 17 bytes are `{"segment_id":3}` followed by a newline; the remaining bytes
// start with 120, 156 (0x78 0x9c) and do not start with `[`, so they go to the zlib decoder.
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120,
156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38,
102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108,
59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54,
159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1,
119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232,
43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88,
217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61,
207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238,
196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35,
234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215,
182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19,
146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16,
];
let config = default_pii_config();
let result = scrubber(&config).process_recording(payload);
assert!(!result.unwrap().is_empty());
}
/// A header line followed by a newline and nothing else is rejected with "no body found".
#[test]
fn test_process_recording_no_body_data() {
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10,
];
let config = default_pii_config();
let result = scrubber(&config).process_recording(payload);
assert!(matches!(
result.unwrap_err(),
super::ParseRecordingError::Message("no body found"),
));
}
/// A one-byte body that is not `[` is sent to the zlib decoder and fails with a
/// `Compression` error.
#[test]
fn test_process_recording_bad_body_data() {
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 22,
];
let config = default_pii_config();
let result = scrubber(&config).process_recording(payload);
assert!(matches!(
result.unwrap_err(),
super::ParseRecordingError::Compression(_),
));
}
/// A payload without any newline is treated as a header with no body, so the reported
/// message is "no body found".
#[test]
fn test_process_recording_no_headers() {
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125,
];
let config = default_pii_config();
let result = scrubber(&config).process_recording(payload);
assert!(matches!(
result.unwrap_err(),
super::ParseRecordingError::Message("no body found"),
));
}
/// An empty payload is rejected with "no data found".
#[test]
fn test_process_recording_no_contents() {
let payload: &[u8] = &[];
let config = default_pii_config();
let result = scrubber(&config).process_recording(payload);
assert!(matches!(
result.unwrap_err(),
super::ParseRecordingError::Message("no data found"),
));
}
// Ignored: only events with type 5 are routed through the scrubber.
#[ignore = "type 3 nodes are not supported"]
#[test]
fn test_pii_credit_card_removal() {
let payload = include_bytes!("../tests/fixtures/rrweb-pii.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let parsed = std::str::from_utf8(&transcoded).unwrap();
assert!(parsed.contains(r#"{"type":3,"textContent":"[Filtered]","id":284}"#));
}
/// The navigation fixture must come out with the credit-card query value filtered.
#[test]
fn test_scrub_pii_navigation() {
let payload = include_bytes!("../tests/fixtures/rrweb-performance-navigation.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let parsed = std::str::from_utf8(&transcoded).unwrap();
assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
}
/// The resource fixture must come out with the credit-card query value filtered.
#[test]
fn test_scrub_pii_resource() {
let payload = include_bytes!("../tests/fixtures/rrweb-performance-resource.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let parsed = std::str::from_utf8(&transcoded).unwrap();
assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]"));
}
// Ignored: only events with type 5 are routed through the scrubber.
#[ignore = "type 3 nodes are not supported"]
#[test]
fn test_pii_ip_address_removal() {
let payload = include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let parsed = std::str::from_utf8(&transcoded).unwrap();
assert!(parsed.contains("\"value\":\"[ip]\"")); assert!(parsed.contains("\"textContent\":\"[ip]\"")) }
// Ignored: only events with type 5 are routed through the scrubber.
#[ignore = "type 2 nodes are not supported"]
#[test]
fn test_scrub_pii_full_snapshot_event() {
let payload = include_bytes!("../tests/fixtures/rrweb-event-2.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let scrubbed_result = std::str::from_utf8(&transcoded).unwrap();
assert!(scrubbed_result.contains("\"textContent\":\"my ssn is [Filtered]\""));
}
// Ignored: only events with type 5 are routed through the scrubber.
#[ignore = "type 3 nodes are not supported"]
#[test]
fn test_scrub_pii_incremental_snapshot_event() {
let payload = include_bytes!("../tests/fixtures/rrweb-event-3.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let scrubbed_result = std::str::from_utf8(&transcoded).unwrap();
assert!(scrubbed_result.contains("\"textContent\":\"[Filtered]\""));
assert!(scrubbed_result.contains("\"value\":\"[Filtered]\""));
}
/// The event-5 fixture must come out with filtered `description` values, including an IP
/// address replaced inside a URL.
#[test]
fn test_scrub_pii_custom_event() {
let payload = include_bytes!("../tests/fixtures/rrweb-event-5.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let scrubbed_result = std::str::from_utf8(&transcoded).unwrap();
assert!(scrubbed_result.contains("\"description\":\"[Filtered]\""));
assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\""));
}
/// In the request fixture, `method` stays `POST` while the nested `api_key` is filtered.
#[test]
fn test_scrub_pii_key_based() {
let payload = include_bytes!("../tests/fixtures/rrweb-request.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let scrubbed_result = std::str::from_utf8(&transcoded).unwrap();
let scrubbed: serde_json::Value = serde_json::from_str(scrubbed_result).unwrap();
assert_eq!(scrubbed[0]["data"]["payload"]["data"]["method"], "POST");
assert_eq!(
scrubbed[0]["data"]["payload"]["data"]["request"]["body"]["api_key"],
"[Filtered]"
);
}
/// Snapshot test over the edge-case request fixture; the expected output lives in the insta
/// snapshot.
#[test]
fn test_scrub_pii_key_based_edge_cases() {
let payload = include_bytes!("../tests/fixtures/rrweb-request-edge-cases.json");
let mut transcoded = Vec::new();
let config = default_pii_config();
scrubber(&config)
.scrub_replay(payload.as_slice(), &mut transcoded)
.unwrap();
let scrubbed_result = std::str::from_utf8(&transcoded).unwrap();
let scrubbed: serde_json::Value = serde_json::from_str(scrubbed_result).unwrap();
insta::assert_ron_snapshot!(scrubbed);
}
/// `scrub_at_path` is a prefix match: only paths at or below `data.payload.data` match here.
#[test]
fn test_scrub_at_path() {
for (should_scrub, path) in [
(false, vec![]),
(false, vec!["data"]),
(false, vec!["data", "payload"]),
(false, vec!["data", "payload", "foo"]),
(false, vec!["foo", "payload", "data"]),
(true, vec!["data", "payload", "data"]),
(true, vec!["data", "payload", "data", "request"]),
(true, vec!["data", "payload", "data", "request", "body"]),
] {
let path = path.into_iter().map(|p| p.to_owned()).collect::<Vec<_>>();
assert_eq!(should_scrub, scrub_at_path(&path));
}
}
}