relay_event_normalization/normalize/
breakdowns.rs

1//! Computation of breakdowns from event data.
2//!
3//! Breakdowns are product-defined numbers that are indirectly reported by the client, and are
4//! materialized during ingestion. They are usually an aggregation over data present in the event.
5
6use std::collections::HashMap;
7use std::ops::Deref;
8use std::time::Duration;
9
10use relay_base_schema::metrics::{DurationUnit, MetricUnit};
11use relay_event_schema::protocol::{Breakdowns, Event, Measurement, Measurements, Timestamp};
12use relay_protocol::Annotated;
13use serde::{Deserialize, Serialize};
14
15/// A time window declared by its start and end timestamp.
16#[derive(Clone, Copy, Debug)]
17pub struct TimeWindowSpan {
18    /// The inclusive start timestamp of the span.
19    pub start: Timestamp,
20    /// The exclusive end timestamp of the span.
21    pub end: Timestamp,
22}
23
24impl TimeWindowSpan {
25    /// Creates a new time span with the given `start` and `end` date.
26    ///
27    /// For normalization purposes, start and end can be swapped. The constructed `TimeWindowSpan`
28    /// always has the later timestamp in `end`.
29    pub fn new(mut start: Timestamp, mut end: Timestamp) -> Self {
30        if end < start {
31            std::mem::swap(&mut start, &mut end);
32        }
33
34        TimeWindowSpan { start, end }
35    }
36
37    /// Returns the duration from start to end.
38    pub fn duration(&self) -> Duration {
39        // Cannot fail since durations are ordered in the constructor
40        (self.end - self.start).to_std().unwrap_or_default()
41    }
42}
43
/// Grouping key for span intervals while computing span operation breakdowns.
#[derive(Debug, PartialEq, Eq, Hash)]
enum OperationBreakdown<'a> {
    /// The span operation matched a configured name; carries that configured name.
    Emit(&'a str),
    /// The span operation matched nothing; carries the span's own operation name.
    DoNotEmit(&'a str),
}
49
50fn get_operation_duration(mut intervals: Vec<TimeWindowSpan>) -> Duration {
51    intervals.sort_unstable_by_key(|span| span.start);
52
53    let mut duration = Duration::new(0, 0);
54    let mut last_end = None;
55
56    for mut interval in intervals {
57        if let Some(cutoff) = last_end {
58            // ensure the current interval doesn't overlap with the last one
59            interval = TimeWindowSpan::new(interval.start.max(cutoff), interval.end.max(cutoff));
60        }
61
62        duration += interval.duration();
63        last_end = Some(interval.end);
64    }
65
66    duration
67}
68
69/// Emit breakdowns that are derived using information from the given event.
70trait EmitBreakdowns {
71    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements>;
72}
73
74/// Configuration to define breakdowns based on span operation name.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct SpanOperationsConfig {
78    /// A list of declarations for span operations to extract breakdowns for.
79    ///
80    /// The match is successful if the span operation name starts with any string in the array. If
81    /// any string in the array has at least one match, then a breakdown group is created, and its
82    /// name will be the matched string.
83    pub matches: Vec<String>,
84}
85
86impl EmitBreakdowns for SpanOperationsConfig {
87    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
88        if self.matches.is_empty() {
89            return None;
90        }
91
92        let spans = event.spans.value()?;
93
94        // Generate span operation breakdowns
95        let mut intervals = HashMap::new();
96
97        for span in spans.iter() {
98            let span = match span.value() {
99                None => continue,
100                Some(span) => span,
101            };
102
103            let name = match span.op.as_str() {
104                None => continue,
105                Some(span_op) => span_op,
106            };
107
108            let interval = match (span.start_timestamp.value(), span.timestamp.value()) {
109                (Some(start), Some(end)) => TimeWindowSpan::new(*start, *end),
110                _ => continue,
111            };
112
113            let key = match self.matches.iter().find(|n| name.starts_with(*n)) {
114                Some(op_name) => OperationBreakdown::Emit(op_name),
115                None => OperationBreakdown::DoNotEmit(name),
116            };
117
118            intervals.entry(key).or_insert_with(Vec::new).push(interval);
119        }
120
121        if intervals.is_empty() {
122            return None;
123        }
124
125        let mut breakdown = Measurements::default();
126        let mut total_time = Duration::new(0, 0);
127
128        for (key, intervals) in intervals {
129            if intervals.is_empty() {
130                continue;
131            }
132
133            let op_duration = get_operation_duration(intervals);
134            total_time += op_duration;
135
136            let operation_name = match key {
137                OperationBreakdown::Emit(name) => name,
138                OperationBreakdown::DoNotEmit(_) => continue,
139            };
140
141            let op_value = Measurement {
142                value: Annotated::new(relay_common::time::duration_to_millis(op_duration)),
143                unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
144            };
145
146            let op_breakdown_name = format!("ops.{operation_name}");
147            breakdown.insert(op_breakdown_name, Annotated::new(op_value));
148        }
149
150        let total_time_value = Annotated::new(Measurement {
151            value: Annotated::new(relay_common::time::duration_to_millis(total_time)),
152            unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
153        });
154        breakdown.insert("total.time".to_string(), total_time_value);
155
156        Some(breakdown)
157    }
158}
159
160/// Configuration to define breakdown to be generated based on properties and breakdown type.
161#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(tag = "type", rename_all = "camelCase")]
163pub enum BreakdownConfig {
164    /// Compute breakdowns based on span operation name.
165    #[serde(alias = "span_operations")]
166    SpanOperations(SpanOperationsConfig),
167
168    /// An unknown rule ignored for forward compatibility.
169    #[serde(other)]
170    Unsupported,
171}
172
173impl EmitBreakdowns for BreakdownConfig {
174    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
175        match self {
176            BreakdownConfig::SpanOperations(config) => config.emit_breakdowns(event),
177            BreakdownConfig::Unsupported => None,
178        }
179    }
180}
181
182type BreakdownName = String;
183
184/// Configuration for computing breakdowns from data in the event.
185///
186/// Breakdowns are product-defined numbers that are indirectly reported by the client, and are
187/// materialized during ingestion. They are usually an aggregation over data present in the event.
188#[derive(Debug, Clone, Default, Serialize, Deserialize)]
189pub struct BreakdownsConfig(pub HashMap<BreakdownName, BreakdownConfig>);
190
191impl Deref for BreakdownsConfig {
192    type Target = HashMap<BreakdownName, BreakdownConfig>;
193
194    fn deref(&self) -> &Self::Target {
195        &self.0
196    }
197}
198
199fn get_breakdown_measurements<'a>(
200    event: &'a Event,
201    breakdowns_config: &'a BreakdownsConfig,
202) -> impl Iterator<Item = (&'a str, Measurements)> {
203    breakdowns_config
204        .iter()
205        .filter_map(move |(breakdown_name, breakdown_config)| {
206            // TODO: move this to deserialization in a follow-up.
207            if !Breakdowns::is_valid_breakdown_name(breakdown_name) {
208                return None;
209            }
210
211            let measurements = breakdown_config.emit_breakdowns(event)?;
212
213            if measurements.is_empty() {
214                return None;
215            }
216
217            Some((breakdown_name.as_str(), measurements))
218        })
219}
220
221/// Computes breakdowns for an event based on the given configuration.
222pub fn normalize_breakdowns(event: &mut Event, breakdowns_config: &BreakdownsConfig) {
223    let mut event_breakdowns = Breakdowns::default();
224
225    for (breakdown_name, breakdown) in get_breakdown_measurements(event, breakdowns_config) {
226        event_breakdowns
227            .entry(breakdown_name.to_owned())
228            .or_insert_with(|| Annotated::new(Measurements::default()))
229            .value_mut()
230            .get_or_insert_with(Measurements::default)
231            .extend(breakdown.into_inner());
232    }
233
234    // Do not accept SDK-defined breakdowns. This is required for idempotency in multiple layers of
235    // Relay, and also such that performance metrics extraction produces the correct data.
236    if event_breakdowns.is_empty() {
237        event.breakdowns = Annotated::empty();
238    } else {
239        event.breakdowns = Annotated::new(event_breakdowns);
240    }
241}
242
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Timelike, Utc};
    use relay_event_schema::protocol::{EventType, Span, SpanId, SpanStatus};
    use relay_protocol::Object;
    use similar_asserts::assert_eq;

    use super::*;

    /// Timestamp on 2020-01-01 at `hour:minute:00` UTC.
    fn at(hour: u32, minute: u32) -> Timestamp {
        Utc.with_ymd_and_hms(2020, 1, 1, hour, minute, 0)
            .unwrap()
            .into()
    }

    /// A span with fixed identifiers, the given operation name and time window.
    fn span(op_name: &str, start: Timestamp, end: Timestamp) -> Annotated<Span> {
        Annotated::new(Span {
            timestamp: Annotated::new(end),
            start_timestamp: Annotated::new(start),
            description: Annotated::new("desc".to_owned()),
            op: Annotated::new(op_name.to_owned()),
            trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
            span_id: Annotated::new(SpanId("fa90fdead5f74052".into())),
            status: Annotated::new(SpanStatus::Ok),
            ..Default::default()
        })
    }

    /// A measurement of `value` milliseconds.
    fn millis(value: f64) -> Annotated<Measurement> {
        Annotated::new(Measurement {
            value: Annotated::new(value),
            unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
        })
    }

    #[test]
    fn test_skip_with_empty_breakdowns_config() {
        let config = BreakdownsConfig::default();
        let mut event = Event::default();

        normalize_breakdowns(&mut event, &config);

        assert_eq!(event.breakdowns.value(), None);
    }

    #[test]
    fn test_noop_breakdowns_with_empty_config() {
        // Breakdowns as they would be sent by an SDK; normalization must discard them.
        let mut sdk_measurements = Measurements::default();
        sdk_measurements.insert(
            "lcp".to_owned(),
            Annotated::new(Measurement {
                value: Annotated::new(420.69),
                unit: Annotated::empty(),
            }),
        );

        let mut sdk_breakdowns = Object::new();
        sdk_breakdowns.insert("span_ops".to_owned(), Annotated::new(sdk_measurements));

        let mut event = Event {
            ty: EventType::Transaction.into(),
            breakdowns: Breakdowns(sdk_breakdowns).into(),
            ..Default::default()
        };

        normalize_breakdowns(&mut event, &BreakdownsConfig::default());

        assert_eq!(event.breakdowns.into_value(), None);
    }

    #[test]
    fn test_emit_ops_breakdown() {
        // 06:00:00 plus 10 microseconds.
        let browser_end: Timestamp = Utc
            .with_ymd_and_hms(2020, 1, 1, 6, 0, 0)
            .unwrap()
            .with_nanosecond(10_000)
            .unwrap()
            .into();

        let spans = vec![
            span("http", at(0, 0), at(1, 0)),
            // overlapping spans
            span("db", at(2, 0), at(3, 0)),
            span("db.postgres", at(2, 30), at(3, 30)),
            span("db.mongo", at(4, 0), at(4, 30)),
            span("browser", at(5, 0), browser_end),
        ];

        let mut event = Event {
            ty: EventType::Transaction.into(),
            spans: spans.into(),
            ..Default::default()
        };

        let span_ops_config = BreakdownConfig::SpanOperations(SpanOperationsConfig {
            matches: vec!["http".to_string(), "db".to_string()],
        });

        let breakdowns_config = BreakdownsConfig(HashMap::from([
            ("span_ops".to_string(), span_ops_config.clone()),
            ("span_ops_2".to_string(), span_ops_config),
        ]));

        normalize_breakdowns(&mut event, &breakdowns_config);

        let mut expected_ops = Measurements::default();
        // 1 hour in milliseconds
        expected_ops.insert("ops.http".to_owned(), millis(3_600_000.0));
        // 2 hours in milliseconds
        expected_ops.insert("ops.db".to_owned(), millis(7_200_000.0));
        // 4 hours and 10 microseconds in milliseconds
        expected_ops.insert("total.time".to_owned(), millis(14_400_000.01));

        let mut expected = Object::new();
        expected.insert(
            "span_ops_2".to_owned(),
            Annotated::new(expected_ops.clone()),
        );
        expected.insert("span_ops".to_owned(), Annotated::new(expected_ops));

        assert_eq!(
            event.breakdowns.into_value().unwrap(),
            Breakdowns(expected)
        );
    }
}
405}