relay_event_normalization/eap/
time.rs1use chrono::{DateTime, Utc};
4use relay_event_schema::{
5 processor::{self, ProcessValue, ProcessingState},
6 protocol::{OurLog, SpanV2, Timestamp, TraceMetric},
7};
8use relay_protocol::{Annotated, ErrorKind};
9use std::time::Duration;
10
11use crate::ClockDriftProcessor;
12
13#[derive(Debug, Default, Clone, Copy)]
15pub struct Config {
16 pub received_at: DateTime<Utc>,
18 pub sent_at: Option<DateTime<Utc>>,
20 pub max_in_past: Option<Duration>,
22 pub max_in_future: Option<Duration>,
24 pub minimum_clock_drift: Duration,
26}
27
28pub fn normalize<T>(item: &mut Annotated<T>, config: Config)
34where
35 T: TimeNormalize,
36{
37 let received_at = config.received_at;
38 let mut sent_at = config.sent_at;
39 let mut error_kind = ErrorKind::ClockDrift;
40
41 let timestamp = item
42 .value_mut()
43 .as_mut()
44 .and_then(|t| t.reference_timestamp_mut().value().copied());
45
46 if let Some(timestamp) = timestamp {
47 if config
48 .max_in_past
49 .is_some_and(|delta| timestamp < received_at - delta)
50 {
51 error_kind = ErrorKind::PastTimestamp;
52 sent_at = Some(timestamp.into_inner());
53 } else if config
54 .max_in_future
55 .is_some_and(|delta| timestamp > received_at + delta)
56 {
57 error_kind = ErrorKind::FutureTimestamp;
58 sent_at = Some(timestamp.into_inner());
59 }
60 }
61
62 let mut processor = ClockDriftProcessor::new(sent_at, received_at)
63 .at_least(config.minimum_clock_drift)
64 .error_kind(error_kind);
65
66 if processor.is_drifted() {
67 let _ = processor::process_value(item, &mut processor, ProcessingState::root());
68 if let Some(item) = item.value_mut() {
69 processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut());
70 }
71 }
72}
73
74pub trait TimeNormalize: ProcessValue {
76 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>;
80}
81
82impl TimeNormalize for OurLog {
83 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
84 &mut self.timestamp
85 }
86}
87
88impl TimeNormalize for SpanV2 {
89 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
90 &mut self.start_timestamp
91 }
92}
93
94impl TimeNormalize for TraceMetric {
95 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
96 &mut self.timestamp
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 use relay_event_schema::processor::ProcessValue;
105 use relay_protocol::{Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot};
106
107 #[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)]
108 struct TestItem {
109 base: Annotated<Timestamp>,
110 other: Annotated<Timestamp>,
111 }
112
113 impl TimeNormalize for TestItem {
114 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
115 &mut self.base
116 }
117 }
118
119 fn ts(secs: i64) -> Timestamp {
120 Timestamp(DateTime::from_timestamp_secs(secs).unwrap())
121 }
122
123 #[test]
124 fn test_normalize_time_no_drift() {
125 let mut item = Annotated::new(TestItem {
126 base: ts(1_000).into(),
127 other: ts(1_010).into(),
128 });
129
130 let config = Config {
131 received_at: ts(1_100).0,
132 ..Default::default()
133 };
134
135 normalize(&mut item, config);
136
137 assert_annotated_snapshot!(item, @r#"
138 {
139 "base": 1000.0,
140 "other": 1010.0
141 }
142 "#);
143 }
144
145 #[test]
146 fn test_normalize_time_client_drift() {
147 let mut item = Annotated::new(TestItem {
148 base: ts(50_000).into(),
149 other: ts(50_010).into(),
150 });
151
152 let config = Config {
153 sent_at: Some(ts(0).0),
154 received_at: ts(51_000).0,
155 ..Default::default()
156 };
157
158 normalize(&mut item, config);
159
160 assert_annotated_snapshot!(item, @r#"
161 {
162 "base": 101000.0,
163 "other": 101010.0,
164 "_meta": {
165 "base": {
166 "": {
167 "err": [
168 [
169 "clock_drift",
170 {
171 "sdk_time": "1970-01-01T00:00:00+00:00",
172 "server_time": "1970-01-01T14:10:00+00:00"
173 }
174 ]
175 ]
176 }
177 }
178 }
179 }
180 "#);
181 }
182
183 #[test]
184 fn test_normalize_time_too_far_in_past() {
185 let mut item = Annotated::new(TestItem {
186 base: ts(90_000).into(),
187 other: ts(80_000).into(),
188 });
189
190 let config = Config {
191 received_at: ts(100_000).0,
192 max_in_past: Some(Duration::from_secs(10)),
193 ..Default::default()
194 };
195
196 normalize(&mut item, config);
197
198 assert_annotated_snapshot!(item, @r#"
199 {
200 "base": 100000.0,
201 "other": 90000.0,
202 "_meta": {
203 "base": {
204 "": {
205 "err": [
206 [
207 "past_timestamp",
208 {
209 "sdk_time": "1970-01-02T01:00:00+00:00",
210 "server_time": "1970-01-02T03:46:40+00:00"
211 }
212 ]
213 ]
214 }
215 }
216 }
217 }
218 "#);
219 }
220
221 #[test]
222 fn test_normalize_time_too_far_in_future() {
223 let mut item = Annotated::new(TestItem {
224 base: ts(90_000).into(),
225 other: ts(80_000).into(),
226 });
227
228 let config = Config {
229 received_at: ts(10_000).0,
230 max_in_future: Some(Duration::from_secs(10)),
231 ..Default::default()
232 };
233
234 normalize(&mut item, config);
235
236 assert_annotated_snapshot!(item, @r#"
237 {
238 "base": 10000.0,
239 "other": 0.0,
240 "_meta": {
241 "base": {
242 "": {
243 "err": [
244 [
245 "future_timestamp",
246 {
247 "sdk_time": "1970-01-02T01:00:00+00:00",
248 "server_time": "1970-01-01T02:46:40+00:00"
249 }
250 ]
251 ]
252 }
253 }
254 }
255 }
256 "#);
257 }
258}