relay_event_normalization/
clock_drift.rs1use std::time::Duration;
2
3use chrono::{DateTime, Duration as SignedDuration, Utc};
4use relay_common::time::UnixTimestamp;
5use relay_event_schema::processor::{ProcessValue, ProcessingResult, ProcessingState, Processor};
6use relay_event_schema::protocol::{Event, Timestamp};
7use relay_protocol::{Error, ErrorKind, Meta};
8
9#[derive(Clone, Copy, Debug)]
11struct ClockCorrection {
12 sent_at: DateTime<Utc>,
13 drift: SignedDuration,
14}
15
16impl ClockCorrection {
17 fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
18 let drift = received_at - sent_at;
19 Self { sent_at, drift }
20 }
21
22 fn at_least(self, lower_bound: Duration) -> Option<Self> {
23 if self.drift.num_seconds().unsigned_abs() >= lower_bound.as_secs() {
24 Some(self)
25 } else {
26 None
27 }
28 }
29}
30
31#[derive(Debug)]
50pub struct ClockDriftProcessor {
51 received_at: DateTime<Utc>,
52 correction: Option<ClockCorrection>,
53 kind: ErrorKind,
54}
55
56impl ClockDriftProcessor {
57 pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
62 let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));
63
64 Self {
65 received_at,
66 correction,
67 kind: ErrorKind::ClockDrift,
68 }
69 }
70
71 pub fn at_least(mut self, lower_bound: Duration) -> Self {
77 self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
78 self
79 }
80
81 pub fn error_kind(mut self, kind: ErrorKind) -> Self {
84 self.kind = kind;
85 self
86 }
87
88 pub fn is_drifted(&self) -> bool {
90 self.correction.is_some()
91 }
92
93 pub fn apply_correction_meta(&self, meta: &mut Meta) {
95 let Some(correction) = self.correction else {
96 return;
97 };
98
99 meta.add_error(Error::with(self.kind.clone(), |e| {
100 e.insert("sdk_time", correction.sent_at.to_rfc3339());
101 e.insert("server_time", self.received_at.to_rfc3339());
102 }));
103 }
104
105 pub fn process_timestamp(&self, timestamp: &mut UnixTimestamp) {
107 if let Some(correction) = self.correction {
108 let secs = correction.drift.num_seconds();
109 *timestamp = if secs > 0 {
110 UnixTimestamp::from_secs(timestamp.as_secs() + secs as u64)
111 } else {
112 UnixTimestamp::from_secs(timestamp.as_secs() - secs.saturating_abs() as u64)
113 }
114 }
115 }
116
117 pub fn process_datetime(&self, datetime: &mut DateTime<Utc>) {
119 if let Some(correction) = self.correction {
120 *datetime += correction.drift;
121 }
122 }
123}
124
125impl Processor for ClockDriftProcessor {
126 fn process_event(
127 &mut self,
128 event: &mut Event,
129 _meta: &mut Meta,
130 state: &ProcessingState<'_>,
131 ) -> ProcessingResult {
132 if self.correction.is_some() {
133 event.process_child_values(self, state)?;
134 self.apply_correction_meta(event.timestamp.meta_mut());
135 }
136
137 Ok(())
138 }
139
140 fn process_timestamp(
141 &mut self,
142 timestamp: &mut Timestamp,
143 _meta: &mut Meta,
144 _state: &ProcessingState<'_>,
145 ) -> ProcessingResult {
146 if let Some(correction) = self.correction {
147 *timestamp = *timestamp + correction.drift;
150 }
151
152 Ok(())
153 }
154}
155
#[cfg(test)]
mod tests {
    use chrono::offset::TimeZone;
    use relay_event_schema::processor::process_value;
    use relay_event_schema::protocol::{Contexts, EventType, TraceContext};
    use relay_protocol::Annotated;
    use similar_asserts::assert_eq;

    use super::*;

    /// Returns midnight UTC on the given calendar day.
    fn midnight(year: i32, month: u32, day: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, 0, 0, 0).unwrap()
    }

    /// Builds a minimal transaction event spanning `start..end` with a trace context.
    fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
        let mut contexts = Contexts::new();
        contexts.add(TraceContext {
            trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
            span_id: Annotated::new("fa90fdead5f74053".parse().unwrap()),
            op: Annotated::new("http.server".to_owned()),
            ..Default::default()
        });

        Annotated::new(Event {
            ty: Annotated::new(EventType::Transaction),
            timestamp: Annotated::new(end.into()),
            start_timestamp: Annotated::new(start.into()),
            contexts: Annotated::new(contexts),
            spans: Annotated::new(vec![]),
            ..Default::default()
        })
    }

    /// Runs `processor` over a fresh transaction spanning `start..end` and returns the result.
    fn process_transaction(
        mut processor: ClockDriftProcessor,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Annotated<Event> {
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
        event
    }

    /// Asserts the event's end and start timestamps, in that order.
    fn assert_timestamps(
        event: &Annotated<Event>,
        expected_start: DateTime<Utc>,
        expected_end: DateTime<Utc>,
    ) {
        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), expected_end);
        assert_eq!(*event.start_timestamp.value().unwrap(), expected_start);
    }

    #[test]
    fn test_no_sent_at() {
        let start = midnight(2000, 1, 1);
        let end = midnight(2000, 1, 2);
        let now = end;

        let processor = ClockDriftProcessor::new(None, now);
        let event = process_transaction(processor, start, end);

        assert_timestamps(&event, start, end);
    }

    #[test]
    fn test_no_clock_drift() {
        let start = midnight(2000, 1, 1);
        let end = midnight(2000, 1, 2);
        let now = end;

        let processor = ClockDriftProcessor::new(Some(end), now);
        let event = process_transaction(processor, start, end);

        assert_timestamps(&event, start, end);
    }

    #[test]
    fn test_clock_drift_lower_bound() {
        let start = midnight(2000, 1, 1);
        let end = midnight(2000, 1, 2);

        // One minute of drift stays below the one hour bound, so nothing is corrected.
        let drift = SignedDuration::minutes(1);
        let now = end + drift;

        let processor =
            ClockDriftProcessor::new(Some(end), now).at_least(Duration::from_secs(3600));
        let event = process_transaction(processor, start, end);

        assert_timestamps(&event, start, end);
    }

    #[test]
    fn test_clock_drift_from_past() {
        let start = midnight(2000, 1, 1);
        let end = midnight(2000, 1, 2);

        let drift = SignedDuration::days(1);
        let now = end + drift;

        let processor = ClockDriftProcessor::new(Some(end), now);
        let event = process_transaction(processor, start, end);

        assert_timestamps(&event, start + drift, now);
    }

    #[test]
    fn test_clock_drift_from_future() {
        let start = midnight(2000, 1, 1);
        let end = midnight(2000, 1, 2);

        let drift = -SignedDuration::seconds(60);
        let now = end + drift;

        let processor = ClockDriftProcessor::new(Some(end), now);
        let event = process_transaction(processor, start, end);

        assert_timestamps(&event, start + drift, now);
    }

    #[test]
    fn test_clock_drift_unix() {
        let sent_at = midnight(2000, 1, 2);
        let drift = SignedDuration::days(1);
        let now = sent_at + drift;

        let processor = ClockDriftProcessor::new(Some(sent_at), now);
        let mut timestamp = UnixTimestamp::from_secs(sent_at.timestamp() as u64);
        processor.process_timestamp(&mut timestamp);

        assert_eq!(timestamp.as_secs(), now.timestamp() as u64);
    }

    #[test]
    fn test_process_datetime() {
        let sent_at = midnight(2000, 1, 2);
        let drift = SignedDuration::days(1);
        let now = sent_at + drift;

        let processor = ClockDriftProcessor::new(Some(sent_at), now);
        let mut datetime = midnight(2021, 11, 29);
        processor.process_datetime(&mut datetime);

        assert_eq!(datetime, midnight(2021, 11, 30));
    }
}