relay_event_normalization/
clock_drift.rs1use std::time::Duration;
2
3use chrono::{DateTime, Duration as SignedDuration, Utc};
4use relay_common::time::UnixTimestamp;
5use relay_event_schema::processor::{ProcessValue, ProcessingResult, ProcessingState, Processor};
6use relay_event_schema::protocol::{Event, Timestamp};
7use relay_protocol::{Error, ErrorKind, Meta};
8
9#[derive(Clone, Copy, Debug)]
11struct ClockCorrection {
12 sent_at: DateTime<Utc>,
13 drift: SignedDuration,
14}
15
16impl ClockCorrection {
17 fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
18 let drift = received_at - sent_at;
19 Self { sent_at, drift }
20 }
21
22 fn at_least(self, lower_bound: Duration) -> Option<Self> {
23 if self.drift.num_seconds().unsigned_abs() >= lower_bound.as_secs() {
24 Some(self)
25 } else {
26 None
27 }
28 }
29}
30
31#[derive(Debug)]
50pub struct ClockDriftProcessor {
51 received_at: DateTime<Utc>,
52 correction: Option<ClockCorrection>,
53 kind: ErrorKind,
54}
55
56impl ClockDriftProcessor {
57 pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
62 let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));
63
64 Self {
65 received_at,
66 correction,
67 kind: ErrorKind::ClockDrift,
68 }
69 }
70
71 pub fn at_least(mut self, lower_bound: Duration) -> Self {
77 self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
78 self
79 }
80
81 pub fn error_kind(mut self, kind: ErrorKind) -> Self {
84 self.kind = kind;
85 self
86 }
87
88 pub fn is_drifted(&self) -> bool {
90 self.correction.is_some()
91 }
92
93 pub fn process_timestamp(&self, timestamp: &mut UnixTimestamp) {
95 if let Some(correction) = self.correction {
96 let secs = correction.drift.num_seconds();
97 *timestamp = if secs > 0 {
98 UnixTimestamp::from_secs(timestamp.as_secs() + secs as u64)
99 } else {
100 UnixTimestamp::from_secs(timestamp.as_secs() - secs.saturating_abs() as u64)
101 }
102 }
103 }
104
105 pub fn process_datetime(&self, datetime: &mut DateTime<Utc>) {
107 if let Some(correction) = self.correction {
108 *datetime += correction.drift;
109 }
110 }
111}
112
113impl Processor for ClockDriftProcessor {
114 fn process_event(
115 &mut self,
116 event: &mut Event,
117 _meta: &mut Meta,
118 state: &ProcessingState<'_>,
119 ) -> ProcessingResult {
120 if let Some(correction) = self.correction {
121 event.process_child_values(self, state)?;
122
123 let timestamp_meta = event.timestamp.meta_mut();
124 timestamp_meta.add_error(Error::with(self.kind.clone(), |e| {
125 e.insert("sdk_time", correction.sent_at.to_rfc3339());
126 e.insert("server_time", self.received_at.to_rfc3339());
127 }));
128 }
129
130 Ok(())
131 }
132
133 fn process_timestamp(
134 &mut self,
135 timestamp: &mut Timestamp,
136 _meta: &mut Meta,
137 _state: &ProcessingState<'_>,
138 ) -> ProcessingResult {
139 if let Some(correction) = self.correction {
140 *timestamp = *timestamp + correction.drift;
143 }
144
145 Ok(())
146 }
147}
148
#[cfg(test)]
mod tests {
    use chrono::offset::TimeZone;
    use relay_event_schema::processor::process_value;
    use relay_event_schema::protocol::{Contexts, EventType, SpanId, TraceContext};
    use relay_protocol::Annotated;
    use similar_asserts::assert_eq;

    use super::*;

    /// Midnight UTC on the given day of January 2000.
    fn january_2000(day: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2000, 1, day, 0, 0, 0).unwrap()
    }

    /// Builds a minimal transaction event spanning `start` to `end`.
    fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
        let mut contexts = Contexts::new();
        contexts.add(TraceContext {
            trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
            span_id: Annotated::new(SpanId("fa90fdead5f74053".into())),
            op: Annotated::new("http.server".to_owned()),
            ..Default::default()
        });

        Annotated::new(Event {
            ty: Annotated::new(EventType::Transaction),
            timestamp: Annotated::new(end.into()),
            start_timestamp: Annotated::new(start.into()),
            contexts: Annotated::new(contexts),
            spans: Annotated::new(vec![]),
            ..Default::default()
        })
    }

    /// Runs `processor` over a transaction spanning `start` to `end` and returns the resulting
    /// `(timestamp, start_timestamp)` pair.
    fn run(
        mut processor: ClockDriftProcessor,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> (Timestamp, Timestamp) {
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        (
            *event.timestamp.value().unwrap(),
            *event.start_timestamp.value().unwrap(),
        )
    }

    #[test]
    fn test_no_sent_at() {
        let (start, end) = (january_2000(1), january_2000(2));
        let received_at = end;

        let processor = ClockDriftProcessor::new(None, received_at);
        let (timestamp, start_timestamp) = run(processor, start, end);

        assert_eq!(timestamp, end);
        assert_eq!(start_timestamp, start);
    }

    #[test]
    fn test_no_clock_drift() {
        let (start, end) = (january_2000(1), january_2000(2));
        let received_at = end;

        let processor = ClockDriftProcessor::new(Some(end), received_at);
        let (timestamp, start_timestamp) = run(processor, start, end);

        assert_eq!(timestamp, end);
        assert_eq!(start_timestamp, start);
    }

    #[test]
    fn test_clock_drift_lower_bound() {
        let (start, end) = (january_2000(1), january_2000(2));
        let received_at = end + SignedDuration::minutes(1);

        // One minute of drift is below the one-hour threshold, so nothing is corrected.
        let processor = ClockDriftProcessor::new(Some(end), received_at)
            .at_least(Duration::from_secs(3600));
        let (timestamp, start_timestamp) = run(processor, start, end);

        assert_eq!(timestamp, end);
        assert_eq!(start_timestamp, start);
    }

    #[test]
    fn test_clock_drift_from_past() {
        let (start, end) = (january_2000(1), january_2000(2));
        let offset = SignedDuration::days(1);
        let received_at = end + offset;

        let processor = ClockDriftProcessor::new(Some(end), received_at);
        let (timestamp, start_timestamp) = run(processor, start, end);

        assert_eq!(timestamp, received_at);
        assert_eq!(start_timestamp, start + offset);
    }

    #[test]
    fn test_clock_drift_from_future() {
        let (start, end) = (january_2000(1), january_2000(2));
        let offset = -SignedDuration::seconds(60);
        let received_at = end + offset;

        let processor = ClockDriftProcessor::new(Some(end), received_at);
        let (timestamp, start_timestamp) = run(processor, start, end);

        assert_eq!(timestamp, received_at);
        assert_eq!(start_timestamp, start + offset);
    }

    #[test]
    fn test_clock_drift_unix() {
        let sent_at = january_2000(2);
        let received_at = sent_at + SignedDuration::days(1);

        let processor = ClockDriftProcessor::new(Some(sent_at), received_at);
        let mut unix = UnixTimestamp::from_secs(sent_at.timestamp() as u64);
        processor.process_timestamp(&mut unix);

        assert_eq!(unix.as_secs(), received_at.timestamp() as u64);
    }

    #[test]
    fn test_process_datetime() {
        let sent_at = january_2000(2);
        let received_at = sent_at + SignedDuration::days(1);

        let processor = ClockDriftProcessor::new(Some(sent_at), received_at);
        let mut shifted = Utc.with_ymd_and_hms(2021, 11, 29, 0, 0, 0).unwrap();
        processor.process_datetime(&mut shifted);

        let expected = Utc.with_ymd_and_hms(2021, 11, 30, 0, 0, 0).unwrap();
        assert_eq!(shifted, expected);
    }
}