1use bytes::Bytes;
13use hashbrown::HashMap;
14use serde::{Deserialize, Serialize};
15use std::collections::{BTreeMap, HashSet};
16
17use relay_event_schema::protocol::EventId;
18use relay_protocol::FiniteF64;
19
20use crate::MAX_PROFILE_CHUNK_DURATION;
21use crate::error::ProfileError;
22use crate::measurements::ChunkMeasurement;
23use crate::sample::{DebugMeta, Frame, ThreadMetadata, Version};
24use crate::types::ClientSdk;
25
/// [`MAX_PROFILE_CHUNK_DURATION`] expressed in (fractional) seconds, precomputed at compile time
/// so it can be compared directly against sample timestamp differences.
const MAX_PROFILE_CHUNK_DURATION_SECS: f64 = MAX_PROFILE_CHUNK_DURATION.as_secs_f64();
27
/// Metadata attached to a profile chunk.
///
/// When embedded in [`ProfileChunk`] this struct is flattened, so its fields live at the top
/// level of the chunk's JSON object.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProfileMetadata {
    /// Identifier of this chunk.
    pub chunk_id: EventId,
    /// Identifier of the profiler that produced the chunk (presumably shared by all chunks of
    /// one profiler session — confirm against the SDK spec).
    pub profiler_id: EventId,

    /// Debug information; defaults to empty when missing and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "DebugMeta::is_empty")]
    pub debug_meta: DebugMeta,

    /// Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub environment: Option<String>,
    /// Platform name; `"cocoa"` enables pointer-authentication stripping during normalization.
    pub platform: String,
    /// Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    /// Release identifier. Unlike `environment`/`content_type`, this is serialized as `null`
    /// when `None` because it has no `skip_serializing_if`.
    pub release: Option<String>,

    /// SDK that produced the chunk.
    pub client_sdk: ClientSdk,

    /// Version of the payload format.
    pub version: Version,
}
50
51impl relay_protocol::Getter for ProfileMetadata {
52 fn get_value(&self, path: &str) -> Option<relay_protocol::Val<'_>> {
53 match path {
54 "release" => self.release.as_deref().map(|release| release.into()),
55 "platform" => Some(self.platform.as_str().into()),
56 _ => None,
57 }
58 }
59}
60
/// A single sample: which stack was observed, on which thread, and when.
#[derive(Debug, Serialize, Deserialize)]
pub struct Sample {
    /// Time of the sample. Differences between timestamps are compared against the maximum chunk
    /// duration in seconds, so this is presumably seconds — confirm against the SDK spec.
    pub timestamp: FiniteF64,
    /// Index into [`ProfileData::stacks`].
    pub stack_id: usize,
    /// Thread identifier; also used as the key into [`ProfileData::thread_metadata`].
    pub thread_id: String,
}
71
/// A profile chunk: top-level metadata plus the sampled profile data.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProfileChunk {
    /// Chunk measurements keyed by name; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub measurements: BTreeMap<String, ChunkMeasurement>,
    /// Chunk metadata, (de)serialized inline (flattened) alongside the other top-level fields.
    #[serde(flatten)]
    pub metadata: ProfileMetadata,
    /// Sampled profile data; an empty [`ProfileData`] when the field is missing.
    #[serde(default)]
    pub profile: ProfileData,
}
84
85impl ProfileChunk {
86 pub fn parse(payload: &[u8]) -> Result<Self, ProfileError> {
88 let d = &mut serde_json::Deserializer::from_slice(payload);
89 serde_path_to_error::deserialize(d).map_err(ProfileError::InvalidJson)
90 }
91
92 pub fn normalize(&mut self) -> Result<(), ProfileError> {
94 let platform = self.metadata.platform.as_str();
95 self.profile.normalize(platform)
96 }
97
98 pub fn serialize(&self) -> Result<Bytes, ProfileError> {
100 serde_json::to_vec(self)
101 .map(Bytes::from)
102 .map_err(|_| ProfileError::CannotSerializePayload)
103 }
104}
105
106impl crate::profile_chunk::ProfileChunk for ProfileChunk {
107 fn platform(&self) -> &str {
108 &self.metadata.platform
109 }
110
111 fn normalize(&mut self) -> Result<(), ProfileError> {
112 ProfileChunk::normalize(self)
113 }
114}
115
116impl relay_filter::Filterable for ProfileChunk {
117 fn release(&self) -> Option<&str> {
118 self.metadata.release.as_deref()
119 }
120}
121
122impl relay_protocol::Getter for ProfileChunk {
123 fn get_value(&self, path: &str) -> Option<relay_protocol::Val<'_>> {
124 self.metadata
125 .get_value(path.strip_prefix(crate::PROFIL_GETTER_PREFIX)?)
126 }
127}
128
/// The sampled profile: samples reference stacks by index, and stacks reference frames by index.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ProfileData {
    /// Samples; each one's `stack_id` indexes into `stacks`.
    pub samples: Vec<Sample>,
    /// Stacks; each stack is a list of indices into `frames`.
    pub stacks: Vec<Vec<usize>>,
    /// Frames referenced by `stacks`.
    pub frames: Vec<Frame>,

    /// Per-thread metadata keyed by thread id; defaults to empty and is omitted when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub thread_metadata: BTreeMap<String, ThreadMetadata>,
}
147
148impl ProfileData {
149 pub fn is_empty(&self) -> bool {
151 let Self {
152 samples,
153 stacks,
154 frames,
155 thread_metadata,
156 } = self;
157
158 samples.is_empty() && stacks.is_empty() && frames.is_empty() && thread_metadata.is_empty()
159 }
160
161 pub fn normalize(&mut self, platform: &str) -> Result<(), ProfileError> {
167 self.remove_single_samples_per_thread();
168
169 if self.samples.is_empty() {
170 return Err(ProfileError::NotEnoughSamples);
171 }
172
173 if !self.all_stacks_referenced_by_samples_exist() {
174 return Err(ProfileError::MalformedSamples);
175 }
176
177 if !self.all_frames_referenced_by_stacks_exist() {
178 return Err(ProfileError::MalformedStacks);
179 }
180
181 self.samples.sort_by_key(|s| s.timestamp);
182
183 if self.is_above_max_duration() {
184 return Err(ProfileError::DurationIsTooLong);
185 }
186
187 self.strip_pointer_authentication_code(platform);
188 self.remove_unreferenced_threads();
189
190 Ok(())
191 }
192
193 fn is_above_max_duration(&self) -> bool {
194 if self.samples.is_empty() {
195 return false;
196 }
197 let mut min = self.samples[0].timestamp;
198 let mut max = self.samples[0].timestamp;
199
200 for sample in self.samples.iter().skip(1) {
201 if sample.timestamp < min {
202 min = sample.timestamp
203 } else if sample.timestamp > max {
204 max = sample.timestamp
205 }
206 }
207
208 let duration = max.saturating_sub(min);
209 duration.to_f64() > MAX_PROFILE_CHUNK_DURATION_SECS
210 }
211
212 fn strip_pointer_authentication_code(&mut self, platform: &str) {
213 let addr = match platform {
214 "cocoa" => 0x0000000FFFFFFFFF,
216 _ => return,
217 };
218 for frame in &mut self.frames {
219 frame.strip_pointer_authentication_code(addr);
220 }
221 }
222
223 fn all_stacks_referenced_by_samples_exist(&self) -> bool {
225 self.samples
226 .iter()
227 .all(|sample| self.stacks.get(sample.stack_id).is_some())
228 }
229
230 fn all_frames_referenced_by_stacks_exist(&self) -> bool {
232 self.stacks.iter().all(|stack| {
233 stack
234 .iter()
235 .all(|frame_id| self.frames.get(*frame_id).is_some())
236 })
237 }
238
239 fn remove_unreferenced_threads(&mut self) {
240 let thread_ids = self
241 .samples
242 .iter()
243 .map(|sample| sample.thread_id.clone())
244 .collect::<HashSet<_>>();
245 self.thread_metadata
246 .retain(|thread_id, _| thread_ids.contains(thread_id));
247 }
248
249 fn remove_single_samples_per_thread(&mut self) {
251 let mut sample_count_by_thread_id: hashbrown::HashMap<String, u32> = HashMap::new();
252
253 for s in &self.samples {
254 if let Some(stack) = self.stacks.get(s.stack_id) {
255 if stack.is_empty() {
257 continue;
258 }
259 } else {
260 continue;
261 }
262 *sample_count_by_thread_id
263 .entry(s.thread_id.to_owned())
264 .or_default() += 1;
265 }
266
267 sample_count_by_thread_id.retain(|_, count| *count > 1);
268 self.samples
269 .retain(|sample| sample_count_by_thread_id.contains_key(&sample.thread_id));
270 }
271}
272
#[cfg(test)]
mod tests {
    use relay_protocol::FiniteF64;

    use super::*;

    /// Builds a [`Sample`] from plain values to keep the fixtures compact.
    fn sample(stack_id: usize, thread_id: &str, timestamp: f64) -> Sample {
        Sample {
            stack_id,
            thread_id: thread_id.to_owned(),
            timestamp: FiniteF64::new(timestamp).unwrap(),
        }
    }

    /// Builds a profile with a single one-frame stack (index 0) and one default frame.
    fn profile_with_samples(samples: Vec<Sample>) -> ProfileData {
        ProfileData {
            samples,
            stacks: vec![vec![0]],
            frames: vec![Default::default()],
            ..Default::default()
        }
    }

    #[test]
    fn test_roundtrip() {
        let first_payload = include_bytes!("../../tests/fixtures/sample/v2/valid.json");
        let first_parse = ProfileChunk::parse(first_payload);
        assert!(first_parse.is_ok(), "{first_parse:#?}");
        let second_payload = serde_json::to_vec(&first_parse.unwrap()).unwrap();
        let second_parse = ProfileChunk::parse(&second_payload[..]);
        assert!(second_parse.is_ok(), "{second_parse:#?}");
    }

    #[test]
    fn test_samples_are_sorted() {
        let mut chunk = profile_with_samples(vec![sample(0, "1", 60.0), sample(0, "1", 30.0)]);

        assert!(chunk.normalize("python").is_ok());

        let timestamps: Vec<FiniteF64> = chunk.samples.iter().map(|s| s.timestamp).collect();

        assert_eq!(
            timestamps,
            vec![FiniteF64::new(30.0).unwrap(), FiniteF64::new(60.0).unwrap()]
        );
    }

    #[test]
    fn test_is_above_max_duration() {
        let test_cases = [
            (
                "not above max duration",
                profile_with_samples(vec![sample(0, "1", 30.0), sample(0, "1", 60.0)]),
                false,
            ),
            (
                "above max duration",
                profile_with_samples(vec![sample(0, "1", 10.0), sample(0, "1", 80.0)]),
                true,
            ),
            (
                "unsorted samples not above max duration",
                profile_with_samples(vec![sample(0, "1", 50.0), sample(0, "1", 20.0)]),
                false,
            ),
        ];
        for (name, profile, want) in &test_cases {
            assert_eq!(
                profile.is_above_max_duration(),
                *want,
                "test <{name}> failed"
            );
        }
    }

    #[test]
    fn test_single_samples_are_removed() {
        // Stack 0 has one frame; stack 1 is empty and does not count towards a thread's total.
        let mut chunk = ProfileData {
            samples: vec![
                sample(1, "1", 60.0),
                sample(1, "1", 60.0),
                sample(0, "1", 60.0),
                sample(1, "1", 60.0),
                sample(0, "2", 30.0),
                sample(1, "2", 60.0),
                sample(1, "2", 60.0),
                sample(0, "3", 30.0),
                sample(0, "3", 30.0),
                sample(1, "3", 60.0),
                sample(1, "3", 60.0),
            ],
            stacks: vec![vec![0], vec![]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        chunk.remove_single_samples_per_thread();

        // Threads "1" and "2" have exactly one non-empty sample each and are dropped entirely;
        // thread "3" has two, so all four of its samples (including empty-stack ones) remain.
        assert_eq!(chunk.samples.len(), 4);
        assert!(chunk.samples.iter().all(|s| s.thread_id == "3"));
    }

    #[test]
    fn test_normalize_rejects_not_enough_samples() {
        let mut chunk = profile_with_samples(vec![sample(0, "1", 30.0)]);

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::NotEnoughSamples)
        ));
    }

    #[test]
    fn test_normalize_rejects_unknown_stack() {
        let mut chunk = profile_with_samples(vec![
            sample(0, "1", 30.0),
            sample(0, "1", 40.0),
            sample(7, "1", 50.0),
        ]);

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::MalformedSamples)
        ));
    }

    #[test]
    fn test_normalize_rejects_unknown_frame() {
        let mut chunk = ProfileData {
            samples: vec![sample(0, "1", 30.0), sample(0, "1", 40.0)],
            stacks: vec![vec![1]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::MalformedStacks)
        ));
    }

    #[test]
    fn test_normalize_rejects_too_long_duration() {
        let mut chunk = profile_with_samples(vec![sample(0, "1", 10.0), sample(0, "1", 80.0)]);

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::DurationIsTooLong)
        ));
    }
}