relay_server/metrics/
bucket_encoding.rs1use std::io;
2
3use relay_dynamic_config::{BucketEncoding, GlobalConfig};
4use relay_metrics::{Bucket, BucketValue, MetricNamespace, SetView};
5use relay_protocol::FiniteF64;
6use serde::Serialize;
7
8static BASE64_NOPAD: data_encoding::Encoding = data_encoding::BASE64_NOPAD;
9
10pub struct BucketEncoder<'a> {
11 global_config: &'a GlobalConfig,
12 buffer: String,
13}
14
15impl<'a> BucketEncoder<'a> {
16 pub fn new(global_config: &'a GlobalConfig) -> Self {
18 Self {
19 global_config,
20 buffer: String::new(),
21 }
22 }
23
24 pub fn prepare(&self, bucket: &mut Bucket) -> MetricNamespace {
32 let namespace = bucket.name.namespace();
33
34 if let BucketValue::Distribution(ref mut distribution) = bucket.value {
35 let enc = self.global_config.options.metric_bucket_dist_encodings;
36 let enc = enc.for_namespace(namespace);
37
38 if matches!(enc, BucketEncoding::Zstd) {
39 distribution.sort_unstable();
40 }
41 }
42
43 namespace
44 }
45
46 pub fn encode_distribution<'data>(
48 &mut self,
49 namespace: MetricNamespace,
50 dist: &'data [FiniteF64],
51 ) -> io::Result<ArrayEncoding<'_, &'data [FiniteF64]>> {
52 let enc = self.global_config.options.metric_bucket_dist_encodings;
53 let enc = enc.for_namespace(namespace);
54 self.do_encode(enc, dist)
55 }
56
57 pub fn encode_set<'data>(
59 &mut self,
60 namespace: MetricNamespace,
61 set: SetView<'data>,
62 ) -> io::Result<ArrayEncoding<'_, SetView<'data>>> {
63 let enc = self.global_config.options.metric_bucket_set_encodings;
64 let enc = enc.for_namespace(namespace);
65 self.do_encode(enc, set)
66 }
67
68 fn do_encode<T: Encodable>(
69 &mut self,
70 enc: BucketEncoding,
71 data: T,
72 ) -> io::Result<ArrayEncoding<'_, T>> {
73 self.buffer.clear();
76
77 match enc {
78 BucketEncoding::Legacy => Ok(ArrayEncoding::Legacy(data)),
79 BucketEncoding::Array => {
80 Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Array { data }))
81 }
82 BucketEncoding::Base64 => base64(data, &mut self.buffer),
83 BucketEncoding::Zstd => zstd(data, &mut self.buffer),
84 }
85 }
86}
87
88#[derive(Clone, Debug, Serialize)]
90#[serde(untagged)]
91pub enum ArrayEncoding<'a, T> {
92 Legacy(T),
96 Dynamic(DynamicArrayEncoding<'a, T>),
100}
101
102impl<T> ArrayEncoding<'_, T> {
103 pub fn name(&self) -> &'static str {
107 match self {
108 Self::Legacy(_) => "legacy",
109 Self::Dynamic(dynamic) => dynamic.format(),
110 }
111 }
112}
113
114#[derive(Clone, Debug, Serialize)]
115#[serde(tag = "format", rename_all = "lowercase")]
116pub enum DynamicArrayEncoding<'a, T> {
117 Array { data: T },
121 Base64 { data: &'a str },
126 Zstd { data: &'a str },
134}
135
136impl<T> DynamicArrayEncoding<'_, T> {
137 pub fn format(&self) -> &'static str {
139 match self {
140 DynamicArrayEncoding::Array { .. } => "array",
141 DynamicArrayEncoding::Base64 { .. } => "base64",
142 DynamicArrayEncoding::Zstd { .. } => "zstd",
143 }
144 }
145}
146
147fn base64<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
148 let mut writer = EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer));
149 data.write_to(&mut writer)?;
150 drop(writer);
151
152 Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Base64 {
153 data: buffer,
154 }))
155}
156
157fn zstd<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
158 let mut writer = zstd::Encoder::new(EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer)), 1)?;
161 data.write_to(&mut writer)?;
162 writer.finish()?;
163
164 Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Zstd {
165 data: buffer,
166 }))
167}
168
169struct EncoderWriteAdapter<'a>(pub data_encoding::Encoder<'a>);
170
171impl io::Write for EncoderWriteAdapter<'_> {
172 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
173 self.0.append(buf);
174 Ok(buf.len())
175 }
176
177 fn flush(&mut self) -> io::Result<()> {
178 Ok(())
179 }
180}
181
182trait Encodable {
183 fn write_to(&self, writer: impl io::Write) -> io::Result<()>;
184}
185
186impl Encodable for SetView<'_> {
187 #[inline(always)]
188 fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
189 for value in self.iter() {
190 writer.write_all(&value.to_le_bytes())?;
191 }
192 Ok(())
193 }
194}
195
196impl Encodable for &[FiniteF64] {
197 #[inline(always)]
198 fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
199 for value in self.iter() {
200 writer.write_all(&value.to_f64().to_le_bytes())?;
201 }
202 Ok(())
203 }
204}