// relay_server/metrics/bucket_encoding.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
use std::io;

use relay_dynamic_config::{BucketEncoding, GlobalConfig};
use relay_metrics::{Bucket, BucketValue, FiniteF64, MetricNamespace, SetView};
use serde::Serialize;

/// Standard Base64 alphabet *without* trailing `=` padding.
///
/// Used by both the `base64` and the `zstd` array encodings in this module.
static BASE64_NOPAD: data_encoding::Encoding = data_encoding::BASE64_NOPAD;

/// Encodes the array payloads (distribution values, set members) of metric buckets.
///
/// The encoding for each payload is selected per metric namespace from the
/// global config options. Text-based encodings are written into a buffer owned
/// by the encoder, so an encoded result borrows the encoder while it is alive.
pub struct BucketEncoder<'a> {
    /// Reusable scratch space holding the most recently produced encoded text.
    buffer: String,
    /// Source of the per-namespace encoding configuration.
    global_config: &'a GlobalConfig,
}

impl<'a> BucketEncoder<'a> {
    /// Builds an encoder that reads its encoding choices from `global_config`.
    ///
    /// The internal scratch buffer starts out empty. It is cleared, not
    /// reallocated, between `encode_*` calls.
    pub fn new(global_config: &'a GlobalConfig) -> Self {
        let buffer = String::new();
        Self {
            buffer,
            global_config,
        }
    }

    /// Readies `bucket` for encoding and hands back its namespace.
    ///
    /// Call this before splitting the bucket into smaller views that are
    /// encoded one at a time. If the bucket is a distribution and its
    /// namespace is configured for [`BucketEncoding::Zstd`], the values are
    /// sorted in place. Every other bucket is left untouched.
    pub fn prepare(&self, bucket: &mut Bucket) -> MetricNamespace {
        let namespace = bucket.name.namespace();

        let BucketValue::Distribution(ref mut values) = bucket.value else {
            return namespace;
        };

        if let BucketEncoding::Zstd = self.dist_encoding(namespace) {
            // Sorted input is intended to give zstd better compression.
            values.sort_unstable();
        }

        namespace
    }

    /// Encodes the values of a distribution using the namespace's configured encoding.
    pub fn encode_distribution<'data>(
        &mut self,
        namespace: MetricNamespace,
        dist: &'data [FiniteF64],
    ) -> io::Result<ArrayEncoding<'_, &'data [FiniteF64]>> {
        let encoding = self.dist_encoding(namespace);
        self.do_encode(encoding, dist)
    }

    /// Encodes the members of a set using the namespace's configured encoding.
    pub fn encode_set<'data>(
        &mut self,
        namespace: MetricNamespace,
        set: SetView<'data>,
    ) -> io::Result<ArrayEncoding<'_, SetView<'data>>> {
        let encoding = self.set_encoding(namespace);
        self.do_encode(encoding, set)
    }

    /// Distribution encoding configured for `namespace`.
    fn dist_encoding(&self, namespace: MetricNamespace) -> BucketEncoding {
        self.global_config
            .options
            .metric_bucket_dist_encodings
            .for_namespace(namespace)
    }

    /// Set encoding configured for `namespace`.
    fn set_encoding(&self, namespace: MetricNamespace) -> BucketEncoding {
        self.global_config
            .options
            .metric_bucket_set_encodings
            .for_namespace(namespace)
    }

    /// Applies `enc` to `data`. Text-based results borrow the internal buffer.
    fn do_encode<T: Encodable>(
        &mut self,
        enc: BucketEncoding,
        data: T,
    ) -> io::Result<ArrayEncoding<'_, T>> {
        // The encoders append to the buffer, so stale output from an earlier
        // call has to be discarded first.
        self.buffer.clear();
        let buffer = &mut self.buffer;

        match enc {
            BucketEncoding::Zstd => zstd(data, buffer),
            BucketEncoding::Base64 => base64(data, buffer),
            BucketEncoding::Array => {
                Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Array { data }))
            }
            BucketEncoding::Legacy => Ok(ArrayEncoding::Legacy(data)),
        }
    }
}

/// Encoded array payload of a distribution or set metric bucket.
///
/// Serialized untagged: the output is exactly the serialization of the
/// wrapped variant's contents, with no extra wrapper.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum ArrayEncoding<'a, T> {
    /// The original, legacy encoding.
    ///
    /// The values themselves, serialized as a plain array of numbers.
    Legacy(T),
    /// Self-describing encoding.
    ///
    /// Carries metadata about its format, which allows encodings other than a
    /// plain array.
    Dynamic(DynamicArrayEncoding<'a, T>),
}

impl<T> ArrayEncoding<'_, T> {
    /// Short name of the encoding in use, intended for debugging only.
    ///
    /// Returns `"legacy"` for [`ArrayEncoding::Legacy`]. For the dynamic
    /// variant, it returns that encoding's format name.
    pub fn name(&self) -> &'static str {
        if let Self::Dynamic(dynamic) = self {
            dynamic.format()
        } else {
            "legacy"
        }
    }
}

/// Self-describing array encoding.
///
/// Serialized as an object whose `format` field names the variant in
/// lowercase (`"array"`, `"base64"` or `"zstd"`), followed by its `data`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "format", rename_all = "lowercase")]
pub enum DynamicArrayEncoding<'a, T> {
    /// Array encoding.
    ///
    /// Encodes all items as an array.
    Array { data: T },
    /// Base64 encoding, *without* padding.
    ///
    /// Converts all items to little endian byte sequences and Base64 encodes
    /// the raw little endian bytes. No trailing `=` padding is emitted.
    Base64 { data: &'a str },
    /// Zstd encoding.
    ///
    /// Converts all items to little endian byte sequences, compresses the
    /// data using zstd and then encodes the result using Base64 *without*
    /// padding.
    ///
    /// Items may be sorted to achieve better compression results.
    Zstd { data: &'a str },
}

impl<T> DynamicArrayEncoding<'_, T> {
    /// Returns the serialized format name.
    ///
    /// Kept in sync with the `format` tag serde emits
    /// (`rename_all = "lowercase"` on the variant names).
    pub fn format(&self) -> &'static str {
        match self {
            Self::Array { .. } => "array",
            Self::Base64 { .. } => "base64",
            Self::Zstd { .. } => "zstd",
        }
    }
}

/// Base64-encodes the bytes produced by `data.write_to`, without padding.
///
/// The encoded text is appended to `buffer`, and the result borrows it as a
/// [`DynamicArrayEncoding::Base64`].
fn base64<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
    let mut adapter = EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer));
    data.write_to(&mut adapter)?;
    // The encoder holds back an incomplete trailing chunk. Finalizing writes
    // it into `buffer` and ends the encoder's mutable borrow of `buffer`.
    adapter.0.finalize();

    let encoded: &str = buffer;
    Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Base64 {
        data: encoded,
    }))
}

/// Compresses the bytes produced by `data.write_to` with zstd and
/// Base64-encodes the compressed stream without padding.
///
/// The encoded text is appended to `buffer`, and the result borrows it as a
/// [`DynamicArrayEncoding::Zstd`].
fn zstd<T: Encodable>(data: T, buffer: &mut String) -> io::Result<ArrayEncoding<T>> {
    let base64_sink = EncoderWriteAdapter(BASE64_NOPAD.new_encoder(buffer));
    let mut compressor = zstd::Encoder::new(base64_sink, zstd::DEFAULT_COMPRESSION_LEVEL)?;

    data.write_to(&mut compressor)?;

    // Finishing the zstd frame returns the Base64 sink. The sink must then be
    // finalized so that its last characters land in `buffer`.
    let base64_sink = compressor.finish()?;
    base64_sink.0.finalize();

    Ok(ArrayEncoding::Dynamic(DynamicArrayEncoding::Zstd {
        data: buffer,
    }))
}

/// Exposes a [`data_encoding::Encoder`] as an [`io::Write`] sink.
///
/// Byte producers, such as the zstd encoder, can then stream directly into
/// Base64 text.
struct EncoderWriteAdapter<'a>(pub data_encoding::Encoder<'a>);

impl io::Write for EncoderWriteAdapter<'_> {
    /// Hands all of `buf` to the wrapped encoder.
    ///
    /// Always reports a full-length write and never returns an error.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let Self(encoder) = self;
        encoder.append(buf);
        Ok(buf.len())
    }

    /// No-op.
    ///
    /// Any pending output is only emitted once the wrapped encoder is
    /// finalized or dropped.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// A collection of values that can be written out as a raw byte stream.
///
/// The byte stream is consumed by the Base64 and zstd array encoders.
trait Encodable {
    /// Writes the byte representation of every item to `writer`.
    ///
    /// Stops at, and returns, the first write error.
    fn write_to(&self, writer: impl io::Write) -> io::Result<()>;
}

impl Encodable for SetView<'_> {
    /// Writes each set member's little-endian bytes, in the view's iteration order.
    #[inline(always)]
    fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
        self.iter()
            .try_for_each(|member| writer.write_all(&member.to_le_bytes()))
    }
}

impl Encodable for &[FiniteF64] {
    /// Writes each value as the little-endian bytes of its `f64`, in slice order.
    #[inline(always)]
    fn write_to(&self, mut writer: impl io::Write) -> io::Result<()> {
        self.iter()
            .map(|number| number.to_f64().to_le_bytes())
            .try_for_each(|bytes| writer.write_all(&bytes))
    }
}