1use std::collections::HashMap;
7use std::ops::Deref;
8use std::time::Duration;
9
10use relay_base_schema::metrics::{DurationUnit, MetricUnit};
11use relay_event_schema::protocol::{Breakdowns, Event, Measurement, Measurements, Timestamp};
12use relay_protocol::Annotated;
13use serde::{Deserialize, Serialize};
14
15#[derive(Clone, Copy, Debug)]
17pub struct TimeWindowSpan {
18    pub start: Timestamp,
20    pub end: Timestamp,
22}
23
24impl TimeWindowSpan {
25    pub fn new(mut start: Timestamp, mut end: Timestamp) -> Self {
30        if end < start {
31            std::mem::swap(&mut start, &mut end);
32        }
33
34        TimeWindowSpan { start, end }
35    }
36
37    pub fn duration(&self) -> Duration {
39        (self.end - self.start).to_std().unwrap_or_default()
41    }
42}
43
/// Grouping key for span intervals, tagged by whether the group is reported on its own.
///
/// Both variants carry a borrowed operation name. The variant takes part in equality and hashing,
/// so an `Emit` and a `DoNotEmit` carrying the same name are distinct keys.
#[derive(Debug, PartialEq, Eq, Hash)]
enum OperationBreakdown<'a> {
    /// Group whose duration is written to the output under the carried name.
    Emit(&'a str),
    /// Group that is timed but does not get an output entry of its own.
    DoNotEmit(&'a str),
}
49
50fn get_operation_duration(mut intervals: Vec<TimeWindowSpan>) -> Duration {
51    intervals.sort_unstable_by_key(|span| span.start);
52
53    let mut duration = Duration::new(0, 0);
54    let mut last_end = None;
55
56    for mut interval in intervals {
57        if let Some(cutoff) = last_end {
58            interval = TimeWindowSpan::new(interval.start.max(cutoff), interval.end.max(cutoff));
60        }
61
62        duration += interval.duration();
63        last_end = Some(interval.end);
64    }
65
66    duration
67}
68
/// Internal abstraction for configurations that can derive breakdown measurements from an event.
trait EmitBreakdowns {
    /// Computes measurements for `event`.
    ///
    /// Returns `None` when the implementation has nothing to emit for this event.
    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements>;
}
73
/// Configuration for breakdowns computed from span operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SpanOperationsConfig {
    /// Operation name prefixes to report.
    ///
    /// A span is assigned to the first entry that its operation name starts with; each entry
    /// that matched at least one span yields one `ops.<entry>` measurement.
    pub matches: Vec<String>,
}
85
86impl EmitBreakdowns for SpanOperationsConfig {
87    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
88        if self.matches.is_empty() {
89            return None;
90        }
91
92        let spans = event.spans.value()?;
93
94        let mut intervals = HashMap::new();
96
97        for span in spans.iter() {
98            let span = match span.value() {
99                None => continue,
100                Some(span) => span,
101            };
102
103            let name = match span.op.as_str() {
104                None => continue,
105                Some(span_op) => span_op,
106            };
107
108            let interval = match (span.start_timestamp.value(), span.timestamp.value()) {
109                (Some(start), Some(end)) => TimeWindowSpan::new(*start, *end),
110                _ => continue,
111            };
112
113            let key = match self.matches.iter().find(|n| name.starts_with(*n)) {
114                Some(op_name) => OperationBreakdown::Emit(op_name),
115                None => OperationBreakdown::DoNotEmit(name),
116            };
117
118            intervals.entry(key).or_insert_with(Vec::new).push(interval);
119        }
120
121        if intervals.is_empty() {
122            return None;
123        }
124
125        let mut breakdown = Measurements::default();
126        let mut total_time = Duration::new(0, 0);
127
128        for (key, intervals) in intervals {
129            if intervals.is_empty() {
130                continue;
131            }
132
133            let op_duration = get_operation_duration(intervals);
134            total_time += op_duration;
135
136            let operation_name = match key {
137                OperationBreakdown::Emit(name) => name,
138                OperationBreakdown::DoNotEmit(_) => continue,
139            };
140
141            let op_value = Measurement {
142                value: Annotated::try_from(relay_common::time::duration_to_millis(op_duration)),
143                unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
144            };
145
146            let op_breakdown_name = format!("ops.{operation_name}");
147            breakdown.insert(op_breakdown_name, Annotated::new(op_value));
148        }
149
150        let total_time_value = Annotated::new(Measurement {
151            value: Annotated::try_from(relay_common::time::duration_to_millis(total_time)),
152            unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
153        });
154        breakdown.insert("total.time".to_owned(), total_time_value);
155
156        Some(breakdown)
157    }
158}
159
/// Rule for computing a single breakdown, selected by the serialized `type` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum BreakdownConfig {
    /// Breakdown computed from the operations of the event's spans.
    ///
    /// The snake_case tag `span_operations` is accepted as an alias when deserializing.
    #[serde(alias = "span_operations")]
    SpanOperations(SpanOperationsConfig),

    /// Catch-all for `type` tags that match no other variant; it emits no breakdowns.
    #[serde(other)]
    Unsupported,
}
172
173impl EmitBreakdowns for BreakdownConfig {
174    fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
175        match self {
176            BreakdownConfig::SpanOperations(config) => config.emit_breakdowns(event),
177            BreakdownConfig::Unsupported => None,
178        }
179    }
180}
181
182type BreakdownName = String;
183
184#[derive(Debug, Clone, Default, Serialize, Deserialize)]
189pub struct BreakdownsConfig(pub HashMap<BreakdownName, BreakdownConfig>);
190
191impl Deref for BreakdownsConfig {
192    type Target = HashMap<BreakdownName, BreakdownConfig>;
193
194    fn deref(&self) -> &Self::Target {
195        &self.0
196    }
197}
198
199fn get_breakdown_measurements<'a>(
200    event: &'a Event,
201    breakdowns_config: &'a BreakdownsConfig,
202) -> impl Iterator<Item = (&'a str, Measurements)> {
203    breakdowns_config
204        .iter()
205        .filter_map(move |(breakdown_name, breakdown_config)| {
206            if !Breakdowns::is_valid_breakdown_name(breakdown_name) {
208                return None;
209            }
210
211            let measurements = breakdown_config.emit_breakdowns(event)?;
212
213            if measurements.is_empty() {
214                return None;
215            }
216
217            Some((breakdown_name.as_str(), measurements))
218        })
219}
220
221pub fn normalize_breakdowns(event: &mut Event, breakdowns_config: &BreakdownsConfig) {
223    let mut event_breakdowns = Breakdowns::default();
224
225    for (breakdown_name, breakdown) in get_breakdown_measurements(event, breakdowns_config) {
226        event_breakdowns
227            .entry(breakdown_name.to_owned())
228            .or_insert_with(|| Annotated::new(Measurements::default()))
229            .value_mut()
230            .get_or_insert_with(Measurements::default)
231            .extend(breakdown.into_inner());
232    }
233
234    if event_breakdowns.is_empty() {
237        event.breakdowns = Annotated::empty();
238    } else {
239        event.breakdowns = Annotated::new(event_breakdowns);
240    }
241}
242
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Timelike, Utc};
    use relay_event_schema::protocol::{EventType, Span, SpanStatus};
    use relay_protocol::Object;
    use similar_asserts::assert_eq;

    use super::*;

    // An empty configuration applied to a default event leaves `breakdowns` without a value.
    #[test]
    fn test_skip_with_empty_breakdowns_config() {
        let mut event = Event::default();
        normalize_breakdowns(&mut event, &BreakdownsConfig::default());
        assert_eq!(event.breakdowns.value(), None);
    }

    // Despite the "noop" in its name, this test asserts that breakdowns already present on the
    // event are removed when the configuration produces none.
    #[test]
    fn test_noop_breakdowns_with_empty_config() {
        // Pre-existing breakdowns on the event, as if supplied with the payload.
        let breakdowns = Breakdowns({
            let mut span_ops_breakdown = Measurements::default();

            span_ops_breakdown.insert(
                "lcp".to_owned(),
                Annotated::new(Measurement {
                    value: Annotated::new(420.69.try_into().unwrap()),
                    unit: Annotated::empty(),
                }),
            );

            let mut breakdowns = Object::new();
            breakdowns.insert("span_ops".to_owned(), Annotated::new(span_ops_breakdown));

            breakdowns
        });

        let mut event = Event {
            ty: EventType::Transaction.into(),
            breakdowns: breakdowns.into(),
            ..Default::default()
        };
        normalize_breakdowns(&mut event, &BreakdownsConfig::default());
        assert_eq!(event.breakdowns.into_value(), None);
    }

    // End-to-end check of the span operations breakdown, including prefix matching, overlap
    // handling within one operation group, and the total over unmatched operations.
    #[test]
    fn test_emit_ops_breakdown() {
        // Builds a span with the given time range and operation name; other fields are fixed.
        fn make_span(
            start: Annotated<Timestamp>,
            end: Annotated<Timestamp>,
            op_name: String,
        ) -> Annotated<Span> {
            Annotated::new(Span {
                timestamp: end,
                start_timestamp: start,
                description: Annotated::new("desc".to_owned()),
                op: Annotated::new(op_name),
                trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
                span_id: Annotated::new("fa90fdead5f74052".parse().unwrap()),
                status: Annotated::new(SpanStatus::Ok),
                ..Default::default()
            })
        }

        let spans = vec![
            // http: 00:00 - 01:00, one hour.
            make_span(
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap().into()),
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 1, 0, 0).unwrap().into()),
                "http".to_owned(),
            ),
            // db: 02:00 - 03:00, overlaps with the next span.
            make_span(
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 2, 0, 0).unwrap().into()),
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 3, 0, 0).unwrap().into()),
                "db".to_owned(),
            ),
            // db.postgres: 02:30 - 03:30, matches the "db" prefix.
            make_span(
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 2, 30, 0).unwrap().into()),
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 3, 30, 0).unwrap().into()),
                "db.postgres".to_owned(),
            ),
            // db.mongo: 04:00 - 04:30, matches the "db" prefix.
            make_span(
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 4, 0, 0).unwrap().into()),
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 4, 30, 0).unwrap().into()),
                "db.mongo".to_owned(),
            ),
            // browser: 05:00 - 06:00 plus 10_000 ns; matches no configured prefix.
            make_span(
                Annotated::new(Utc.with_ymd_and_hms(2020, 1, 1, 5, 0, 0).unwrap().into()),
                Annotated::new(
                    Utc.with_ymd_and_hms(2020, 1, 1, 6, 0, 0)
                        .unwrap()
                        .with_nanosecond(10_000)
                        .unwrap()
                        .into(),
                ),
                "browser".to_owned(),
            ),
        ];

        let mut event = Event {
            ty: EventType::Transaction.into(),
            spans: spans.into(),
            ..Default::default()
        };

        // The same rule is registered under two names, so both must yield identical output.
        let breakdowns_config = BreakdownsConfig({
            let mut config = HashMap::new();

            let span_ops_config = BreakdownConfig::SpanOperations(SpanOperationsConfig {
                matches: vec!["http".to_owned(), "db".to_owned()],
            });

            config.insert("span_ops".to_owned(), span_ops_config.clone());
            config.insert("span_ops_2".to_owned(), span_ops_config);

            config
        });

        normalize_breakdowns(&mut event, &breakdowns_config);

        let expected_breakdowns = Breakdowns({
            let mut span_ops_breakdown = Measurements::default();

            span_ops_breakdown.insert(
                "ops.http".to_owned(),
                Annotated::new(Measurement {
                    // 1 hour in milliseconds.
                    value: Annotated::new(3_600_000.0.try_into().unwrap()),
                    unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
                }),
            );

            span_ops_breakdown.insert(
                "ops.db".to_owned(),
                Annotated::new(Measurement {
                    // 02:00 - 03:30 (overlap counted once) plus 04:00 - 04:30: 2 hours.
                    value: Annotated::new(7_200_000.0.try_into().unwrap()),
                    unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
                }),
            );

            span_ops_breakdown.insert(
                "total.time".to_owned(),
                Annotated::new(Measurement {
                    // http (1 h) + db (2 h) + browser (1 h and 10_000 ns, i.e. 0.01 ms).
                    value: Annotated::new(14_400_000.01.try_into().unwrap()),
                    unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
                }),
            );

            let mut breakdowns = Object::new();
            breakdowns.insert(
                "span_ops_2".to_owned(),
                Annotated::new(span_ops_breakdown.clone()),
            );

            breakdowns.insert("span_ops".to_owned(), Annotated::new(span_ops_breakdown));

            breakdowns
        });

        assert_eq!(event.breakdowns.into_value().unwrap(), expected_breakdowns);
    }
}