1use std::cmp::Ordering;
2use std::ops::Bound;
3
4use relay_event_schema::processor::{
5 self, ProcessValue, ProcessingAction, ProcessingResult, ProcessingState, Processor, ValueType,
6};
7use relay_event_schema::protocol::Attributes;
8use relay_protocol::{Array, Empty, Meta, Object};
9
10use crate::eap::size;
11
/// Limits introduced by one field, tracked while the processor is inside that field.
///
/// An entry is pushed in `before_process` for every state that declares `max_bytes` or
/// `max_depth`, and popped again in `after_process` when processing returns to the same depth.
#[derive(Clone, Debug)]
struct SizeState {
    /// Maximum nesting depth permitted below the field, if the field declares one.
    max_depth: Option<usize>,
    /// Depth of the processing state at the time this entry was pushed.
    encountered_at_depth: usize,
    /// Bytes still available under the field's `max_bytes` limit, if the field declares one.
    size_remaining: Option<usize>,
}
18
/// How a value that no longer fits should be deleted.
#[derive(Debug, Clone, Copy)]
enum DeleteAction {
    /// Delete the value outright; converts to `ProcessingAction::DeleteValueHard`.
    Hard,
    /// Delete the value but leave a remark with the given rule id; converts to
    /// `ProcessingAction::DeleteValueWithRemark`.
    WithRemark(&'static str),
}
27
28impl From<DeleteAction> for ProcessingAction {
29 fn from(action: DeleteAction) -> Self {
30 match action {
31 DeleteAction::Hard => ProcessingAction::DeleteValueHard,
32 DeleteAction::WithRemark(rule_id) => ProcessingAction::DeleteValueWithRemark(rule_id),
33 }
34 }
35}
36
/// Processor that trims values to the `max_bytes`, `max_depth` and `max_chars` limits declared
/// on their fields.
///
/// Strings are cut short; other values that no longer fit are deleted. Deleted values are
/// replaced by a `"trimmed"` remark for as long as their keys fit into the removed-key byte
/// budget, and are dropped without a trace afterwards.
#[derive(Default)]
pub struct TrimmingProcessor {
    /// Stack of limits for the fields currently being processed, innermost last.
    size_state: Vec<SizeState>,
    /// Number of key bytes that may still be spent on values deleted with a remark.
    removed_key_byte_budget: usize,
}
53
54impl TrimmingProcessor {
55 pub fn new(removed_key_byte_budget: usize) -> Self {
57 Self {
58 size_state: Default::default(),
59 removed_key_byte_budget,
60 }
61 }
62
63 fn should_remove_container<T: Empty>(&self, value: &T, state: &ProcessingState<'_>) -> bool {
64 self.remaining_depth(state) == Some(1) && !value.is_empty()
67 }
68
69 #[inline]
70 fn remaining_size(&self) -> Option<usize> {
71 self.size_state
72 .iter()
73 .filter_map(|x| x.size_remaining)
74 .min()
75 }
76
77 #[inline]
78 fn remaining_depth(&self, state: &ProcessingState<'_>) -> Option<usize> {
79 self.size_state
80 .iter()
81 .filter_map(|size_state| {
82 let current_depth = state.depth() - size_state.encountered_at_depth;
85 size_state
86 .max_depth
87 .map(|max_depth| max_depth.saturating_sub(current_depth))
88 })
89 .min()
90 }
91
92 fn consume_size(&mut self, state: Option<&ProcessingState>, default: usize) {
93 let size = state.and_then(|s| s.bytes_size()).unwrap_or(default);
94 for remaining in self
95 .size_state
96 .iter_mut()
97 .filter_map(|state| state.size_remaining.as_mut())
98 {
99 *remaining = remaining.saturating_sub(size);
100 }
101 }
102
103 fn delete_value(&mut self, key: Option<&str>) -> DeleteAction {
109 let len = key.map_or(0, |key| key.len());
110 if len <= self.removed_key_byte_budget {
111 self.removed_key_byte_budget -= len;
112 DeleteAction::WithRemark("trimmed")
113 } else {
114 DeleteAction::Hard
115 }
116 }
117}
118
119impl Processor for TrimmingProcessor {
120 fn before_process<T: ProcessValue>(
121 &mut self,
122 _: Option<&T>,
123 _: &mut Meta,
124 state: &ProcessingState<'_>,
125 ) -> ProcessingResult {
126 if state.max_bytes().is_some() || state.attrs().max_depth.is_some() {
129 self.size_state.push(SizeState {
130 size_remaining: state.max_bytes(),
131 encountered_at_depth: state.depth(),
132 max_depth: state.attrs().max_depth,
133 });
134 }
135
136 if state.attrs().trim {
137 let key = state.keys().next();
138 if self.remaining_size() == Some(0) {
139 return Err(self.delete_value(key).into());
140 }
141 if self.remaining_depth(state) == Some(0) {
142 return Err(self.delete_value(key).into());
143 }
144 }
145 Ok(())
146 }
147
148 fn after_process<T: ProcessValue>(
149 &mut self,
150 _value: Option<&T>,
151 _: &mut Meta,
152 state: &ProcessingState<'_>,
153 ) -> ProcessingResult {
154 self.size_state
157 .pop_if(|size_state| state.depth() == size_state.encountered_at_depth);
158
159 Ok(())
164 }
165 fn process_u64(
166 &mut self,
167 _value: &mut u64,
168 _meta: &mut Meta,
169 state: &ProcessingState<'_>,
170 ) -> ProcessingResult {
171 self.consume_size(Some(state), 8);
172 Ok(())
173 }
174
175 fn process_i64(
176 &mut self,
177 _value: &mut i64,
178 _meta: &mut Meta,
179 state: &ProcessingState<'_>,
180 ) -> ProcessingResult {
181 self.consume_size(Some(state), 8);
182 Ok(())
183 }
184
185 fn process_f64(
186 &mut self,
187 _value: &mut f64,
188 _meta: &mut Meta,
189 state: &ProcessingState<'_>,
190 ) -> ProcessingResult {
191 self.consume_size(Some(state), 8);
192 Ok(())
193 }
194
195 fn process_bool(
196 &mut self,
197 _value: &mut bool,
198 _meta: &mut Meta,
199 state: &ProcessingState<'_>,
200 ) -> ProcessingResult {
201 self.consume_size(Some(state), 1);
202 Ok(())
203 }
204
205 fn process_string(
206 &mut self,
207 value: &mut String,
208 meta: &mut Meta,
209 state: &ProcessingState<'_>,
210 ) -> ProcessingResult {
211 if let Some(max_chars) = state.max_chars() {
212 crate::trimming::trim_string(value, meta, max_chars, state.attrs().max_chars_allowance);
213 }
214
215 if !state.attrs().trim {
216 self.consume_size(Some(state), value.len());
217 return Ok(());
218 }
219
220 if let Some(size_remaining) = self.remaining_size() {
221 crate::trimming::trim_string(value, meta, size_remaining, 0);
222 }
223
224 self.consume_size(Some(state), value.len());
225
226 Ok(())
227 }
228
229 fn process_array<T>(
230 &mut self,
231 value: &mut Array<T>,
232 meta: &mut Meta,
233 state: &ProcessingState<'_>,
234 ) -> ProcessingResult
235 where
236 T: ProcessValue,
237 {
238 if !state.attrs().trim {
239 return Ok(());
240 }
241
242 if !self.size_state.is_empty() {
244 let original_length = value.len();
245
246 if self.should_remove_container(value, state) {
247 return Err(ProcessingAction::DeleteValueHard);
248 }
249
250 let mut split_index = None;
251 for (index, item) in value.iter_mut().enumerate() {
252 if self.remaining_size() == Some(0) {
253 split_index = Some(index);
254 break;
255 }
256
257 let item_state = state.enter_index(index, None, ValueType::for_field(item));
258 processor::process_value(item, self, &item_state)?;
259 }
260
261 if let Some(split_index) = split_index {
262 let mut i = split_index;
263
264 for item in &mut value[split_index..] {
265 match self.delete_value(None) {
266 DeleteAction::Hard => break,
267 DeleteAction::WithRemark(rule_id) => {
268 processor::delete_with_remark(item, rule_id)
269 }
270 }
271
272 i += 1;
273 }
274
275 let _ = value.split_off(i);
276 }
277
278 if value.len() != original_length {
279 meta.set_original_length(Some(original_length));
280 }
281 } else {
282 value.process_child_values(self, state)?;
283 }
284
285 Ok(())
286 }
287
288 fn process_object<T>(
289 &mut self,
290 value: &mut Object<T>,
291 meta: &mut Meta,
292 state: &ProcessingState<'_>,
293 ) -> ProcessingResult
294 where
295 T: ProcessValue,
296 {
297 if !state.attrs().trim {
298 return Ok(());
299 }
300
301 if !self.size_state.is_empty() {
303 let original_length = value.len();
304
305 if self.should_remove_container(value, state) {
306 return Err(ProcessingAction::DeleteValueHard);
307 }
308
309 let mut split_key = None;
310 for (key, item) in value.iter_mut() {
311 if self.remaining_size() == Some(0) {
312 split_key = Some(key.to_owned());
313 break;
314 }
315
316 let item_state = state.enter_borrowed(key, None, ValueType::for_field(item));
317 processor::process_value(item, self, &item_state)?;
318 }
319
320 if let Some(split_key) = split_key {
321 let mut i = split_key.as_str();
322
323 for (key, value) in value
326 .range_mut::<str, _>((Bound::Included(split_key.as_str()), Bound::Unbounded))
327 {
328 i = key.as_str();
329
330 match self.delete_value(Some(key.as_ref())) {
331 DeleteAction::Hard => break,
332 DeleteAction::WithRemark(rule_id) => {
333 processor::delete_with_remark(value, rule_id)
334 }
335 }
336 }
337
338 let split_key = i.to_owned();
339 let _ = value.split_off(&split_key);
340 }
341
342 if value.len() != original_length {
343 meta.set_original_length(Some(original_length));
344 }
345 } else {
346 value.process_child_values(self, state)?;
347 }
348
349 Ok(())
350 }
351
352 fn process_attributes(
353 &mut self,
354 attributes: &mut Attributes,
355 meta: &mut Meta,
356 state: &ProcessingState,
357 ) -> ProcessingResult {
358 if !state.attrs().trim {
359 return Ok(());
360 }
361
362 let original_length = size::attributes_size(attributes);
369
370 let inner = std::mem::take(&mut attributes.0);
373 let mut sorted: Vec<_> = inner.into_iter().collect();
374 sorted.sort_by(
375 |(k1, v1), (k2, v2)| match (v1.value().is_some(), v2.value().is_some()) {
376 (false, false) => k1.len().cmp(&k2.len()),
377 (false, true) => Ordering::Less,
378 (true, false) => Ordering::Greater,
379 (true, true) => (k1.len() + size::attribute_size(v1))
380 .cmp(&(k2.len() + size::attribute_size(v2))),
381 },
382 );
383
384 sorted.retain(|(k, v)| {
387 if v.value().is_some() {
388 return true;
389 }
390
391 match self.delete_value(Some(k)) {
392 DeleteAction::Hard => false,
393 DeleteAction::WithRemark(_) => true,
394 }
395 });
396
397 let mut split_idx = None;
398 for (idx, (key, value)) in sorted.iter_mut().enumerate() {
399 if value.value().is_none() {
400 continue;
404 }
405 if let Some(remaining) = self.remaining_size()
406 && remaining < key.len()
407 {
408 split_idx = Some(idx);
409 break;
410 }
411
412 self.consume_size(None, key.len());
413
414 let value_state = state.enter_borrowed(key, None, ValueType::for_field(value));
415 processor::process_value(value, self, &value_state)?;
416 }
417
418 if let Some(split_idx) = split_idx {
419 let mut i = split_idx;
420
421 for (key, value) in &mut sorted[split_idx..] {
422 match self.delete_value(Some(key.as_ref())) {
423 DeleteAction::Hard => break,
424 DeleteAction::WithRemark(rule_id) => {
425 processor::delete_with_remark(value, rule_id)
426 }
427 }
428
429 i += 1;
430 }
431
432 let _ = sorted.split_off(i);
433 }
434
435 attributes.0 = sorted.into_iter().collect();
436
437 let new_size = size::attributes_size(attributes);
438 if new_size != original_length {
439 meta.set_original_length(Some(original_length));
440 }
441
442 Ok(())
443 }
444}
445
#[cfg(test)]
mod tests {
    use relay_event_schema::protocol::{AttributeType, AttributeValue};
    use relay_protocol::{Annotated, FromValue, IntoValue, SerializableAnnotated, Value};

    use super::*;

    /// Minimal item used to exercise the processor: a character-limited body, two numbers that
    /// are exempt from trimming, a byte-limited attribute collection and a trimmable footer.
    #[derive(Debug, Clone, Empty, IntoValue, FromValue, ProcessValue)]
    struct TestObject {
        #[metastructure(max_chars = 10, trim = true)]
        body: Annotated<String>,
        // Declares a size of zero bytes, so it never counts against any byte limit.
        #[metastructure(trim = false, bytes_size = 0)]
        number: Annotated<u64>,
        // Declares a size of 10 bytes instead of the default used for integers.
        #[metastructure(trim = false, bytes_size = 10)]
        other_number: Annotated<u64>,
        #[metastructure(max_bytes = 40, trim = true)]
        attributes: Annotated<Attributes>,
        #[metastructure(trim = true)]
        footer: Annotated<String>,
    }

    /// The 40-byte attribute limit is hit in the middle of a string value: the string is cut
    /// short and the remaining attribute is replaced by a remark.
    #[test]
    fn test_split_on_string() {
        let mut attributes = Attributes::new();

        // The three attributes add up to the `len` of 101 recorded in the snapshot below
        // (presumably 13 + 42 + 46 bytes, counting key length plus value size).
        attributes.insert("small", 17);
        attributes.insert("medium string", "This string should be trimmed");
        attributes.insert("attribute is very large and should be removed", true);

        let mut value = Annotated::new(TestObject {
            attributes: Annotated::new(attributes),
            number: Annotated::empty(),
            other_number: Annotated::empty(),
            body: Annotated::new("This is longer than allowed".to_owned()),
            footer: Annotated::empty(),
        });

        let mut processor = TrimmingProcessor::new(100);

        let state = ProcessingState::new_root(Default::default(), []);
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "This is...",
          "attributes": {
            "attribute is very large and should be removed": null,
            "medium string": {
              "type": "string",
              "value": "This string..."
            },
            "small": {
              "type": "integer",
              "value": 17
            }
          },
          "_meta": {
            "attributes": {
              "": {
                "len": 101
              },
              "attribute is very large and should be removed": {
                "": {
                  "rem": [
                    [
                      "trimmed",
                      "x"
                    ]
                  ]
                }
              },
              "medium string": {
                "value": {
                  "": {
                    "rem": [
                      [
                        "!limit",
                        "s",
                        11,
                        14
                      ]
                    ],
                    "len": 29
                  }
                }
              }
            },
            "body": {
              "": {
                "rem": [
                  [
                    "!limit",
                    "s",
                    7,
                    10
                  ]
                ],
                "len": 27
              }
            }
          }
        }
        "###);
    }

    /// After the first attribute and the second attribute's key, only a single byte of the
    /// attribute limit is left, so the string value is reduced to the ellipsis alone.
    #[test]
    fn test_one_byte_left() {
        let mut attributes = Attributes::new();

        // The two attributes add up to the `len` of 68 recorded in the snapshot below.
        attributes.insert("small attribute", 17);
        attributes.insert("medium attribute", "This string should be trimmed");

        let mut value = Annotated::new(TestObject {
            attributes: Annotated::new(attributes),
            number: Annotated::empty(),
            other_number: Annotated::empty(),
            body: Annotated::new("This is longer than allowed".to_owned()),
            footer: Annotated::empty(),
        });

        let mut processor = TrimmingProcessor::new(100);

        let state = ProcessingState::new_root(Default::default(), []);
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "This is...",
          "attributes": {
            "medium attribute": {
              "type": "string",
              "value": "..."
            },
            "small attribute": {
              "type": "integer",
              "value": 17
            }
          },
          "_meta": {
            "attributes": {
              "": {
                "len": 68
              },
              "medium attribute": {
                "value": {
                  "": {
                    "rem": [
                      [
                        "!limit",
                        "s",
                        0,
                        3
                      ]
                    ],
                    "len": 29
                  }
                }
              }
            },
            "body": {
              "": {
                "rem": [
                  [
                    "!limit",
                    "s",
                    0,
                    3
                  ]
                ],
                "len": 27
              }
            }
          }
        }
        "###);
    }

    /// A number is kept in full even though less than its size remains: numbers cannot be
    /// trimmed, and consuming their size merely saturates the budget at zero.
    #[test]
    fn test_overaccept_number() {
        let mut attributes = Attributes::new();

        // The three attributes add up to the `len` of 91 recorded in the snapshot below.
        attributes.insert("small", "abcdefgh");
        attributes.insert("attribute with long name", 71);
        attributes.insert("attribute is very large and should be removed", true);

        let mut value = Annotated::new(TestObject {
            attributes: Annotated::new(attributes),
            number: Annotated::empty(),
            other_number: Annotated::empty(),
            body: Annotated::new("This is longer than allowed".to_owned()),
            footer: Annotated::empty(),
        });

        let mut processor = TrimmingProcessor::new(100);

        let state = ProcessingState::new_root(Default::default(), []);
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "This is...",
          "attributes": {
            "attribute is very large and should be removed": null,
            "attribute with long name": {
              "type": "integer",
              "value": 71
            },
            "small": {
              "type": "string",
              "value": "abcdefgh"
            }
          },
          "_meta": {
            "attributes": {
              "": {
                "len": 91
              },
              "attribute is very large and should be removed": {
                "": {
                  "rem": [
                    [
                      "trimmed",
                      "x"
                    ]
                  ]
                }
              }
            },
            "body": {
              "": {
                "rem": [
                  [
                    "!limit",
                    "s",
                    7,
                    10
                  ]
                ],
                "len": 27
              }
            }
          }
        }
        "###);
    }

    /// A 50-byte limit on the whole item is tighter than the 40-byte attribute limit once the
    /// body and the numbers are accounted for, and it also causes the footer to be removed.
    #[test]
    fn test_max_item_size() {
        let mut attributes = Attributes::new();

        attributes.insert("small", 17);
        attributes.insert("medium string", "This string should be trimmed");
        attributes.insert("attribute is very large and should be removed", true);

        let mut value = Annotated::new(TestObject {
            attributes: Annotated::new(attributes),
            number: Annotated::new(0),
            other_number: Annotated::new(0),
            body: Annotated::new("Short".to_owned()),
            footer: Annotated::new("Hello World".to_owned()),
        });

        let mut processor = TrimmingProcessor::new(100);

        // The root limit applies to every field of the item, in addition to the per-field
        // limit on `attributes`.
        let state = ProcessingState::root_builder().max_bytes(50).build();
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "Short",
          "number": 0,
          "other_number": 0,
          "attributes": {
            "attribute is very large and should be removed": null,
            "medium string": {
              "type": "string",
              "value": "This s..."
            },
            "small": {
              "type": "integer",
              "value": 17
            }
          },
          "footer": null,
          "_meta": {
            "attributes": {
              "": {
                "len": 101
              },
              "attribute is very large and should be removed": {
                "": {
                  "rem": [
                    [
                      "trimmed",
                      "x"
                    ]
                  ]
                }
              },
              "medium string": {
                "value": {
                  "": {
                    "rem": [
                      [
                        "!limit",
                        "s",
                        6,
                        9
                      ]
                    ],
                    "len": 29
                  }
                }
              }
            },
            "footer": {
              "": {
                "rem": [
                  [
                    "trimmed",
                    "x"
                  ]
                ]
              }
            }
          }
        }
        "###);
    }

    /// Array-valued attributes are trimmed item by item: items are kept while they fit, the
    /// item that crosses the limit is cut short, and later items are replaced by remarks.
    #[test]
    fn test_array_attribute() {
        let mut attributes = Attributes::new();

        let array = vec![
            Annotated::new("first string".into()),
            Annotated::new("second string".into()),
            Annotated::new("another string".into()),
            Annotated::new("last string".into()),
        ];

        attributes.insert(
            "array",
            AttributeValue {
                ty: Annotated::new(AttributeType::Array),
                value: Annotated::new(Value::Array(array)),
            },
        );

        let mut value = Annotated::new(TestObject {
            attributes: Annotated::new(attributes),
            number: Annotated::empty(),
            other_number: Annotated::empty(),
            body: Annotated::new("Short".to_owned()),
            footer: Annotated::empty(),
        });

        let mut processor = TrimmingProcessor::new(100);
        let state = ProcessingState::new_root(Default::default(), []);
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        // The key and the four strings add up to the `len` of 55 recorded below.
        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "Short",
          "attributes": {
            "array": {
              "type": "array",
              "value": [
                "first string",
                "second string",
                "another...",
                null
              ]
            }
          },
          "_meta": {
            "attributes": {
              "": {
                "len": 55
              },
              "array": {
                "value": {
                  "2": {
                    "": {
                      "rem": [
                        [
                          "!limit",
                          "s",
                          7,
                          10
                        ]
                      ],
                      "len": 14
                    }
                  },
                  "3": {
                    "": {
                      "rem": [
                        [
                          "trimmed",
                          "x"
                        ]
                      ]
                    }
                  }
                }
              }
            }
          }
        }
        "###);
    }

    /// An attribute whose key alone exceeds the remaining size is removed without being
    /// charged against the item-wide limit, so the footer that follows still fits.
    #[test]
    fn test_oversized_key_does_not_consume_global_limit() {
        let mut attributes = Attributes::new();
        attributes.insert("a", 1);
        attributes.insert("this_key_is_exactly_35_chars_long!!", true);

        let mut value = Annotated::new(TestObject {
            body: Annotated::new("Hi".to_owned()),
            number: Annotated::new(0),
            other_number: Annotated::empty(),
            attributes: Annotated::new(attributes),
            footer: Annotated::new("Hello World".to_owned()),
        });

        let mut processor = TrimmingProcessor::new(100);
        let state = ProcessingState::root_builder().max_bytes(30).build();
        processor::process_value(&mut value, &mut processor, &state).unwrap();

        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
        {
          "body": "Hi",
          "number": 0,
          "attributes": {
            "a": {
              "type": "integer",
              "value": 1
            },
            "this_key_is_exactly_35_chars_long!!": null
          },
          "footer": "Hello World",
          "_meta": {
            "attributes": {
              "": {
                "len": 45
              },
              "this_key_is_exactly_35_chars_long!!": {
                "": {
                  "rem": [
                    [
                      "trimmed",
                      "x"
                    ]
                  ]
                }
              }
            }
          }
        }
        "###);
    }

    /// Attributes without a value are kept only while their keys fit into the removed-key
    /// budget, and running the processor a second time does not change the result.
    #[test]
    fn test_invalid_values() {
        let mut attributes = Attributes::new();
        attributes.insert("small", 17);
        attributes.insert("medium string", "This string should be trimmed");
        attributes.insert("attribute is very large and should be removed", true);
        // Entries without a value; only the first key (17 bytes) fits the budget of 20.
        attributes
            .0
            .insert("removed attribute".to_owned(), Annotated::empty());
        attributes
            .0
            .insert("another removed attribute".to_owned(), Annotated::empty());

        let mut attributes = Annotated::new(attributes);

        let state = ProcessingState::root_builder().max_bytes(40).build();
        processor::process_value(&mut attributes, &mut TrimmingProcessor::new(20), &state).unwrap();
        let attributes_after_trimming = attributes.clone();
        // Second pass with a fresh processor to check idempotence.
        processor::process_value(&mut attributes, &mut TrimmingProcessor::new(20), &state).unwrap();

        assert_eq!(
            &attributes, &attributes_after_trimming,
            "trimming should be idempotent"
        );

        insta::assert_json_snapshot!(SerializableAnnotated(&attributes), @r###"
        {
          "medium string": {
            "type": "string",
            "value": "This string..."
          },
          "removed attribute": null,
          "small": {
            "type": "integer",
            "value": 17
          },
          "_meta": {
            "": {
              "len": 143
            },
            "medium string": {
              "value": {
                "": {
                  "rem": [
                    [
                      "!limit",
                      "s",
                      11,
                      14
                    ]
                  ],
                  "len": 29
                }
              }
            }
          }
        }
        "###);
    }
}