relay_event_normalization/
clock_drift.rs

1use std::time::Duration;
2
3use chrono::{DateTime, Duration as SignedDuration, Utc};
4use relay_common::time::UnixTimestamp;
5use relay_event_schema::processor::{ProcessValue, ProcessingResult, ProcessingState, Processor};
6use relay_event_schema::protocol::{Event, Timestamp};
7use relay_protocol::{Error, ErrorKind, Meta};
8
9/// A signed correction that contains the sender's timestamp as well as the drift to the receiver.
10#[derive(Clone, Copy, Debug)]
11struct ClockCorrection {
12    sent_at: DateTime<Utc>,
13    drift: SignedDuration,
14}
15
16impl ClockCorrection {
17    fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
18        let drift = received_at - sent_at;
19        Self { sent_at, drift }
20    }
21
22    fn at_least(self, lower_bound: Duration) -> Option<Self> {
23        if self.drift.num_seconds().unsigned_abs() >= lower_bound.as_secs() {
24            Some(self)
25        } else {
26            None
27        }
28    }
29}
30
31/// Corrects clock drift based on the sender's and receivers timestamps.
32///
33/// Clock drift correction applies to all timestamps in the event protocol. This includes especially
34/// the event's timestamp, breadcrumbs and spans.
35///
36/// There is a minimum clock drift of _55 minutes_ to compensate for network latency and small clock
37/// drift on the sender's machine, but allow to detect timezone differences. For differences lower
38/// than that, no correction is performed.
39///
40/// Clock drift is corrected in both ways:
41///
42/// - The drift is added to timestamps if the received time is after the send time. This indicates
43///   that the sender's clock was lagging behind. For instance, if an event was received with
44///   yesterday's timestamp, one day is added to all timestamps.
45///
46/// - The drift is subtracted from timestamps if the received time is before the send time. This
47///   indicates that the sender's clock was running ahead. For instance, if an event was received
48///   with tomorrow's timestamp, one day is subtracted from all timestamps.
49#[derive(Debug)]
50pub struct ClockDriftProcessor {
51    received_at: DateTime<Utc>,
52    correction: Option<ClockCorrection>,
53    kind: ErrorKind,
54}
55
56impl ClockDriftProcessor {
57    /// Creates a new `ClockDriftProcessor`.
58    ///
59    /// If no `sent_at` timestamp is provided, then clock drift correction is disabled. The drift is
60    /// calculated from the signed difference between the receiver's and the sender's timestamp.
61    pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
62        let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));
63
64        Self {
65            received_at,
66            correction,
67            kind: ErrorKind::ClockDrift,
68        }
69    }
70
71    /// Limits clock drift correction to a minimum duration.
72    ///
73    /// If the detected clock drift is lower than the given duration, no correction is performed and
74    /// `is_drifted` returns `false`. By default, there is no lower bound and every drift is
75    /// corrected.
76    pub fn at_least(mut self, lower_bound: Duration) -> Self {
77        self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
78        self
79    }
80
81    /// Use the given error kind for the attached eventerror instead of the default
82    /// `ErrorKind::ClockDrift`.
83    pub fn error_kind(mut self, kind: ErrorKind) -> Self {
84        self.kind = kind;
85        self
86    }
87
88    /// Returns `true` if the clocks are significantly drifted.
89    pub fn is_drifted(&self) -> bool {
90        self.correction.is_some()
91    }
92
93    /// Applies correction metadata to the passed `meta`.
94    pub fn apply_correction_meta(&self, meta: &mut Meta) {
95        let Some(correction) = self.correction else {
96            return;
97        };
98
99        meta.add_error(Error::with(self.kind.clone(), |e| {
100            e.insert("sdk_time", correction.sent_at.to_rfc3339());
101            e.insert("server_time", self.received_at.to_rfc3339());
102        }));
103    }
104
105    /// Processes the given `UnixTimestamp` by applying clock drift correction.
106    pub fn process_timestamp(&self, timestamp: &mut UnixTimestamp) {
107        if let Some(correction) = self.correction {
108            let secs = correction.drift.num_seconds();
109            *timestamp = if secs > 0 {
110                UnixTimestamp::from_secs(timestamp.as_secs() + secs as u64)
111            } else {
112                UnixTimestamp::from_secs(timestamp.as_secs() - secs.saturating_abs() as u64)
113            }
114        }
115    }
116
117    /// Processes the given [`DateTime`].
118    pub fn process_datetime(&self, datetime: &mut DateTime<Utc>) {
119        if let Some(correction) = self.correction {
120            *datetime += correction.drift;
121        }
122    }
123}
124
125impl Processor for ClockDriftProcessor {
126    fn process_event(
127        &mut self,
128        event: &mut Event,
129        _meta: &mut Meta,
130        state: &ProcessingState<'_>,
131    ) -> ProcessingResult {
132        if self.correction.is_some() {
133            event.process_child_values(self, state)?;
134            self.apply_correction_meta(event.timestamp.meta_mut());
135        }
136
137        Ok(())
138    }
139
140    fn process_timestamp(
141        &mut self,
142        timestamp: &mut Timestamp,
143        _meta: &mut Meta,
144        _state: &ProcessingState<'_>,
145    ) -> ProcessingResult {
146        if let Some(correction) = self.correction {
147            // NB: We're not setting the original value here, as this could considerably increase
148            // the event's size. Instead, attach an error message to the top-level event.
149            *timestamp = *timestamp + correction.drift;
150        }
151
152        Ok(())
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use chrono::offset::TimeZone;
159    use relay_event_schema::processor::process_value;
160    use relay_event_schema::protocol::{Contexts, EventType, TraceContext};
161    use relay_protocol::Annotated;
162    use similar_asserts::assert_eq;
163
164    use super::*;
165
166    fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
167        Annotated::new(Event {
168            ty: Annotated::new(EventType::Transaction),
169            timestamp: Annotated::new(end.into()),
170            start_timestamp: Annotated::new(start.into()),
171            contexts: {
172                let mut contexts = Contexts::new();
173                contexts.add(TraceContext {
174                    trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
175                    span_id: Annotated::new("fa90fdead5f74053".parse().unwrap()),
176                    op: Annotated::new("http.server".to_owned()),
177                    ..Default::default()
178                });
179                Annotated::new(contexts)
180            },
181            spans: Annotated::new(vec![]),
182            ..Default::default()
183        })
184    }
185
186    #[test]
187    fn test_no_sent_at() {
188        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
189        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
190        let now = end;
191
192        // No information on delay, do not default to anything.
193        let mut processor = ClockDriftProcessor::new(None, now);
194        let mut event = create_transaction(start, end);
195        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
196
197        let event = event.value().unwrap();
198        assert_eq!(*event.timestamp.value().unwrap(), end);
199        assert_eq!(*event.start_timestamp.value().unwrap(), start);
200    }
201
202    #[test]
203    fn test_no_clock_drift() {
204        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
205        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
206
207        let now = end;
208
209        // The event was sent instantly without delay
210        let mut processor = ClockDriftProcessor::new(Some(end), now);
211        let mut event = create_transaction(start, end);
212        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
213
214        let event = event.value().unwrap();
215        assert_eq!(*event.timestamp.value().unwrap(), end);
216        assert_eq!(*event.start_timestamp.value().unwrap(), start);
217    }
218
219    #[test]
220    fn test_clock_drift_lower_bound() {
221        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
222        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
223
224        let drift = SignedDuration::minutes(1);
225        let now = end + drift;
226
227        // The event was sent and received with minimal delay, which should not correct
228        let mut processor =
229            ClockDriftProcessor::new(Some(end), now).at_least(Duration::from_secs(3600));
230        let mut event = create_transaction(start, end);
231        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
232
233        let event = event.value().unwrap();
234        assert_eq!(*event.timestamp.value().unwrap(), end);
235        assert_eq!(*event.start_timestamp.value().unwrap(), start);
236    }
237
238    #[test]
239    fn test_clock_drift_from_past() {
240        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
241        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
242
243        let drift = SignedDuration::days(1);
244        let now = end + drift;
245
246        // The event was sent and received with delay
247        let mut processor = ClockDriftProcessor::new(Some(end), now);
248        let mut event = create_transaction(start, end);
249        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
250
251        let event = event.value().unwrap();
252        assert_eq!(*event.timestamp.value().unwrap(), now);
253        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
254    }
255
256    #[test]
257    fn test_clock_drift_from_future() {
258        let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
259        let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
260
261        let drift = -SignedDuration::seconds(60);
262        let now = end + drift;
263
264        // The event was sent and received with delay
265        let mut processor = ClockDriftProcessor::new(Some(end), now);
266        let mut event = create_transaction(start, end);
267        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
268
269        let event = event.value().unwrap();
270        assert_eq!(*event.timestamp.value().unwrap(), now);
271        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
272    }
273
274    #[test]
275    fn test_clock_drift_unix() {
276        let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
277        let drift = SignedDuration::days(1);
278        let now = sent_at + drift;
279
280        let processor = ClockDriftProcessor::new(Some(sent_at), now);
281        let mut timestamp = UnixTimestamp::from_secs(sent_at.timestamp() as u64);
282        processor.process_timestamp(&mut timestamp);
283
284        assert_eq!(timestamp.as_secs(), now.timestamp() as u64);
285    }
286
287    #[test]
288    fn test_process_datetime() {
289        let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
290        let drift = SignedDuration::days(1);
291        let now = sent_at + drift;
292
293        let processor = ClockDriftProcessor::new(Some(sent_at), now);
294        let mut datetime = Utc.with_ymd_and_hms(2021, 11, 29, 0, 0, 0).unwrap();
295        processor.process_datetime(&mut datetime);
296
297        assert_eq!(
298            datetime,
299            Utc.with_ymd_and_hms(2021, 11, 30, 0, 0, 0).unwrap()
300        );
301    }
302}