relay_event_normalization/eap/
time.rs1use chrono::{DateTime, Utc};
4use relay_conventions::attributes::SENTRY__TIMESTAMP__SEQUENCE;
5use relay_event_schema::{
6 processor::{self, ProcessValue, ProcessingState},
7 protocol::{Attributes, OurLog, Replay, SpanV2, Timestamp, TraceMetric},
8};
9use relay_protocol::{Annotated, ErrorKind, Remark, RemarkType};
10use std::time::Duration;
11
12use crate::ClockDriftProcessor;
13
14#[derive(Debug, Default, Clone, Copy)]
16pub struct Config {
17 pub apply_sequence_shift: bool,
22 pub received_at: DateTime<Utc>,
24 pub sent_at: Option<DateTime<Utc>>,
26 pub max_in_past: Option<Duration>,
28 pub max_in_future: Option<Duration>,
30 pub minimum_clock_drift: Duration,
32}
33
34pub fn normalize<T>(item: &mut Annotated<T>, config: Config)
40where
41 T: TimeNormalize,
42{
43 let received_at = config.received_at;
44 let mut sent_at = config.sent_at;
45 let mut error_kind = ErrorKind::ClockDrift;
46
47 let timestamp = item
48 .value_mut()
49 .as_mut()
50 .map(|t| t.reference_timestamp_mut())
51 .and_then(|ts| ts.value().copied());
52
53 if let Some(timestamp) = timestamp {
54 if config
55 .max_in_past
56 .is_some_and(|delta| timestamp < received_at - delta)
57 {
58 error_kind = ErrorKind::PastTimestamp;
59 sent_at = Some(timestamp.into_inner());
60 } else if config
61 .max_in_future
62 .is_some_and(|delta| timestamp > received_at + delta)
63 {
64 error_kind = ErrorKind::FutureTimestamp;
65 sent_at = Some(timestamp.into_inner());
66 }
67 }
68
69 let mut processor = ClockDriftProcessor::new(sent_at, received_at)
70 .at_least(config.minimum_clock_drift)
71 .error_kind(error_kind);
72
73 if processor.is_drifted() {
74 let _ = processor::process_value(item, &mut processor, ProcessingState::root());
75 if let Some(item) = item.value_mut() {
76 processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut());
77 }
78 }
79
80 let sequence = item
81 .value()
82 .and_then(|t| t.timestamp_sequence())
83 .filter(|d| *d > 0);
84
85 let timestamp = item
86 .value_mut()
87 .as_mut()
88 .map(|t| t.reference_timestamp_mut());
89
90 if config.apply_sequence_shift
91 && let Some(sequence) = sequence
92 && let Some(ts) = timestamp
93 && let Some(ts_value) = ts.value_mut()
94 {
95 ts_value.0 += chrono::TimeDelta::nanoseconds(sequence.into());
98 ts.meta_mut()
99 .add_remark(Remark::new(RemarkType::Substituted, "timestamp.sequence"));
100 }
101}
102
103pub trait TimeNormalize: ProcessValue {
105 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>;
109
110 fn timestamp_sequence(&self) -> Option<u32>;
115}
116
117impl TimeNormalize for OurLog {
118 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
119 &mut self.timestamp
120 }
121
122 fn timestamp_sequence(&self) -> Option<u32> {
123 get_timestamp_sequence(&self.attributes)
124 }
125}
126
127impl TimeNormalize for SpanV2 {
128 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
129 &mut self.start_timestamp
130 }
131
132 fn timestamp_sequence(&self) -> Option<u32> {
133 None
138 }
139}
140
141impl TimeNormalize for TraceMetric {
142 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
143 &mut self.timestamp
144 }
145
146 fn timestamp_sequence(&self) -> Option<u32> {
147 get_timestamp_sequence(&self.attributes)
148 }
149}
150
151impl TimeNormalize for Replay {
152 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
153 &mut self.timestamp
154 }
155
156 fn timestamp_sequence(&self) -> Option<u32> {
157 None
158 }
159}
160
161fn get_timestamp_sequence(attributes: &Annotated<Attributes>) -> Option<u32> {
162 attributes
163 .value()
164 .and_then(|attrs| attrs.get_value(SENTRY__TIMESTAMP__SEQUENCE))
165 .and_then(|v| v.as_f64())
166 .map(|v| v as _)
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 use relay_event_schema::processor::ProcessValue;
174 use relay_protocol::{
175 Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot, get_value,
176 };
177
178 #[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)]
179 struct TestItem {
180 base: Annotated<Timestamp>,
181 other: Annotated<Timestamp>,
182 }
183
184 impl TimeNormalize for TestItem {
185 fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
186 &mut self.base
187 }
188
189 fn timestamp_sequence(&self) -> Option<u32> {
190 Some(123)
191 }
192 }
193
194 fn ts(secs: i64) -> Timestamp {
195 Timestamp(DateTime::from_timestamp_secs(secs).unwrap())
196 }
197
198 #[test]
199 fn test_normalize_time_no_drift() {
200 let mut item = Annotated::new(TestItem {
201 base: ts(1_000).into(),
202 other: ts(1_010).into(),
203 });
204
205 let config = Config {
206 received_at: ts(1_100).0,
207 ..Default::default()
208 };
209
210 normalize(&mut item, config);
211
212 assert_annotated_snapshot!(item, @r#"
213 {
214 "base": 1000.0,
215 "other": 1010.0
216 }
217 "#);
218 }
219
220 #[test]
221 fn test_normalize_time_client_drift() {
222 let mut item = Annotated::new(TestItem {
223 base: ts(50_000).into(),
224 other: ts(50_010).into(),
225 });
226
227 let config = Config {
228 sent_at: Some(ts(0).0),
229 received_at: ts(51_000).0,
230 ..Default::default()
231 };
232
233 normalize(&mut item, config);
234
235 assert_annotated_snapshot!(item, @r#"
236 {
237 "base": 101000.0,
238 "other": 101010.0,
239 "_meta": {
240 "base": {
241 "": {
242 "err": [
243 [
244 "clock_drift",
245 {
246 "sdk_time": "1970-01-01T00:00:00+00:00",
247 "server_time": "1970-01-01T14:10:00+00:00"
248 }
249 ]
250 ]
251 }
252 }
253 }
254 }
255 "#);
256 }
257
258 #[test]
259 fn test_normalize_time_too_far_in_past() {
260 let mut item = Annotated::new(TestItem {
261 base: ts(90_000).into(),
262 other: ts(80_000).into(),
263 });
264
265 let config = Config {
266 received_at: ts(100_000).0,
267 max_in_past: Some(Duration::from_secs(10)),
268 ..Default::default()
269 };
270
271 normalize(&mut item, config);
272
273 assert_annotated_snapshot!(item, @r#"
274 {
275 "base": 100000.0,
276 "other": 90000.0,
277 "_meta": {
278 "base": {
279 "": {
280 "err": [
281 [
282 "past_timestamp",
283 {
284 "sdk_time": "1970-01-02T01:00:00+00:00",
285 "server_time": "1970-01-02T03:46:40+00:00"
286 }
287 ]
288 ]
289 }
290 }
291 }
292 }
293 "#);
294 }
295
296 #[test]
297 fn test_normalize_time_too_far_in_future() {
298 let mut item = Annotated::new(TestItem {
299 base: ts(90_000).into(),
300 other: ts(80_000).into(),
301 });
302
303 let config = Config {
304 received_at: ts(10_000).0,
305 max_in_future: Some(Duration::from_secs(10)),
306 ..Default::default()
307 };
308
309 normalize(&mut item, config);
310
311 assert_annotated_snapshot!(item, @r#"
312 {
313 "base": 10000.0,
314 "other": 0.0,
315 "_meta": {
316 "base": {
317 "": {
318 "err": [
319 [
320 "future_timestamp",
321 {
322 "sdk_time": "1970-01-02T01:00:00+00:00",
323 "server_time": "1970-01-01T02:46:40+00:00"
324 }
325 ]
326 ]
327 }
328 }
329 }
330 }
331 "#);
332 }
333
334 #[test]
335 fn test_normalize_time_sequence_shift() {
336 let mut item = Annotated::new(TestItem {
337 base: ts(90_000).into(),
338 other: ts(80_000).into(),
339 });
340
341 let config = Config {
342 apply_sequence_shift: true,
343 ..Default::default()
344 };
345
346 normalize(&mut item, config);
347
348 insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
349 {
350 "base": {
351 "": {
352 "rem": [
353 [
354 "timestamp.sequence",
355 "s"
356 ]
357 ]
358 }
359 }
360 }
361 "#);
362
363 assert_eq!(
366 get_value!(item.base!).0,
367 DateTime::from_timestamp_secs(90_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
368 );
369 assert_eq!(
370 get_value!(item.other!).0,
371 DateTime::from_timestamp_secs(80_000).unwrap()
372 );
373 }
374
375 #[test]
376 fn test_normalize_time_sequence_shift_and_correction() {
377 let mut item = Annotated::new(TestItem {
378 base: ts(90_000).into(),
379 other: ts(80_000).into(),
380 });
381
382 let config = Config {
383 apply_sequence_shift: true,
384 received_at: ts(10_000).0,
385 max_in_future: Some(Duration::from_secs(10)),
386 ..Default::default()
387 };
388
389 normalize(&mut item, config);
390
391 insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
392 {
393 "base": {
394 "": {
395 "rem": [
396 [
397 "timestamp.sequence",
398 "s"
399 ]
400 ],
401 "err": [
402 [
403 "future_timestamp",
404 {
405 "sdk_time": "1970-01-02T01:00:00+00:00",
406 "server_time": "1970-01-01T02:46:40+00:00"
407 }
408 ]
409 ]
410 }
411 }
412 }
413 "#);
414
415 assert_eq!(
416 get_value!(item.base!).0,
417 DateTime::from_timestamp_secs(10_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
418 );
419 assert_eq!(
420 get_value!(item.other!).0,
421 DateTime::from_timestamp_secs(0).unwrap()
422 );
423 }
424}