relay_statsd/
lib.rs

1//! A high-level StatsD metric client built on cadence.
2//!
3//! ## Defining Metrics
4//!
//! To use metrics, first implement one of the metric traits on a custom type (usually an enum).
//! The following types of metrics are available: `counter`, `timer`, `gauge`, `histogram`, and
//! `set`. For an explanation of what each type means, see [Metric Types].
//!
//! The metric traits serve only to provide a type-safe metric name. All metric traits have exactly
//! the same form; they differ only to ensure that a metric can only be used as the type for which
//! it was defined (e.g. a counter metric cannot be used as a timer metric). See the traits for
//! more detailed examples.
13//!
14//! ## Initializing the Client
15//!
//! Metrics can be used without initializing a statsd client. In that case, invoking `with_client`
//! or the [`metric!`] macro is a no-op; metrics are only collected once a client has been
//! configured.
19//!
20//! To initialize the client, either use [`set_client`] to pass a custom client, or use
21//! [`init`] to create a default client with known arguments:
22//!
23//! ```no_run
24//! # use std::collections::BTreeMap;
25//!
26//! relay_statsd::init("myprefix", "localhost:8125", BTreeMap::new(), 1.0, true);
27//! ```
28//!
29//! ## Macro Usage
30//!
31//! The recommended way to record metrics is by using the [`metric!`] macro. See the trait docs
32//! for more information on how to record each type of metric.
33//!
34//! ```
35//! use relay_statsd::{metric, CounterMetric};
36//!
37//! struct MyCounter;
38//!
39//! impl CounterMetric for MyCounter {
40//!     fn name(&self) -> &'static str {
41//!         "counter"
42//!     }
43//! }
44//!
45//! metric!(counter(MyCounter) += 1);
46//! ```
47//!
48//! ## Manual Usage
49//!
50//! ```
51//! use relay_statsd::prelude::*;
52//!
53//! relay_statsd::with_client(|client| {
54//!     client.count("mymetric", 1).ok();
55//! });
56//! ```
57//!
58//! [Metric Types]: https://github.com/statsd/statsd/blob/master/docs/metric_types.md
59use cadence::{Metric, MetricBuilder, StatsdClient};
60use parking_lot::RwLock;
61use rand::distributions::{Distribution, Uniform};
62use statsdproxy::cadence::StatsdProxyMetricSink;
63use statsdproxy::config::AggregateMetricsConfig;
64use std::collections::BTreeMap;
65use std::net::ToSocketAddrs;
66use std::ops::{Deref, DerefMut};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Capacity figure reported alongside metric send failures.
///
/// NOTE(review): nothing in this file sizes a queue with this value; it is only attached as
/// `maximum_capacity` to the error logged by [`MetricsClient::send_metric`]. Confirm whether the
/// statsdproxy sink enforces its own limit before relying on this number.
const METRICS_MAX_QUEUE_SIZE: usize = 100_000;
72
73/// Client configuration object to store globally.
74#[derive(Debug)]
75pub struct MetricsClient {
76    /// The raw statsd client.
77    pub statsd_client: StatsdClient,
78    /// Default tags to apply to every metric.
79    pub default_tags: BTreeMap<String, String>,
80    /// Global sample rate.
81    pub sample_rate: f32,
82    /// Receiver for external listeners.
83    ///
84    /// Only available when the client was initialized with `init_basic`.
85    pub rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
86}
87
88impl Deref for MetricsClient {
89    type Target = StatsdClient;
90
91    fn deref(&self) -> &StatsdClient {
92        &self.statsd_client
93    }
94}
95
96impl DerefMut for MetricsClient {
97    fn deref_mut(&mut self) -> &mut StatsdClient {
98        &mut self.statsd_client
99    }
100}
101
102impl MetricsClient {
103    /// Send a metric with the default tags defined on this `MetricsClient`.
104    #[inline(always)]
105    pub fn send_metric<'a, T>(&'a self, mut metric: MetricBuilder<'a, '_, T>)
106    where
107        T: Metric + From<String>,
108    {
109        if !self._should_send() {
110            return;
111        }
112
113        for (k, v) in &self.default_tags {
114            metric = metric.with_tag(k, v);
115        }
116
117        if let Err(error) = metric.try_send() {
118            relay_log::error!(
119                error = &error as &dyn std::error::Error,
120                maximum_capacity = METRICS_MAX_QUEUE_SIZE,
121                "Error sending a metric",
122            );
123        }
124    }
125
126    fn _should_send(&self) -> bool {
127        if self.sample_rate <= 0.0 {
128            false
129        } else if self.sample_rate >= 1.0 {
130            true
131        } else {
132            // Using thread local RNG and uniform distribution here because Rng::gen_range is
133            // "optimized for the case that only a single sample is made from the given range".
134            // See https://docs.rs/rand/0.7.3/rand/distributions/uniform/struct.Uniform.html for more
135            // details.
136            let mut rng = rand::thread_rng();
137            RNG_UNIFORM_DISTRIBUTION
138                .with(|uniform_dist| uniform_dist.sample(&mut rng) <= self.sample_rate)
139        }
140    }
141}
142
143static METRICS_CLIENT: RwLock<Option<Arc<MetricsClient>>> = RwLock::new(None);
144
145thread_local! {
146    static CURRENT_CLIENT: std::cell::RefCell<Option<Arc<MetricsClient>>>  = METRICS_CLIENT.read().clone().into();
147    static RNG_UNIFORM_DISTRIBUTION: Uniform<f32> = Uniform::new(0.0, 1.0);
148}
149
150/// Internal prelude for the macro
151#[doc(hidden)]
152pub mod _pred {
153    pub use cadence::prelude::*;
154}
155
156/// The metrics prelude that is necessary to use the client.
157pub mod prelude {
158    pub use cadence::prelude::*;
159}
160
161/// Set a new statsd client.
162pub fn set_client(client: MetricsClient) {
163    *METRICS_CLIENT.write() = Some(Arc::new(client));
164    CURRENT_CLIENT.with(|cell| cell.replace(METRICS_CLIENT.read().clone()));
165}
166
167/// Set a test client for the period of the called function (only affects the current thread).
168// TODO: replace usages with `init_basic`
169pub fn with_capturing_test_client(f: impl FnOnce()) -> Vec<String> {
170    let (rx, sink) = cadence::SpyMetricSink::new();
171    let test_client = MetricsClient {
172        statsd_client: StatsdClient::from_sink("", sink),
173        default_tags: Default::default(),
174        sample_rate: 1.0,
175        rx: None,
176    };
177
178    CURRENT_CLIENT.with(|cell| {
179        let old_client = cell.replace(Some(Arc::new(test_client)));
180        f();
181        cell.replace(old_client);
182    });
183
184    rx.iter().map(|x| String::from_utf8(x).unwrap()).collect()
185}
186
187// Setup a simple metrics listener.
188//
189// Returns `None` if the global metrics client has already been configured.
190pub fn init_basic() -> Option<crossbeam_channel::Receiver<Vec<u8>>> {
191    CURRENT_CLIENT.with(|cell| {
192        if cell.borrow().is_none() {
193            // Setup basic observable metrics sink.
194            let (receiver, sink) = cadence::SpyMetricSink::new();
195            let test_client = MetricsClient {
196                statsd_client: StatsdClient::from_sink("", sink),
197                default_tags: Default::default(),
198                sample_rate: 1.0,
199                rx: Some(receiver.clone()),
200            };
201            cell.replace(Some(Arc::new(test_client)));
202        }
203    });
204
205    CURRENT_CLIENT.with(|cell| {
206        cell.borrow()
207            .as_deref()
208            .and_then(|client| match &client.rx {
209                Some(rx) => Some(rx.clone()),
210                None => {
211                    relay_log::error!("Metrics client was already set up.");
212                    None
213                }
214            })
215    })
216}
217
218/// Disable the client again.
219pub fn disable() {
220    *METRICS_CLIENT.write() = None;
221}
222
223/// Tell the metrics system to report to statsd.
224pub fn init<A: ToSocketAddrs>(
225    prefix: &str,
226    host: A,
227    default_tags: BTreeMap<String, String>,
228    sample_rate: f32,
229    aggregate: bool,
230) {
231    let addrs: Vec<_> = host.to_socket_addrs().unwrap().collect();
232    if !addrs.is_empty() {
233        relay_log::info!("reporting metrics to statsd at {}", addrs[0]);
234    }
235
236    // Normalize sample_rate
237    let sample_rate = sample_rate.clamp(0., 1.);
238    relay_log::debug!(
239        "metrics sample rate is set to {sample_rate}{}",
240        if sample_rate == 0.0 {
241            ", no metrics will be reported"
242        } else {
243            ""
244        }
245    );
246
247    let statsd_client = if aggregate {
248        let statsdproxy_sink = StatsdProxyMetricSink::new(move || {
249            let upstream = statsdproxy::middleware::upstream::Upstream::new(addrs[0])
250                .expect("failed to create statsdproxy metric sink");
251
252            statsdproxy::middleware::aggregate::AggregateMetrics::new(
253                AggregateMetricsConfig {
254                    aggregate_gauges: true,
255                    aggregate_counters: true,
256                    flush_interval: Duration::from_millis(50),
257                    flush_offset: 0,
258                    max_map_size: None,
259                },
260                upstream,
261            )
262        });
263
264        StatsdClient::from_sink(prefix, statsdproxy_sink)
265    } else {
266        let statsdproxy_sink = StatsdProxyMetricSink::new(move || {
267            statsdproxy::middleware::upstream::Upstream::new(addrs[0])
268                .expect("failed to create statsdproxy metric sind")
269        });
270        StatsdClient::from_sink(prefix, statsdproxy_sink)
271    };
272
273    set_client(MetricsClient {
274        statsd_client,
275        default_tags,
276        sample_rate,
277        rx: None,
278    });
279}
280
281/// Invoke a callback with the current statsd client.
282///
283/// If statsd is not configured the callback is not invoked.  For the most part
284/// the [`metric!`] macro should be used instead.
285#[inline(always)]
286pub fn with_client<F, R>(f: F) -> R
287where
288    F: FnOnce(&MetricsClient) -> R,
289    R: Default,
290{
291    CURRENT_CLIENT.with(|client| {
292        if let Some(client) = client.borrow().as_deref() {
293            f(client)
294        } else {
295            R::default()
296        }
297    })
298}
299
/// A metric for capturing timings.
///
/// Timings are a positive number of milliseconds between a start and end time. Examples include
/// time taken to render a web page or time taken for a database call to return.
///
/// The [`metric!`] macro reports timers as histograms, converting the `Duration` to fractional
/// milliseconds.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, TimerMetric};
///
/// enum MyTimer {
///     ProcessA,
///     ProcessB,
/// }
///
/// impl TimerMetric for MyTimer {
///     fn name(&self) -> &'static str {
///         match self {
///             Self::ProcessA => "process_a",
///             Self::ProcessB => "process_b",
///         }
///     }
/// }
///
/// # fn process_a() {}
///
/// // measure time by explicitly passing a std::time::Duration
/// # use std::time::Instant;
/// let start_time = Instant::now();
/// process_a();
/// metric!(timer(MyTimer::ProcessA) = start_time.elapsed());
///
/// // provide tags to a timer
/// metric!(
///     timer(MyTimer::ProcessA) = start_time.elapsed(),
///     server = "server1",
///     host = "host1",
/// );
///
/// // measure time implicitly by enclosing a code block in a metric
/// metric!(timer(MyTimer::ProcessA), {
///     process_a();
/// });
///
/// // measure block and also provide tags
/// metric!(
///     timer(MyTimer::ProcessB),
///     server = "server1",
///     host = "host1",
///     {
///         process_a();
///     }
/// );
///
/// ```
pub trait TimerMetric {
    /// Returns the timer metric name that will be sent to statsd.
    fn name(&self) -> &'static str;
}
359
/// A metric for counting occurrences.
///
/// A counter is a value the client increments or decrements. Aggregations such as rates or
/// averages are computed by the server that receives it. Typical uses are counting logins to a
/// system or incoming requests.
///
/// The [`metric!`] macro skips sending entirely when the amount added or subtracted is zero.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, CounterMetric};
///
/// enum MyCounter {
///     TotalRequests,
///     TotalBytes,
/// }
///
/// impl CounterMetric for MyCounter {
///     fn name(&self) -> &'static str {
///         match self {
///             Self::TotalRequests => "total_requests",
///             Self::TotalBytes => "total_bytes",
///         }
///     }
/// }
///
/// # let buffer = &[(), ()];
///
/// // add to the counter
/// metric!(counter(MyCounter::TotalRequests) += 1);
/// metric!(counter(MyCounter::TotalBytes) += buffer.len() as i64);
///
/// // add to the counter and provide tags
/// metric!(
///     counter(MyCounter::TotalRequests) += 1,
///     server = "s1",
///     host = "h1"
/// );
///
/// // subtract from the counter
/// metric!(counter(MyCounter::TotalRequests) -= 1);
///
/// // subtract from the counter and provide tags
/// metric!(
///     counter(MyCounter::TotalRequests) -= 1,
///     server = "s1",
///     host = "h1"
/// );
/// ```
pub trait CounterMetric {
    /// Name under which the counter is reported to statsd.
    fn name(&self) -> &'static str;
}
412
/// A metric for recording distributions of values.
///
/// The server computes the distribution of the reported values, much as it does for timers.
/// Histograms are not restricted to durations, though: any numeric measurement can be recorded.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, HistogramMetric};
///
/// struct QueueSize;
///
/// impl HistogramMetric for QueueSize {
///     fn name(&self) -> &'static str {
///         "queue_size"
///     }
/// }
///
/// # use std::collections::VecDeque;
/// let queue = VecDeque::new();
/// # let _hint: &VecDeque<()> = &queue;
///
/// // record a histogram value
/// metric!(histogram(QueueSize) = queue.len() as u64);
///
/// // record with tags
/// metric!(
///     histogram(QueueSize) = queue.len() as u64,
///     server = "server1",
///     host = "host1",
/// );
/// ```
pub trait HistogramMetric {
    /// Name under which the histogram is reported to statsd.
    fn name(&self) -> &'static str;
}
450
/// A metric for counting unique values.
///
/// Each report carries a single value, for example a user ID. The server counts how many
/// distinct values it received per time interval, e.g. to count unique visitors to a site.
/// Report the individual values, not the size of a collection: sending `users.len()` would count
/// distinct sizes rather than distinct users.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, SetMetric};
///
/// enum MySet {
///     UniqueProjects,
///     UniqueUsers,
/// }
///
/// impl SetMetric for MySet {
///     fn name(&self) -> &'static str {
///         match self {
///             MySet::UniqueProjects => "unique_projects",
///             MySet::UniqueUsers => "unique_users",
///         }
///     }
/// }
///
/// # use std::collections::HashSet;
/// let users: HashSet<i64> = HashSet::new();
///
/// // report every user id; the server counts the distinct ones
/// for user_id in &users {
///     metric!(set(MySet::UniqueUsers) = *user_id);
/// }
///
/// // report a single value with tags
/// let user_id: i64 = 42;
/// metric!(
///     set(MySet::UniqueUsers) = user_id,
///     server = "server1",
///     host = "host1",
/// );
/// ```
pub trait SetMetric {
    /// Returns the set metric name that will be sent to statsd.
    fn name(&self) -> &'static str;
}
493
/// A metric for reporting the current value of something.
///
/// A gauge is a point-in-time measurement taken by the client. It keeps its value until the
/// client reports a new one. Examples are the load average or the number of active connections.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, GaugeMetric};
///
/// struct QueueSize;
///
/// impl GaugeMetric for QueueSize {
///     fn name(&self) -> &'static str {
///         "queue_size"
///     }
/// }
///
/// # use std::collections::VecDeque;
/// let queue = VecDeque::new();
/// # let _hint: &VecDeque<()> = &queue;
///
/// // a simple gauge value
/// metric!(gauge(QueueSize) = queue.len() as u64);
///
/// // a gauge with tags
/// metric!(
///     gauge(QueueSize) = queue.len() as u64,
///     server = "server1",
///     host = "host1"
/// );
/// ```
pub trait GaugeMetric {
    /// Name under which the gauge is reported to statsd.
    fn name(&self) -> &'static str;
}
531
/// Emits a metric.
///
/// Every form goes through [`with_client`], so it does nothing when no client is configured on
/// the current thread. It also goes through [`MetricsClient::send_metric`], so the client's
/// default tags and sample rate apply. Extra tags are written as `key = value` pairs; the tag key
/// is the identifier exactly as written (via `stringify!`).
///
/// See [crate-level documentation](self) for examples.
#[macro_export]
macro_rules! metric {
    // counter increment
    // `$value` is evaluated exactly once; a zero increment is skipped without sending anything.
    (counter($id:expr) += $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        match $value {
            value if value != 0 => {
                $crate::with_client(|client| {
                    use $crate::_pred::*;
                    client.send_metric(
                        client.count_with_tags(&$crate::CounterMetric::name(&$id), value)
                        $(.with_tag(stringify!($k), $v))*
                    )
                })
            },
            _ => {},
        };
    };

    // counter decrement
    // Sends the negated `$value` as the counter delta; zero is skipped like in the increment arm.
    (counter($id:expr) -= $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        match $value {
            value if value != 0 => {
                $crate::with_client(|client| {
                    use $crate::_pred::*;
                    client.send_metric(
                        client.count_with_tags(&$crate::CounterMetric::name(&$id), -value)
                            $(.with_tag(stringify!($k), $v))*
                    )
                })
            },
            _ => {},
        };
    };

    // gauge set
    (gauge($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.gauge_with_tags(&$crate::GaugeMetric::name(&$id), $value)
                    $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // histogram
    (histogram($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.histogram_with_tags(&$crate::HistogramMetric::name(&$id), $value)
                    $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // sets (count unique occurrences of a value per time interval)
    (set($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.set_with_tags(&$crate::SetMetric::name(&$id), $value)
                    $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // timer value (duration)
    // `$value` must provide `as_nanos()` (e.g. a `Duration`); it is sent as a histogram of
    // fractional milliseconds, so very long durations lose sub-millisecond precision in the f64.
    (timer($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                // NOTE: cadence histograms support Duration out of the box and converts it to nanos,
                // but we want milliseconds for historical reasons.
                client.histogram_with_tags(&$crate::TimerMetric::name(&$id), $value.as_nanos() as f64 / 1e6)
                    $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // timed block
    // Runs `$block`, reports its wall-clock duration through the timer-value arm above, and
    // evaluates to the block's value. In this form every tag must be followed by a comma and the
    // block comes last.
    (timer($id:expr), $($k:ident = $v:expr,)* $block:block) => {{
        let now = std::time::Instant::now();
        let rv = {$block};
        $crate::metric!(timer($id) = now.elapsed() $(, $k = $v)*);
        rv
    }};
}
623
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use cadence::{NopMetricSink, StatsdClient};

    use crate::{
        set_client, with_capturing_test_client, with_client, GaugeMetric, MetricsClient,
        TimerMetric,
    };

    /// Gauges emitted by the capturing-client test.
    enum TestGauges {
        Foo,
        Bar,
    }

    impl GaugeMetric for TestGauges {
        fn name(&self) -> &'static str {
            match self {
                TestGauges::Foo => "foo",
                TestGauges::Bar => "bar",
            }
        }
    }

    /// Timer used by the rounding test.
    struct TestTimer;

    impl TimerMetric for TestTimer {
        fn name(&self) -> &'static str {
            "timer"
        }
    }

    #[test]
    fn test_capturing_client() {
        let captures = with_capturing_test_client(|| {
            metric!(gauge(TestGauges::Foo) = 123, server = "server1", host = "host1");
            metric!(gauge(TestGauges::Bar) = 456, server = "server2", host = "host2");
        });

        let expected = [
            "foo:123|g|#server:server1,host:host1",
            "bar:456|g|#server:server2,host:host2",
        ];
        assert_eq!(captures, expected);
    }

    #[test]
    fn current_client_is_global_client() {
        let before = with_client(|client| format!("{client:?}"));

        let nop_client = MetricsClient {
            statsd_client: StatsdClient::from_sink("", NopMetricSink),
            default_tags: Default::default(),
            sample_rate: 1.0,
            rx: None,
        };
        set_client(nop_client);

        let after = with_client(|client| format!("{client:?}"));

        // Installing a global client must also change this thread's current client.
        assert_ne!(before, after);
    }

    #[test]
    fn nanos_rounding_error() {
        let one_day = Duration::from_secs(60 * 60 * 24);
        let one_year = Duration::from_secs(60 * 60 * 24 * 365);
        let capture_timer = |elapsed: Duration| {
            with_capturing_test_client(|| {
                metric!(timer(TestTimer) = elapsed);
            })
        };

        // Durations on the order of a day keep their nanosecond precision
        // (`h` is for histogram, not hours).
        assert_eq!(
            capture_timer(one_day + Duration::from_nanos(1)),
            ["timer:86400000.000001|h"]
        );

        // Durations on the order of a year lose the nanosecond to f64 rounding.
        assert_eq!(
            capture_timer(one_year + Duration::from_nanos(1)),
            ["timer:31536000000|h"]
        );
    }
}