relay_server/metrics/bucket_encoding.rs

1use std::io;
2
3use relay_dynamic_config::{BucketEncoding, GlobalConfig};
4use relay_metrics::{Bucket, BucketValue, MetricNamespace, SetView};
5use relay_protocol::FiniteF64;
6use serde::Serialize;
7
8static BASE64_NOPAD: data_encoding::Encoding = data_encoding::BASE64_NOPAD;
9
10pub struct BucketEncoder<'a> {
11    global_config: &'a GlobalConfig,
12    buffer: String,
13}
14
15impl<'a> BucketEncoder<'a> {
16    /// Creates a new bucket encoder with the provided config.
17    pub fn new(global_config: &'a GlobalConfig) -> Self {
18        Self {
19            global_config,
20            buffer: String::new(),
21        }
22    }
23
24    /// Prepares the bucket before encoding.
25    ///
26    /// Returns the namespace extracted from the bucket.
27    ///
28    /// Some encodings need the bucket to be sorted or otherwise modified,
29    /// afterwards the bucket can be split into multiple smaller views
30    /// and encoded one by one.
31    pub fn prepare(&self, bucket: &mut Bucket) -> MetricNamespace {
32        let namespace = bucket.name.namespace();
33
34        if let BucketValue::Distribution(ref mut distribution) = bucket.value {
35            let enc = self.global_config.options.metric_bucket_dist_encodings;
36            let enc = enc.for_namespace(namespace);
37
38            if matches!(enc, BucketEncoding::Zstd) {
39                distribution.sort_unstable();
40            }
41        }
42
43        namespace
44    }
45
46    /// Encodes a distribution.
47    pub fn encode_distribution<'data>(
48        &mut self,
49        namespace: MetricNamespace,
50        dist: &'data [FiniteF64],
51    ) -> io::Result<ArrayEncoding<'_, &'data [FiniteF64]>> {
52        let enc = self.global_config.options.metric_bucket_dist_encodings;
53        let enc = enc.for_namespace(namespace);
54        self.do_encode(enc, dist)
55    }
56
57    /// Encodes a set.
58    pub fn encode_set<'data>(
59        &mut self,
60        namespace: MetricNamespace,
61        set: SetView<'data>,
62    ) -> io::Result<ArrayEncoding<'_, SetView<'data>>> {
63        let enc = self.global_config.options.metric_bucket_set_encodings;
64        let enc = enc.for_namespace(namespace);
65        self.do_encode(enc, set)
66    }
67
68    fn do_encode<T: Encodable>(
69        &mut self,
70        enc: BucketEncoding,
71        data: T,
72    ) -> io::Result<ArrayEncoding<'_, T>> {
73        // If the buffer is not cleared before encoding more data,
74        // the new data will just be appended to the end.
75        self.buffer.clear();
76
77        match enc {
78            BucketEncoding::Legacy => Ok(ArrayEncoding::Legacy(data)),
79            BucketEncoding::Array => {
80                Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Array { data }))
81            }
82            BucketEncoding::Base64 => base64(data, &mut self.buffer),
83            BucketEncoding::Zstd => zstd(data, &mut self.buffer),
84        }
85    }
86}
87
88/// Dynamic array encoding intended for distribution and set metric buckets.
89#[derive(Clone, Debug, Serialize)]
90#[serde(untagged)]
91pub enum ArrayEncoding<'a, T> {
92    /// The original, legacy, encoding.
93    ///
94    /// Encodes all values as an array of numbers.
95    Legacy(T),
96    /// Dynamic encoding supporting multiple formats.
97    ///
98    /// Adds metadata and adds support for multiple different encodings.
99    Dynamic(DynamicArrayEncoding<'a, T>),
100}
101
102impl<T> ArrayEncoding<'_, T> {
103    /// Name of the encoding.
104    ///
105    /// Should only be used for debugging purposes.
106    pub fn name(&self) -> &'static str {
107        match self {
108            Self::Legacy(_) => "legacy",
109            Self::Dynamic(dynamic) => dynamic.format(),
110        }
111    }
112}
113
114#[derive(Clone, Debug, Serialize)]
115#[serde(tag = "format", rename_all = "lowercase")]
116pub enum DynamicArrayEncoding<'a, T> {
117    /// Array encoding.
118    ///
119    /// Encodes all items as an array.
120    Array { data: T },
121    /// Base64 (with padding) encoding.
122    ///
123    /// Converts all items to little endian byte sequences
124    /// and Base64 encodes the raw little endian bytes.
125    Base64 { data: &'a str },
126    /// Zstd encoding.
127    ///
128    /// Converts all items to little endian byte sequences,
129    /// compresses the data using zstd and then encodes the result
130    /// using Base64 (with padding).
131    ///
132    /// Items may be sorted to achieve better compression results.
133    Zstd { data: &'a str },
134}
135
136impl<T> DynamicArrayEncoding<'_, T> {
137    /// Returns the serialized format name.
138    pub fn format(&self) -> &'static str {
139        match self {
140            DynamicArrayEncoding::Array { .. } => "array",
141            DynamicArrayEncoding::Base64 { .. } => "base64",
142            DynamicArrayEncoding::Zstd { .. } => "zstd",
143        }
144    }
145}
146
147fn base64<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
148    let mut writer = EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer));
149    data.write_to(&mut writer)?;
150    drop(writer);
151
152    Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Base64 {
153        data: buffer,
154    }))
155}
156
157fn zstd<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
158    // Use the fastest compression level, our main objective here is to get the best
159    // compression ratio for least amount of time spent.
160    let mut writer = zstd::Encoder::new(EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer)), 1)?;
161    data.write_to(&mut writer)?;
162    writer.finish()?;
163
164    Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Zstd {
165        data: buffer,
166    }))
167}
168
169struct EncoderWriteAdapter<'a>(pub data_encoding::Encoder<'a>);
170
171impl io::Write for EncoderWriteAdapter<'_> {
172    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
173        self.0.append(buf);
174        Ok(buf.len())
175    }
176
177    fn flush(&mut self) -> io::Result<()> {
178        Ok(())
179    }
180}
181
182trait Encodable {
183    fn write_to(&self, writer: impl io::Write) -> io::Result<()>;
184}
185
186impl Encodable for SetView<'_> {
187    #[inline(always)]
188    fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
189        for value in self.iter() {
190            writer.write_all(&value.to_le_bytes())?;
191        }
192        Ok(())
193    }
194}
195
196impl Encodable for &[FiniteF64] {
197    #[inline(always)]
198    fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
199        for value in self.iter() {
200            writer.write_all(&value.to_f64().to_le_bytes())?;
201        }
202        Ok(())
203    }
204}