Skip to main content

relay_profiling/sample/
v2.rs

1//! Sample Format V2
2//!
3//! This version of the sample format expects a collection of samples to be sent with no reference
4//! to the events collected while the profiler was running.
5//!
//! We collect a profiler ID, meant to be a random identifier for this specific instance of the
//! profiler rather than a persistent ID. It only needs to be valid from when the profiler starts
//! until it stops, and is useful for grouping samples on the backend.
9//!
10//! Spans are expected to carry the profiler ID to know which samples are associated with them.
11//!
12use bytes::Bytes;
13use hashbrown::HashMap;
14use serde::{Deserialize, Serialize};
15use std::collections::{BTreeMap, HashSet};
16
17use relay_event_schema::protocol::EventId;
18use relay_protocol::FiniteF64;
19
20use crate::MAX_PROFILE_CHUNK_DURATION;
21use crate::error::ProfileError;
22use crate::measurements::ChunkMeasurement;
23use crate::sample::{DebugMeta, Frame, ThreadMetadata, Version};
24use crate::types::ClientSdk;
25
26const MAX_PROFILE_CHUNK_DURATION_SECS: f64 = MAX_PROFILE_CHUNK_DURATION.as_secs_f64();
27
28#[derive(Debug, Serialize, Deserialize)]
29pub struct ProfileMetadata {
30    /// Random UUID identifying a chunk
31    pub chunk_id: EventId,
32    /// Random UUID for each profiler session
33    pub profiler_id: EventId,
34
35    #[serde(default, skip_serializing_if = "DebugMeta::is_empty")]
36    pub debug_meta: DebugMeta,
37
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub environment: Option<String>,
40    pub platform: String,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub content_type: Option<String>,
43    pub release: Option<String>,
44
45    pub client_sdk: ClientSdk,
46
47    /// Hard-coded string containing "2" to indicate the format version.
48    pub version: Version,
49}
50
51impl relay_protocol::Getter for ProfileMetadata {
52    fn get_value(&self, path: &str) -> Option<relay_protocol::Val<'_>> {
53        match path {
54            "release" => self.release.as_deref().map(|release| release.into()),
55            "platform" => Some(self.platform.as_str().into()),
56            _ => None,
57        }
58    }
59}
60
61#[derive(Debug, Serialize, Deserialize)]
62pub struct Sample {
63    /// Unix timestamp in seconds with millisecond precision when the sample
64    /// was captured.
65    pub timestamp: FiniteF64,
66    /// Index of the stack in the `stacks` field of the profile.
67    pub stack_id: usize,
68    /// Thread or queue identifier
69    pub thread_id: String,
70}
71
72#[derive(Debug, Serialize, Deserialize)]
73pub struct ProfileChunk {
74    // `measurements` contains CPU/memory measurements we do during the capture of the chunk.
75    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
76    pub measurements: BTreeMap<String, ChunkMeasurement>,
77    /// This struct contains all the metadata related to the chunk but all fields are expected to
78    /// be at the top-level of the object.
79    #[serde(flatten)]
80    pub metadata: ProfileMetadata,
81    #[serde(default)]
82    pub profile: ProfileData,
83}
84
85impl ProfileChunk {
86    /// Parses a [`ProfileChunk`] from a JSON `payload`.
87    pub fn parse(payload: &[u8]) -> Result<Self, ProfileError> {
88        let d = &mut serde_json::Deserializer::from_slice(payload);
89        serde_path_to_error::deserialize(d).map_err(ProfileError::InvalidJson)
90    }
91
92    /// Normalizes the [`ProfileChunk`].
93    pub fn normalize(&mut self) -> Result<(), ProfileError> {
94        let platform = self.metadata.platform.as_str();
95        self.profile.normalize(platform)
96    }
97
98    /// Serializes the [`ProfileChunk`] into its JSON form.
99    pub fn serialize(&self) -> Result<Bytes, ProfileError> {
100        serde_json::to_vec(self)
101            .map(Bytes::from)
102            .map_err(|_| ProfileError::CannotSerializePayload)
103    }
104}
105
106impl crate::profile_chunk::ProfileChunk for ProfileChunk {
107    fn platform(&self) -> &str {
108        &self.metadata.platform
109    }
110
111    fn normalize(&mut self) -> Result<(), ProfileError> {
112        ProfileChunk::normalize(self)
113    }
114}
115
116impl relay_filter::Filterable for ProfileChunk {
117    fn release(&self) -> Option<&str> {
118        self.metadata.release.as_deref()
119    }
120}
121
122impl relay_protocol::Getter for ProfileChunk {
123    fn get_value(&self, path: &str) -> Option<relay_protocol::Val<'_>> {
124        self.metadata
125            .get_value(path.strip_prefix(crate::PROFIL_GETTER_PREFIX)?)
126    }
127}
128
129#[derive(Debug, Default, Serialize, Deserialize)]
130pub struct ProfileData {
131    /// `samples` contains the list of samples referencing a stack and thread identifier.
132    /// If 2 stack of frames captured at 2 different timestamps are identical, you're expected to
133    /// reference the same `stack_id`.
134    pub samples: Vec<Sample>,
135    /// `stacks` contains a list of stacks indicating the index of the frame in the `frames` field.
136    /// We do this to not have to repeat frames in different stacks.
137    pub stacks: Vec<Vec<usize>>,
138    /// `frames` contains a list of unique frames found in the profile.
139    pub frames: Vec<Frame>,
140
141    /// `thread_metadata` contains information about the thread or the queue. The identifier is a
142    /// string and can be any unique identifier for the thread or stack (an integer or an address
143    /// for example).
144    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
145    pub thread_metadata: BTreeMap<String, ThreadMetadata>,
146}
147
148impl ProfileData {
149    /// Returns `true` if the [`ProfileData`] does not contain any data.
150    pub fn is_empty(&self) -> bool {
151        let Self {
152            samples,
153            stacks,
154            frames,
155            thread_metadata,
156        } = self;
157
158        samples.is_empty() && stacks.is_empty() && frames.is_empty() && thread_metadata.is_empty()
159    }
160
161    /// Ensures valid profile chunk or returns an error.
162    ///
163    /// Mutates the profile chunk. Removes invalid samples and threads.
164    /// Throws an error if the profile chunk is malformed.
165    /// Removes extra metadata that are not referenced in the samples.
166    pub fn normalize(&mut self, platform: &str) -> Result<(), ProfileError> {
167        self.remove_single_samples_per_thread();
168
169        if self.samples.is_empty() {
170            return Err(ProfileError::NotEnoughSamples);
171        }
172
173        if !self.all_stacks_referenced_by_samples_exist() {
174            return Err(ProfileError::MalformedSamples);
175        }
176
177        if !self.all_frames_referenced_by_stacks_exist() {
178            return Err(ProfileError::MalformedStacks);
179        }
180
181        self.samples.sort_by_key(|s| s.timestamp);
182
183        if self.is_above_max_duration() {
184            return Err(ProfileError::DurationIsTooLong);
185        }
186
187        self.strip_pointer_authentication_code(platform);
188        self.remove_unreferenced_threads();
189
190        Ok(())
191    }
192
193    fn is_above_max_duration(&self) -> bool {
194        if self.samples.is_empty() {
195            return false;
196        }
197        let mut min = self.samples[0].timestamp;
198        let mut max = self.samples[0].timestamp;
199
200        for sample in self.samples.iter().skip(1) {
201            if sample.timestamp < min {
202                min = sample.timestamp
203            } else if sample.timestamp > max {
204                max = sample.timestamp
205            }
206        }
207
208        let duration = max.saturating_sub(min);
209        duration.to_f64() > MAX_PROFILE_CHUNK_DURATION_SECS
210    }
211
212    fn strip_pointer_authentication_code(&mut self, platform: &str) {
213        let addr = match platform {
214            // https://github.com/microsoft/plcrashreporter/blob/748087386cfc517936315c107f722b146b0ad1ab/Source/PLCrashAsyncThread_arm.c#L84
215            "cocoa" => 0x0000000FFFFFFFFF,
216            _ => return,
217        };
218        for frame in &mut self.frames {
219            frame.strip_pointer_authentication_code(addr);
220        }
221    }
222
223    /// Checks that all stacks referenced by the samples exist in the stacks.
224    fn all_stacks_referenced_by_samples_exist(&self) -> bool {
225        self.samples
226            .iter()
227            .all(|sample| self.stacks.get(sample.stack_id).is_some())
228    }
229
230    /// Checks that all frames referenced by the stacks exist in the frames.
231    fn all_frames_referenced_by_stacks_exist(&self) -> bool {
232        self.stacks.iter().all(|stack| {
233            stack
234                .iter()
235                .all(|frame_id| self.frames.get(*frame_id).is_some())
236        })
237    }
238
239    fn remove_unreferenced_threads(&mut self) {
240        let thread_ids = self
241            .samples
242            .iter()
243            .map(|sample| sample.thread_id.clone())
244            .collect::<HashSet<_>>();
245        self.thread_metadata
246            .retain(|thread_id, _| thread_ids.contains(thread_id));
247    }
248
249    /// Removes a sample when it's the only non-idle sample on its thread
250    fn remove_single_samples_per_thread(&mut self) {
251        let mut sample_count_by_thread_id: hashbrown::HashMap<String, u32> = HashMap::new();
252
253        for s in &self.samples {
254            if let Some(stack) = self.stacks.get(s.stack_id) {
255                // We only count non-idle samples
256                if stack.is_empty() {
257                    continue;
258                }
259            } else {
260                continue;
261            }
262            *sample_count_by_thread_id
263                .entry(s.thread_id.to_owned())
264                .or_default() += 1;
265        }
266
267        sample_count_by_thread_id.retain(|_, count| *count > 1);
268        self.samples
269            .retain(|sample| sample_count_by_thread_id.contains_key(&sample.thread_id));
270    }
271}
272
#[cfg(test)]
mod tests {
    use relay_protocol::FiniteF64;

    use super::*;

    /// Builds a sample on `thread_id`, captured at `timestamp` seconds, referencing `stack_id`.
    fn sample(stack_id: usize, thread_id: &str, timestamp: f64) -> Sample {
        Sample {
            stack_id,
            thread_id: thread_id.to_owned(),
            timestamp: FiniteF64::new(timestamp).unwrap(),
        }
    }

    #[test]
    fn test_roundtrip() {
        let first_payload = include_bytes!("../../tests/fixtures/sample/v2/valid.json");
        let first_parse = ProfileChunk::parse(first_payload);
        assert!(first_parse.is_ok(), "{first_parse:#?}");
        let second_payload = serde_json::to_vec(&first_parse.unwrap()).unwrap();
        let second_parse = ProfileChunk::parse(&second_payload[..]);
        assert!(second_parse.is_ok(), "{second_parse:#?}");
    }

    #[test]
    fn test_samples_are_sorted() {
        let mut chunk = ProfileData {
            samples: vec![sample(0, "1", 60.0), sample(0, "1", 30.0)],
            stacks: vec![vec![0]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        let result = chunk.normalize("python");
        assert!(result.is_ok(), "{result:?}");

        let timestamps: Vec<FiniteF64> = chunk.samples.iter().map(|s| s.timestamp).collect();

        assert_eq!(
            timestamps,
            vec![FiniteF64::new(30.0).unwrap(), FiniteF64::new(60.0).unwrap()]
        );
    }

    #[test]
    fn test_is_above_max_duration() {
        struct TestStruct {
            name: String,
            profile: ProfileData,
            want: bool,
        }

        let test_cases = [
            TestStruct {
                name: "not above max duration".to_owned(),
                profile: ProfileData {
                    samples: vec![sample(0, "1", 30.0), sample(0, "1", 60.0)],
                    stacks: vec![vec![0]],
                    frames: vec![Default::default()],
                    ..Default::default()
                },
                want: false,
            },
            TestStruct {
                name: "above max duration".to_owned(),
                profile: ProfileData {
                    samples: vec![sample(0, "1", 10.0), sample(0, "1", 80.0)],
                    stacks: vec![vec![0]],
                    frames: vec![Default::default()],
                    ..Default::default()
                },
                want: true,
            },
            TestStruct {
                name: "unsorted samples not above max duration".to_owned(),
                profile: ProfileData {
                    samples: vec![sample(0, "1", 50.0), sample(0, "1", 20.0)],
                    stacks: vec![vec![0]],
                    frames: vec![Default::default()],
                    ..Default::default()
                },
                want: false,
            },
        ];
        for test in &test_cases {
            assert_eq!(
                test.profile.is_above_max_duration(),
                test.want,
                "test <{}> failed",
                test.name
            )
        }
    }

    #[test]
    fn test_single_samples_are_removed() {
        // Stack 0 is non-idle, stack 1 is idle (empty).
        let mut chunk = ProfileData {
            samples: vec![
                // Thread 1: a single non-idle sample -> whole thread dropped.
                sample(1, "1", 60.0),
                sample(1, "1", 60.0),
                sample(0, "1", 60.0),
                sample(1, "1", 60.0),
                // Thread 2: a single non-idle sample -> whole thread dropped.
                sample(0, "2", 30.0),
                sample(1, "2", 60.0),
                sample(1, "2", 60.0),
                // Thread 3: two non-idle samples -> all of its samples kept.
                sample(0, "3", 30.0),
                sample(0, "3", 30.0),
                sample(1, "3", 60.0),
                sample(1, "3", 60.0),
            ],
            stacks: vec![vec![0], vec![]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        chunk.remove_single_samples_per_thread();

        // Only the 4 samples from thread_id 3 are retained.
        assert_eq!(chunk.samples.len(), 4);
        assert!(chunk.samples.iter().all(|s| s.thread_id == "3"));
    }

    #[test]
    fn test_normalize_rejects_when_no_thread_has_enough_samples() {
        let mut chunk = ProfileData {
            samples: vec![sample(0, "1", 30.0)],
            stacks: vec![vec![0]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::NotEnoughSamples)
        ));
    }

    #[test]
    fn test_normalize_rejects_missing_stack() {
        let mut chunk = ProfileData {
            samples: vec![
                sample(0, "1", 30.0),
                sample(0, "1", 40.0),
                sample(7, "1", 50.0),
            ],
            stacks: vec![vec![0]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::MalformedSamples)
        ));
    }

    #[test]
    fn test_normalize_rejects_missing_frame() {
        let mut chunk = ProfileData {
            samples: vec![sample(0, "1", 30.0), sample(0, "1", 40.0)],
            stacks: vec![vec![0], vec![3]],
            frames: vec![Default::default()],
            ..Default::default()
        };

        assert!(matches!(
            chunk.normalize("python"),
            Err(ProfileError::MalformedStacks)
        ));
    }
}