1use std::collections::HashMap;
7use std::ops::Deref;
8use std::time::Duration;
9
10use relay_base_schema::metrics::{DurationUnit, MetricUnit};
11use relay_event_schema::protocol::{Breakdowns, Event, Measurement, Measurements, Timestamp};
12use relay_protocol::Annotated;
13use serde::{Deserialize, Serialize};
14
15#[derive(Clone, Copy, Debug)]
17pub struct TimeWindowSpan {
18 pub start: Timestamp,
20 pub end: Timestamp,
22}
23
24impl TimeWindowSpan {
25 pub fn new(mut start: Timestamp, mut end: Timestamp) -> Self {
30 if end < start {
31 std::mem::swap(&mut start, &mut end);
32 }
33
34 TimeWindowSpan { start, end }
35 }
36
37 pub fn duration(&self) -> Duration {
39 (self.end - self.start).to_std().unwrap_or_default()
41 }
42}
43
/// Key under which span intervals are grouped while computing breakdowns.
#[derive(Debug, PartialEq, Eq, Hash)]
enum OperationBreakdown<'a> {
    /// The span operation matched a configured entry; holds that configured name.
    ///
    /// Groups with this key produce an `ops.<name>` measurement.
    Emit(&'a str),
    /// The span operation matched nothing; holds the span's own operation name.
    ///
    /// Groups with this key only contribute to the total time.
    DoNotEmit(&'a str),
}
49
50fn get_operation_duration(mut intervals: Vec<TimeWindowSpan>) -> Duration {
51 intervals.sort_unstable_by_key(|span| span.start);
52
53 let mut duration = Duration::new(0, 0);
54 let mut last_end = None;
55
56 for mut interval in intervals {
57 if let Some(cutoff) = last_end {
58 interval = TimeWindowSpan::new(interval.start.max(cutoff), interval.end.max(cutoff));
60 }
61
62 duration += interval.duration();
63 last_end = Some(interval.end);
64 }
65
66 duration
67}
68
69trait EmitBreakdowns {
71 fn emit_breakdowns(&self, event: &Event) -> Option<Measurements>;
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct SpanOperationsConfig {
78 pub matches: Vec<String>,
84}
85
86impl EmitBreakdowns for SpanOperationsConfig {
87 fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
88 if self.matches.is_empty() {
89 return None;
90 }
91
92 let spans = event.spans.value()?;
93
94 let mut intervals = HashMap::new();
96
97 for span in spans.iter() {
98 let span = match span.value() {
99 None => continue,
100 Some(span) => span,
101 };
102
103 let name = match span.op.as_str() {
104 None => continue,
105 Some(span_op) => span_op,
106 };
107
108 let interval = match (span.start_timestamp.value(), span.timestamp.value()) {
109 (Some(start), Some(end)) => TimeWindowSpan::new(*start, *end),
110 _ => continue,
111 };
112
113 let key = match self.matches.iter().find(|n| name.starts_with(*n)) {
114 Some(op_name) => OperationBreakdown::Emit(op_name),
115 None => OperationBreakdown::DoNotEmit(name),
116 };
117
118 intervals.entry(key).or_insert_with(Vec::new).push(interval);
119 }
120
121 if intervals.is_empty() {
122 return None;
123 }
124
125 let mut breakdown = Measurements::default();
126 let mut total_time = Duration::new(0, 0);
127
128 for (key, intervals) in intervals {
129 if intervals.is_empty() {
130 continue;
131 }
132
133 let op_duration = get_operation_duration(intervals);
134 total_time += op_duration;
135
136 let operation_name = match key {
137 OperationBreakdown::Emit(name) => name,
138 OperationBreakdown::DoNotEmit(_) => continue,
139 };
140
141 let op_value = Measurement {
142 value: Annotated::new(relay_common::time::duration_to_millis(op_duration)),
143 unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
144 };
145
146 let op_breakdown_name = format!("ops.{operation_name}");
147 breakdown.insert(op_breakdown_name, Annotated::new(op_value));
148 }
149
150 let total_time_value = Annotated::new(Measurement {
151 value: Annotated::new(relay_common::time::duration_to_millis(total_time)),
152 unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
153 });
154 breakdown.insert("total.time".to_string(), total_time_value);
155
156 Some(breakdown)
157 }
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(tag = "type", rename_all = "camelCase")]
163pub enum BreakdownConfig {
164 #[serde(alias = "span_operations")]
166 SpanOperations(SpanOperationsConfig),
167
168 #[serde(other)]
170 Unsupported,
171}
172
173impl EmitBreakdowns for BreakdownConfig {
174 fn emit_breakdowns(&self, event: &Event) -> Option<Measurements> {
175 match self {
176 BreakdownConfig::SpanOperations(config) => config.emit_breakdowns(event),
177 BreakdownConfig::Unsupported => None,
178 }
179 }
180}
181
182type BreakdownName = String;
183
184#[derive(Debug, Clone, Default, Serialize, Deserialize)]
189pub struct BreakdownsConfig(pub HashMap<BreakdownName, BreakdownConfig>);
190
191impl Deref for BreakdownsConfig {
192 type Target = HashMap<BreakdownName, BreakdownConfig>;
193
194 fn deref(&self) -> &Self::Target {
195 &self.0
196 }
197}
198
199fn get_breakdown_measurements<'a>(
200 event: &'a Event,
201 breakdowns_config: &'a BreakdownsConfig,
202) -> impl Iterator<Item = (&'a str, Measurements)> {
203 breakdowns_config
204 .iter()
205 .filter_map(move |(breakdown_name, breakdown_config)| {
206 if !Breakdowns::is_valid_breakdown_name(breakdown_name) {
208 return None;
209 }
210
211 let measurements = breakdown_config.emit_breakdowns(event)?;
212
213 if measurements.is_empty() {
214 return None;
215 }
216
217 Some((breakdown_name.as_str(), measurements))
218 })
219}
220
221pub fn normalize_breakdowns(event: &mut Event, breakdowns_config: &BreakdownsConfig) {
223 let mut event_breakdowns = Breakdowns::default();
224
225 for (breakdown_name, breakdown) in get_breakdown_measurements(event, breakdowns_config) {
226 event_breakdowns
227 .entry(breakdown_name.to_owned())
228 .or_insert_with(|| Annotated::new(Measurements::default()))
229 .value_mut()
230 .get_or_insert_with(Measurements::default)
231 .extend(breakdown.into_inner());
232 }
233
234 if event_breakdowns.is_empty() {
237 event.breakdowns = Annotated::empty();
238 } else {
239 event.breakdowns = Annotated::new(event_breakdowns);
240 }
241}
242
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Timelike, Utc};
    use relay_event_schema::protocol::{EventType, Span, SpanId, SpanStatus};
    use relay_protocol::Object;
    use similar_asserts::assert_eq;

    use super::*;

    /// Timestamp on 2020-01-01 (UTC) at the given hour and minute.
    fn at(hour: u32, minute: u32) -> Annotated<Timestamp> {
        Annotated::new(
            Utc.with_ymd_and_hms(2020, 1, 1, hour, minute, 0)
                .unwrap()
                .into(),
        )
    }

    /// Measurement holding `value` with a millisecond unit.
    fn millis(value: f64) -> Annotated<Measurement> {
        Annotated::new(Measurement {
            value: Annotated::new(value),
            unit: Annotated::new(MetricUnit::Duration(DurationUnit::MilliSecond)),
        })
    }

    /// Span with fixed ids and description, covering `start..end` for operation `op`.
    fn make_span(start: Annotated<Timestamp>, end: Annotated<Timestamp>, op: &str) -> Annotated<Span> {
        Annotated::new(Span {
            timestamp: end,
            start_timestamp: start,
            description: Annotated::new("desc".to_owned()),
            op: Annotated::new(op.to_owned()),
            trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()),
            span_id: Annotated::new(SpanId("fa90fdead5f74052".into())),
            status: Annotated::new(SpanStatus::Ok),
            ..Default::default()
        })
    }

    #[test]
    fn test_skip_with_empty_breakdowns_config() {
        let mut event = Event::default();

        normalize_breakdowns(&mut event, &BreakdownsConfig::default());

        assert_eq!(event.breakdowns.value(), None);
    }

    #[test]
    fn test_noop_breakdowns_with_empty_config() {
        let mut span_ops_breakdown = Measurements::default();
        span_ops_breakdown.insert(
            "lcp".to_owned(),
            Annotated::new(Measurement {
                value: Annotated::new(420.69),
                unit: Annotated::empty(),
            }),
        );

        let mut existing = Object::new();
        existing.insert("span_ops".to_owned(), Annotated::new(span_ops_breakdown));

        let mut event = Event {
            ty: EventType::Transaction.into(),
            breakdowns: Breakdowns(existing).into(),
            ..Default::default()
        };

        normalize_breakdowns(&mut event, &BreakdownsConfig::default());

        assert_eq!(event.breakdowns.into_value(), None);
    }

    #[test]
    fn test_emit_ops_breakdown() {
        // The last span ends 10_000 ns past the full hour.
        let browser_end: Annotated<Timestamp> = Annotated::new(
            Utc.with_ymd_and_hms(2020, 1, 1, 6, 0, 0)
                .unwrap()
                .with_nanosecond(10_000)
                .unwrap()
                .into(),
        );

        let spans = vec![
            make_span(at(0, 0), at(1, 0), "http"),
            make_span(at(2, 0), at(3, 0), "db"),
            make_span(at(2, 30), at(3, 30), "db.postgres"),
            make_span(at(4, 0), at(4, 30), "db.mongo"),
            make_span(at(5, 0), browser_end, "browser"),
        ];

        let mut event = Event {
            ty: EventType::Transaction.into(),
            spans: spans.into(),
            ..Default::default()
        };

        let span_ops_config = BreakdownConfig::SpanOperations(SpanOperationsConfig {
            matches: vec!["http".to_string(), "db".to_string()],
        });

        let breakdowns_config = BreakdownsConfig(HashMap::from([
            ("span_ops".to_string(), span_ops_config.clone()),
            ("span_ops_2".to_string(), span_ops_config),
        ]));

        normalize_breakdowns(&mut event, &breakdowns_config);

        let mut expected_ops = Measurements::default();
        expected_ops.insert("ops.http".to_owned(), millis(3_600_000.0));
        expected_ops.insert("ops.db".to_owned(), millis(7_200_000.0));
        expected_ops.insert("total.time".to_owned(), millis(14_400_000.01));

        let mut expected = Object::new();
        expected.insert(
            "span_ops_2".to_owned(),
            Annotated::new(expected_ops.clone()),
        );
        expected.insert("span_ops".to_owned(), Annotated::new(expected_ops));

        assert_eq!(event.breakdowns.into_value().unwrap(), Breakdowns(expected));
    }
}