relay_event_normalization/
clock_drift.rs

1use std::time::Duration;
2
3use chrono::{DateTime, Duration as SignedDuration, Utc};
4use relay_common::time::UnixTimestamp;
5use relay_event_schema::processor::{ProcessValue, ProcessingResult, ProcessingState, Processor};
6use relay_event_schema::protocol::{Event, Timestamp};
7use relay_protocol::{Error, ErrorKind, Meta};
8
9/// A signed correction that contains the sender's timestamp as well as the drift to the receiver.
10#[derive(Clone, Copy, Debug)]
11struct ClockCorrection {
12    sent_at: DateTime<Utc>,
13    drift: SignedDuration,
14}
15
16impl ClockCorrection {
17    fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
18        let drift = received_at - sent_at;
19        Self { sent_at, drift }
20    }
21
22    fn at_least(self, lower_bound: Duration) -> Option<Self> {
23        if self.drift.num_seconds().unsigned_abs() >= lower_bound.as_secs() {
24            Some(self)
25        } else {
26            None
27        }
28    }
29}
30
31/// Corrects clock drift based on the sender's and receivers timestamps.
32///
33/// Clock drift correction applies to all timestamps in the event protocol. This includes especially
34/// the event's timestamp, breadcrumbs and spans.
35///
36/// There is a minimum clock drift of _55 minutes_ to compensate for network latency and small clock
37/// drift on the sender's machine, but allow to detect timezone differences. For differences lower
38/// than that, no correction is performed.
39///
40/// Clock drift is corrected in both ways:
41///
42/// - The drift is added to timestamps if the received time is after the send time. This indicates
43///   that the sender's clock was lagging behind. For instance, if an event was received with
44///   yesterday's timestamp, one day is added to all timestamps.
45///
46/// - The drift is subtracted from timestamps if the received time is before the send time. This
47///   indicates that the sender's clock was running ahead. For instance, if an event was received
48///   with tomorrow's timestamp, one day is subtracted from all timestamps.
49#[derive(Debug)]
50pub struct ClockDriftProcessor {
51    received_at: DateTime<Utc>,
52    correction: Option<ClockCorrection>,
53    kind: ErrorKind,
54}
55
56impl ClockDriftProcessor {
57    /// Creates a new `ClockDriftProcessor`.
58    ///
59    /// If no `sent_at` timestamp is provided, then clock drift correction is disabled. The drift is
60    /// calculated from the signed difference between the receiver's and the sender's timestamp.
61    pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
62        let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));
63
64        Self {
65            received_at,
66            correction,
67            kind: ErrorKind::ClockDrift,
68        }
69    }
70
71    /// Limits clock drift correction to a minimum duration.
72    ///
73    /// If the detected clock drift is lower than the given duration, no correction is performed and
74    /// `is_drifted` returns `false`. By default, there is no lower bound and every drift is
75    /// corrected.
76    pub fn at_least(mut self, lower_bound: Duration) -> Self {
77        self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
78        self
79    }
80
81    /// Use the given error kind for the attached eventerror instead of the default
82    /// `ErrorKind::ClockDrift`.
83    pub fn error_kind(mut self, kind: ErrorKind) -> Self {
84        self.kind = kind;
85        self
86    }
87
88    /// Returns `true` if the clocks are significantly drifted.
89    pub fn is_drifted(&self) -> bool {
90        self.correction.is_some()
91    }
92
93    /// Processes the given `UnixTimestamp` by applying clock drift correction.
94    pub fn process_timestamp(&self, timestamp: &mut UnixTimestamp) {
95        if let Some(correction) = self.correction {
96            let secs = correction.drift.num_seconds();
97            *timestamp = if secs > 0 {
98                UnixTimestamp::from_secs(timestamp.as_secs() + secs as u64)
99            } else {
100                UnixTimestamp::from_secs(timestamp.as_secs() - secs.saturating_abs() as u64)
101            }
102        }
103    }
104
105    /// Processes the given [`DateTime`].
106    pub fn process_datetime(&self, datetime: &mut DateTime<Utc>) {
107        if let Some(correction) = self.correction {
108            *datetime += correction.drift;
109        }
110    }
111}
112
113impl Processor for ClockDriftProcessor {
114    fn process_event(
115        &mut self,
116        event: &mut Event,
117        _meta: &mut Meta,
118        state: &ProcessingState<'_>,
119    ) -> ProcessingResult {
120        if let Some(correction) = self.correction {
121            event.process_child_values(self, state)?;
122
123            let timestamp_meta = event.timestamp.meta_mut();
124            timestamp_meta.add_error(Error::with(self.kind.clone(), |e| {
125                e.insert("sdk_time", correction.sent_at.to_rfc3339());
126                e.insert("server_time", self.received_at.to_rfc3339());
127            }));
128        }
129
130        Ok(())
131    }
132
133    fn process_timestamp(
134        &mut self,
135        timestamp: &mut Timestamp,
136        _meta: &mut Meta,
137        _state: &ProcessingState<'_>,
138    ) -> ProcessingResult {
139        if let Some(correction) = self.correction {
140            // NB: We're not setting the original value here, as this could considerably increase
141            // the event's size. Instead, attach an error message to the top-level event.
142            *timestamp = *timestamp + correction.drift;
143        }
144
145        Ok(())
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use chrono::offset::TimeZone;
152    use relay_event_schema::processor::process_value;
153    use relay_event_schema::protocol::{Contexts, EventType, SpanId, TraceContext};
154    use relay_protocol::Annotated;
155    use similar_asserts::assert_eq;
156
157    use super::*;
158
159    fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
160        Annotated::new(Event {
161            ty: Annotated::new(EventType::Transaction),
162            timestamp: Annotated::new(end.into()),
163            start_timestamp: Annotated::new(start.into()),
164            contexts: {
165                let mut contexts = Contexts::new();
166                contexts.add(TraceContext {
167                    trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
168                    span_id: Annotated::new(SpanId("fa90fdead5f74053".into())),
169                    op: Annotated::new("http.server".to_owned()),
170                    ..Default::default()
171                });
172                Annotated::new(contexts)
173            },
174            spans: Annotated::new(vec![]),
175            ..Default::default()
176        })
177    }
178
179    #[test]
180    fn test_no_sent_at() {
181        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
182        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
183        let now = end;
184
185        // No information on delay, do not default to anything.
186        let mut processor = ClockDriftProcessor::new(None, now);
187        let mut event = create_transaction(start, end);
188        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
189
190        let event = event.value().unwrap();
191        assert_eq!(*event.timestamp.value().unwrap(), end);
192        assert_eq!(*event.start_timestamp.value().unwrap(), start);
193    }
194
195    #[test]
196    fn test_no_clock_drift() {
197        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
198        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
199
200        let now = end;
201
202        // The event was sent instantly without delay
203        let mut processor = ClockDriftProcessor::new(Some(end), now);
204        let mut event = create_transaction(start, end);
205        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
206
207        let event = event.value().unwrap();
208        assert_eq!(*event.timestamp.value().unwrap(), end);
209        assert_eq!(*event.start_timestamp.value().unwrap(), start);
210    }
211
212    #[test]
213    fn test_clock_drift_lower_bound() {
214        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
215        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
216
217        let drift = SignedDuration::minutes(1);
218        let now = end + drift;
219
220        // The event was sent and received with minimal delay, which should not correct
221        let mut processor =
222            ClockDriftProcessor::new(Some(end), now).at_least(Duration::from_secs(3600));
223        let mut event = create_transaction(start, end);
224        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
225
226        let event = event.value().unwrap();
227        assert_eq!(*event.timestamp.value().unwrap(), end);
228        assert_eq!(*event.start_timestamp.value().unwrap(), start);
229    }
230
231    #[test]
232    fn test_clock_drift_from_past() {
233        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
234        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
235
236        let drift = SignedDuration::days(1);
237        let now = end + drift;
238
239        // The event was sent and received with delay
240        let mut processor = ClockDriftProcessor::new(Some(end), now);
241        let mut event = create_transaction(start, end);
242        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
243
244        let event = event.value().unwrap();
245        assert_eq!(*event.timestamp.value().unwrap(), now);
246        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
247    }
248
249    #[test]
250    fn test_clock_drift_from_future() {
251        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
252        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
253
254        let drift = -SignedDuration::seconds(60);
255        let now = end + drift;
256
257        // The event was sent and received with delay
258        let mut processor = ClockDriftProcessor::new(Some(end), now);
259        let mut event = create_transaction(start, end);
260        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
261
262        let event = event.value().unwrap();
263        assert_eq!(*event.timestamp.value().unwrap(), now);
264        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
265    }
266
267    #[test]
268    fn test_clock_drift_unix() {
269        let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
270        let drift = SignedDuration::days(1);
271        let now = sent_at + drift;
272
273        let processor = ClockDriftProcessor::new(Some(sent_at), now);
274        let mut timestamp = UnixTimestamp::from_secs(sent_at.timestamp() as u64);
275        processor.process_timestamp(&mut timestamp);
276
277        assert_eq!(timestamp.as_secs(), now.timestamp() as u64);
278    }
279
280    #[test]
281    fn test_process_datetime() {
282        let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
283        let drift = SignedDuration::days(1);
284        let now = sent_at + drift;
285
286        let processor = ClockDriftProcessor::new(Some(sent_at), now);
287        let mut datetime = Utc.with_ymd_and_hms(2021, 11, 29, 0, 0, 0).unwrap();
288        processor.process_datetime(&mut datetime);
289
290        assert_eq!(
291            datetime,
292            Utc.with_ymd_and_hms(2021, 11, 30, 0, 0, 0).unwrap()
293        );
294    }
295}