relay_event_normalization/
clock_drift.rsuse std::time::Duration;
use chrono::{DateTime, Duration as SignedDuration, Utc};
use relay_common::time::UnixTimestamp;
use relay_event_schema::processor::{ProcessValue, ProcessingResult, ProcessingState, Processor};
use relay_event_schema::protocol::{Event, Timestamp};
use relay_protocol::{Error, ErrorKind, Meta};
#[derive(Clone, Copy, Debug)]
struct ClockCorrection {
sent_at: DateTime<Utc>,
drift: SignedDuration,
}
impl ClockCorrection {
fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
let drift = received_at - sent_at;
Self { sent_at, drift }
}
fn at_least(self, lower_bound: Duration) -> Option<Self> {
if self.drift.num_seconds().unsigned_abs() >= lower_bound.as_secs() {
Some(self)
} else {
None
}
}
}
#[derive(Debug)]
pub struct ClockDriftProcessor {
received_at: DateTime<Utc>,
correction: Option<ClockCorrection>,
kind: ErrorKind,
}
impl ClockDriftProcessor {
pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));
Self {
received_at,
correction,
kind: ErrorKind::ClockDrift,
}
}
pub fn at_least(mut self, lower_bound: Duration) -> Self {
self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
self
}
pub fn error_kind(mut self, kind: ErrorKind) -> Self {
self.kind = kind;
self
}
pub fn is_drifted(&self) -> bool {
self.correction.is_some()
}
pub fn process_timestamp(&self, timestamp: &mut UnixTimestamp) {
if let Some(correction) = self.correction {
let secs = correction.drift.num_seconds();
*timestamp = if secs > 0 {
UnixTimestamp::from_secs(timestamp.as_secs() + secs as u64)
} else {
UnixTimestamp::from_secs(timestamp.as_secs() - secs.saturating_abs() as u64)
}
}
}
pub fn process_datetime(&self, datetime: &mut DateTime<Utc>) {
if let Some(correction) = self.correction {
*datetime += correction.drift;
}
}
}
impl Processor for ClockDriftProcessor {
fn process_event(
&mut self,
event: &mut Event,
_meta: &mut Meta,
state: &ProcessingState<'_>,
) -> ProcessingResult {
if let Some(correction) = self.correction {
event.process_child_values(self, state)?;
let timestamp_meta = event.timestamp.meta_mut();
timestamp_meta.add_error(Error::with(self.kind.clone(), |e| {
e.insert("sdk_time", correction.sent_at.to_rfc3339());
e.insert("server_time", self.received_at.to_rfc3339());
}));
}
Ok(())
}
fn process_timestamp(
&mut self,
timestamp: &mut Timestamp,
_meta: &mut Meta,
_state: &ProcessingState<'_>,
) -> ProcessingResult {
if let Some(correction) = self.correction {
*timestamp = *timestamp + correction.drift;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use chrono::offset::TimeZone;
use relay_event_schema::processor::process_value;
use relay_event_schema::protocol::{Contexts, EventType, SpanId, TraceContext, TraceId};
use relay_protocol::Annotated;
use similar_asserts::assert_eq;
use super::*;
fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
Annotated::new(Event {
ty: Annotated::new(EventType::Transaction),
timestamp: Annotated::new(end.into()),
start_timestamp: Annotated::new(start.into()),
contexts: {
let mut contexts = Contexts::new();
contexts.add(TraceContext {
trace_id: Annotated::new(TraceId("4c79f60c11214eb38604f4ae0781bfb2".into())),
span_id: Annotated::new(SpanId("fa90fdead5f74053".into())),
op: Annotated::new("http.server".to_owned()),
..Default::default()
});
Annotated::new(contexts)
},
spans: Annotated::new(vec![]),
..Default::default()
})
}
#[test]
fn test_no_sent_at() {
let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let now = end;
let mut processor = ClockDriftProcessor::new(None, now);
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
let event = event.value().unwrap();
assert_eq!(*event.timestamp.value().unwrap(), end);
assert_eq!(*event.start_timestamp.value().unwrap(), start);
}
#[test]
fn test_no_clock_drift() {
let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let now = end;
let mut processor = ClockDriftProcessor::new(Some(end), now);
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
let event = event.value().unwrap();
assert_eq!(*event.timestamp.value().unwrap(), end);
assert_eq!(*event.start_timestamp.value().unwrap(), start);
}
#[test]
fn test_clock_drift_lower_bound() {
let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let drift = SignedDuration::minutes(1);
let now = end + drift;
let mut processor =
ClockDriftProcessor::new(Some(end), now).at_least(Duration::from_secs(3600));
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
let event = event.value().unwrap();
assert_eq!(*event.timestamp.value().unwrap(), end);
assert_eq!(*event.start_timestamp.value().unwrap(), start);
}
#[test]
fn test_clock_drift_from_past() {
let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let drift = SignedDuration::days(1);
let now = end + drift;
let mut processor = ClockDriftProcessor::new(Some(end), now);
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
let event = event.value().unwrap();
assert_eq!(*event.timestamp.value().unwrap(), now);
assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
}
#[test]
fn test_clock_drift_from_future() {
let start = Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let drift = -SignedDuration::seconds(60);
let now = end + drift;
let mut processor = ClockDriftProcessor::new(Some(end), now);
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();
let event = event.value().unwrap();
assert_eq!(*event.timestamp.value().unwrap(), now);
assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
}
#[test]
fn test_clock_drift_unix() {
let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let drift = SignedDuration::days(1);
let now = sent_at + drift;
let processor = ClockDriftProcessor::new(Some(sent_at), now);
let mut timestamp = UnixTimestamp::from_secs(sent_at.timestamp() as u64);
processor.process_timestamp(&mut timestamp);
assert_eq!(timestamp.as_secs(), now.timestamp() as u64);
}
#[test]
fn test_process_datetime() {
let sent_at = Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0).unwrap();
let drift = SignedDuration::days(1);
let now = sent_at + drift;
let processor = ClockDriftProcessor::new(Some(sent_at), now);
let mut datetime = Utc.with_ymd_and_hms(2021, 11, 29, 0, 0, 0).unwrap();
processor.process_datetime(&mut datetime);
assert_eq!(
datetime,
Utc.with_ymd_and_hms(2021, 11, 30, 0, 0, 0).unwrap()
);
}
}