Skip to main content

relay_event_normalization/eap/
time.rs

1//! Time normalization for EAP items.
2
3use chrono::{DateTime, Utc};
4use relay_conventions::attributes::SENTRY__TIMESTAMP__SEQUENCE;
5use relay_event_schema::{
6    processor::{self, ProcessValue, ProcessingState},
7    protocol::{Attributes, OurLog, Replay, SpanV2, Timestamp, TraceMetric},
8};
9use relay_protocol::{Annotated, ErrorKind, Remark, RemarkType};
10use std::time::Duration;
11
12use crate::ClockDriftProcessor;
13
14/// Configuration parameters for [`normalize`].
15#[derive(Debug, Default, Clone, Copy)]
16pub struct Config {
17    /// Apply a time sequence shift as provided by the SDK on the timestamp.
18    ///
19    /// This must only run once, to not shift the same timestamp multiple times and therefore should
20    /// be limited to processing Relays.
21    pub apply_sequence_shift: bool,
22    /// Timestamp when the item was received.
23    pub received_at: DateTime<Utc>,
24    /// Client local timestamp when the SDK sent the item.
25    pub sent_at: Option<DateTime<Utc>>,
26    /// Maximum amount of time the timestamp is allowed to be in the past.
27    pub max_in_past: Option<Duration>,
28    /// Maximum amount of time the timestamp is allowed to be in the future.
29    pub max_in_future: Option<Duration>,
30    /// Limits clock drift correction to a minimum duration.
31    pub minimum_clock_drift: Duration,
32}
33
34/// Normalizes and validates timestamps.
35///
36/// Applies a time shift correction to correct for time drift on clients, see [`ClockDriftProcessor`].
37/// Also makes sure timestamps are within boundaries defined by [`Config::max_in_past`] and
38/// [`Config::max_in_future`].
39pub fn normalize<T>(item: &mut Annotated<T>, config: Config)
40where
41    T: TimeNormalize,
42{
43    let received_at = config.received_at;
44    let mut sent_at = config.sent_at;
45    let mut error_kind = ErrorKind::ClockDrift;
46
47    let timestamp = item
48        .value_mut()
49        .as_mut()
50        .map(|t| t.reference_timestamp_mut())
51        .and_then(|ts| ts.value().copied());
52
53    if let Some(timestamp) = timestamp {
54        if config
55            .max_in_past
56            .is_some_and(|delta| timestamp < received_at - delta)
57        {
58            error_kind = ErrorKind::PastTimestamp;
59            sent_at = Some(timestamp.into_inner());
60        } else if config
61            .max_in_future
62            .is_some_and(|delta| timestamp > received_at + delta)
63        {
64            error_kind = ErrorKind::FutureTimestamp;
65            sent_at = Some(timestamp.into_inner());
66        }
67    }
68
69    let mut processor = ClockDriftProcessor::new(sent_at, received_at)
70        .at_least(config.minimum_clock_drift)
71        .error_kind(error_kind);
72
73    if processor.is_drifted() {
74        let _ = processor::process_value(item, &mut processor, ProcessingState::root());
75        if let Some(item) = item.value_mut() {
76            processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut());
77        }
78    }
79
80    let sequence = item
81        .value()
82        .and_then(|t| t.timestamp_sequence())
83        .filter(|d| *d > 0);
84
85    let timestamp = item
86        .value_mut()
87        .as_mut()
88        .map(|t| t.reference_timestamp_mut());
89
90    if config.apply_sequence_shift
91        && let Some(sequence) = sequence
92        && let Some(ts) = timestamp
93        && let Some(ts_value) = ts.value_mut()
94    {
95        // Always unconditionally apply the time-shift, this puts us potentially slightly over `max_in_future`,
96        // by up to ~5s, but this is preferable over losing the ordering.
97        ts_value.0 += chrono::TimeDelta::nanoseconds(sequence.into());
98        ts.meta_mut()
99            .add_remark(Remark::new(RemarkType::Substituted, "timestamp.sequence"));
100    }
101}
102
103/// Items which can be processed by [`normalize`].
104pub trait TimeNormalize: ProcessValue {
105    /// The base, reference timestamp of the item used for time shifts.
106    ///
107    /// Represents the timestamp when the item was created.
108    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>;
109
110    /// A tie breaker sent from SDKs for timestamps.
111    ///
112    /// This is usually stored in [`SENTRY__TIMESTAMP__SEQUENCE`] and applied as additional
113    /// nanoseconds to the timestamp.
114    fn timestamp_sequence(&self) -> Option<u32>;
115}
116
117impl TimeNormalize for OurLog {
118    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
119        &mut self.timestamp
120    }
121
122    fn timestamp_sequence(&self) -> Option<u32> {
123        get_timestamp_sequence(&self.attributes)
124    }
125}
126
127impl TimeNormalize for SpanV2 {
128    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
129        &mut self.start_timestamp
130    }
131
132    fn timestamp_sequence(&self) -> Option<u32> {
133        // Not supported for spans.
134        //
135        // If this ever becomes necessary to add, extra care must be taken to not create invalid
136        // spans where the start timestamp is moved after the end timestamp.
137        None
138    }
139}
140
141impl TimeNormalize for TraceMetric {
142    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
143        &mut self.timestamp
144    }
145
146    fn timestamp_sequence(&self) -> Option<u32> {
147        get_timestamp_sequence(&self.attributes)
148    }
149}
150
151impl TimeNormalize for Replay {
152    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
153        &mut self.timestamp
154    }
155
156    fn timestamp_sequence(&self) -> Option<u32> {
157        None
158    }
159}
160
161fn get_timestamp_sequence(attributes: &Annotated<Attributes>) -> Option<u32> {
162    attributes
163        .value()
164        .and_then(|attrs| attrs.get_value(SENTRY__TIMESTAMP__SEQUENCE))
165        .and_then(|v| v.as_f64())
166        .map(|v| v as _)
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    use relay_event_schema::processor::ProcessValue;
174    use relay_protocol::{
175        Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot, get_value,
176    };
177
178    #[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)]
179    struct TestItem {
180        base: Annotated<Timestamp>,
181        other: Annotated<Timestamp>,
182    }
183
184    impl TimeNormalize for TestItem {
185        fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
186            &mut self.base
187        }
188
189        fn timestamp_sequence(&self) -> Option<u32> {
190            Some(123)
191        }
192    }
193
194    fn ts(secs: i64) -> Timestamp {
195        Timestamp(DateTime::from_timestamp_secs(secs).unwrap())
196    }
197
198    #[test]
199    fn test_normalize_time_no_drift() {
200        let mut item = Annotated::new(TestItem {
201            base: ts(1_000).into(),
202            other: ts(1_010).into(),
203        });
204
205        let config = Config {
206            received_at: ts(1_100).0,
207            ..Default::default()
208        };
209
210        normalize(&mut item, config);
211
212        assert_annotated_snapshot!(item, @r#"
213        {
214          "base": 1000.0,
215          "other": 1010.0
216        }
217        "#);
218    }
219
220    #[test]
221    fn test_normalize_time_client_drift() {
222        let mut item = Annotated::new(TestItem {
223            base: ts(50_000).into(),
224            other: ts(50_010).into(),
225        });
226
227        let config = Config {
228            sent_at: Some(ts(0).0),
229            received_at: ts(51_000).0,
230            ..Default::default()
231        };
232
233        normalize(&mut item, config);
234
235        assert_annotated_snapshot!(item, @r#"
236        {
237          "base": 101000.0,
238          "other": 101010.0,
239          "_meta": {
240            "base": {
241              "": {
242                "err": [
243                  [
244                    "clock_drift",
245                    {
246                      "sdk_time": "1970-01-01T00:00:00+00:00",
247                      "server_time": "1970-01-01T14:10:00+00:00"
248                    }
249                  ]
250                ]
251              }
252            }
253          }
254        }
255        "#);
256    }
257
258    #[test]
259    fn test_normalize_time_too_far_in_past() {
260        let mut item = Annotated::new(TestItem {
261            base: ts(90_000).into(),
262            other: ts(80_000).into(),
263        });
264
265        let config = Config {
266            received_at: ts(100_000).0,
267            max_in_past: Some(Duration::from_secs(10)),
268            ..Default::default()
269        };
270
271        normalize(&mut item, config);
272
273        assert_annotated_snapshot!(item, @r#"
274        {
275          "base": 100000.0,
276          "other": 90000.0,
277          "_meta": {
278            "base": {
279              "": {
280                "err": [
281                  [
282                    "past_timestamp",
283                    {
284                      "sdk_time": "1970-01-02T01:00:00+00:00",
285                      "server_time": "1970-01-02T03:46:40+00:00"
286                    }
287                  ]
288                ]
289              }
290            }
291          }
292        }
293        "#);
294    }
295
296    #[test]
297    fn test_normalize_time_too_far_in_future() {
298        let mut item = Annotated::new(TestItem {
299            base: ts(90_000).into(),
300            other: ts(80_000).into(),
301        });
302
303        let config = Config {
304            received_at: ts(10_000).0,
305            max_in_future: Some(Duration::from_secs(10)),
306            ..Default::default()
307        };
308
309        normalize(&mut item, config);
310
311        assert_annotated_snapshot!(item, @r#"
312        {
313          "base": 10000.0,
314          "other": 0.0,
315          "_meta": {
316            "base": {
317              "": {
318                "err": [
319                  [
320                    "future_timestamp",
321                    {
322                      "sdk_time": "1970-01-02T01:00:00+00:00",
323                      "server_time": "1970-01-01T02:46:40+00:00"
324                    }
325                  ]
326                ]
327              }
328            }
329          }
330        }
331        "#);
332    }
333
334    #[test]
335    fn test_normalize_time_sequence_shift() {
336        let mut item = Annotated::new(TestItem {
337            base: ts(90_000).into(),
338            other: ts(80_000).into(),
339        });
340
341        let config = Config {
342            apply_sequence_shift: true,
343            ..Default::default()
344        };
345
346        normalize(&mut item, config);
347
348        insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
349        {
350          "base": {
351            "": {
352              "rem": [
353                [
354                  "timestamp.sequence",
355                  "s"
356                ]
357              ]
358            }
359          }
360        }
361        "#);
362
363        // Need to assert the raw values instead of a snapshot because the serialization format of
364        // `Timestamp` is not precise enough for nanosecond precision.
365        assert_eq!(
366            get_value!(item.base!).0,
367            DateTime::from_timestamp_secs(90_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
368        );
369        assert_eq!(
370            get_value!(item.other!).0,
371            DateTime::from_timestamp_secs(80_000).unwrap()
372        );
373    }
374
375    #[test]
376    fn test_normalize_time_sequence_shift_and_correction() {
377        let mut item = Annotated::new(TestItem {
378            base: ts(90_000).into(),
379            other: ts(80_000).into(),
380        });
381
382        let config = Config {
383            apply_sequence_shift: true,
384            received_at: ts(10_000).0,
385            max_in_future: Some(Duration::from_secs(10)),
386            ..Default::default()
387        };
388
389        normalize(&mut item, config);
390
391        insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
392        {
393          "base": {
394            "": {
395              "rem": [
396                [
397                  "timestamp.sequence",
398                  "s"
399                ]
400              ],
401              "err": [
402                [
403                  "future_timestamp",
404                  {
405                    "sdk_time": "1970-01-02T01:00:00+00:00",
406                    "server_time": "1970-01-01T02:46:40+00:00"
407                  }
408                ]
409              ]
410            }
411          }
412        }
413        "#);
414
415        assert_eq!(
416            get_value!(item.base!).0,
417            DateTime::from_timestamp_secs(10_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
418        );
419        assert_eq!(
420            get_value!(item.other!).0,
421            DateTime::from_timestamp_secs(0).unwrap()
422        );
423    }
424}