relay_event_normalization/eap/
time.rs

1//! Time normalization for EAP items.
2
3use chrono::{DateTime, Utc};
4use relay_event_schema::{
5    processor::{self, ProcessValue, ProcessingState},
6    protocol::{OurLog, SpanV2, Timestamp, TraceMetric},
7};
8use relay_protocol::{Annotated, ErrorKind};
9use std::time::Duration;
10
11use crate::ClockDriftProcessor;
12
13/// Configuration parameters for [`normalize`].
14#[derive(Debug, Default, Clone, Copy)]
15pub struct Config {
16    /// Timestamp when the item was received.
17    pub received_at: DateTime<Utc>,
18    /// Client local timestamp when the SDK sent the item.
19    pub sent_at: Option<DateTime<Utc>>,
20    /// Maximum amount of time the timestamp is allowed to be in the past.
21    pub max_in_past: Option<Duration>,
22    /// Maximum amount of time the timestamp is allowed to be in the future.
23    pub max_in_future: Option<Duration>,
24    /// Limits clock drift correction to a minimum duration.
25    pub minimum_clock_drift: Duration,
26}
27
28/// Normalizes and validates timestamps.
29///
30/// Applies a time shift correction to correct for time drift on clients, see [`ClockDriftProcessor`].
31/// Also makes sure timestamps are within boundaries defined by [`Config::max_in_past`] and
32/// [`Config::max_in_future`].
33pub fn normalize<T>(item: &mut Annotated<T>, config: Config)
34where
35    T: TimeNormalize,
36{
37    let received_at = config.received_at;
38    let mut sent_at = config.sent_at;
39    let mut error_kind = ErrorKind::ClockDrift;
40
41    let timestamp = item
42        .value_mut()
43        .as_mut()
44        .and_then(|t| t.reference_timestamp_mut().value().copied());
45
46    if let Some(timestamp) = timestamp {
47        if config
48            .max_in_past
49            .is_some_and(|delta| timestamp < received_at - delta)
50        {
51            error_kind = ErrorKind::PastTimestamp;
52            sent_at = Some(timestamp.into_inner());
53        } else if config
54            .max_in_future
55            .is_some_and(|delta| timestamp > received_at + delta)
56        {
57            error_kind = ErrorKind::FutureTimestamp;
58            sent_at = Some(timestamp.into_inner());
59        }
60    }
61
62    let mut processor = ClockDriftProcessor::new(sent_at, received_at)
63        .at_least(config.minimum_clock_drift)
64        .error_kind(error_kind);
65
66    if processor.is_drifted() {
67        let _ = processor::process_value(item, &mut processor, ProcessingState::root());
68        if let Some(item) = item.value_mut() {
69            processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut());
70        }
71    }
72}
73
74/// Items which can be processed by [`normalize`].
75pub trait TimeNormalize: ProcessValue {
76    /// The base, reference timestamp of the item used for time shifts.
77    ///
78    /// Represents the timestamp when the item was created.
79    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>;
80}
81
82impl TimeNormalize for OurLog {
83    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
84        &mut self.timestamp
85    }
86}
87
88impl TimeNormalize for SpanV2 {
89    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
90        &mut self.start_timestamp
91    }
92}
93
94impl TimeNormalize for TraceMetric {
95    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
96        &mut self.timestamp
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    use relay_event_schema::processor::ProcessValue;
    use relay_protocol::{Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot};

    /// Minimal item with a reference timestamp (`base`) and one additional timestamp (`other`).
    #[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)]
    struct TestItem {
        base: Annotated<Timestamp>,
        other: Annotated<Timestamp>,
    }

    impl TimeNormalize for TestItem {
        fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
            &mut self.base
        }
    }

    /// Builds a [`Timestamp`] from whole seconds since the Unix epoch.
    fn ts(secs: i64) -> Timestamp {
        Timestamp(DateTime::from_timestamp_secs(secs).unwrap())
    }

    /// Builds an annotated [`TestItem`] from `base` and `other` given in seconds.
    fn test_item(base: i64, other: i64) -> Annotated<TestItem> {
        Annotated::new(TestItem {
            base: Annotated::from(ts(base)),
            other: Annotated::from(ts(other)),
        })
    }

    #[test]
    fn test_normalize_time_no_drift() {
        // No `sent_at` and no boundaries configured: the item must stay untouched.
        let config = Config {
            received_at: ts(1_100).into_inner(),
            ..Config::default()
        };
        let mut item = test_item(1_000, 1_010);

        normalize(&mut item, config);

        assert_annotated_snapshot!(item, @r#"
        {
          "base": 1000.0,
          "other": 1010.0
        }
        "#);
    }

    #[test]
    fn test_normalize_time_client_drift() {
        // The client clock is 51_000 seconds behind: both timestamps are shifted forward.
        let config = Config {
            sent_at: Some(ts(0).into_inner()),
            received_at: ts(51_000).into_inner(),
            ..Config::default()
        };
        let mut item = test_item(50_000, 50_010);

        normalize(&mut item, config);

        assert_annotated_snapshot!(item, @r#"
        {
          "base": 101000.0,
          "other": 101010.0,
          "_meta": {
            "base": {
              "": {
                "err": [
                  [
                    "clock_drift",
                    {
                      "sdk_time": "1970-01-01T00:00:00+00:00",
                      "server_time": "1970-01-01T14:10:00+00:00"
                    }
                  ]
                ]
              }
            }
          }
        }
        "#);
    }

    #[test]
    fn test_normalize_time_too_far_in_past() {
        // `base` is more than 10 seconds older than `received_at`: it is moved to
        // `received_at` and `other` is shifted by the same amount.
        let config = Config {
            received_at: ts(100_000).into_inner(),
            max_in_past: Some(Duration::from_secs(10)),
            ..Config::default()
        };
        let mut item = test_item(90_000, 80_000);

        normalize(&mut item, config);

        assert_annotated_snapshot!(item, @r#"
        {
          "base": 100000.0,
          "other": 90000.0,
          "_meta": {
            "base": {
              "": {
                "err": [
                  [
                    "past_timestamp",
                    {
                      "sdk_time": "1970-01-02T01:00:00+00:00",
                      "server_time": "1970-01-02T03:46:40+00:00"
                    }
                  ]
                ]
              }
            }
          }
        }
        "#);
    }

    #[test]
    fn test_normalize_time_too_far_in_future() {
        // `base` is more than 10 seconds newer than `received_at`: it is moved to
        // `received_at` and `other` is shifted by the same amount.
        let config = Config {
            received_at: ts(10_000).into_inner(),
            max_in_future: Some(Duration::from_secs(10)),
            ..Config::default()
        };
        let mut item = test_item(90_000, 80_000);

        normalize(&mut item, config);

        assert_annotated_snapshot!(item, @r#"
        {
          "base": 10000.0,
          "other": 0.0,
          "_meta": {
            "base": {
              "": {
                "err": [
                  [
                    "future_timestamp",
                    {
                      "sdk_time": "1970-01-02T01:00:00+00:00",
                      "server_time": "1970-01-01T02:46:40+00:00"
                    }
                  ]
                ]
              }
            }
          }
        }
        "#);
    }
}
258}