relay_statsd/lib.rs
1//! A high-level StatsD metric client built on cadence.
2//!
3//! ## Defining Metrics
4//!
5//! In order to use metrics, one needs to first define one of the metric traits on a custom enum.
6//! The following types of metrics are available: `counter`, `timer`, `gauge`, `histogram`, and
7//! `set`. For explanations on what that means see [Metric Types].
8//!
//! The metric traits serve only to provide a type-safe metric name. All metric traits have exactly
//! the same form; they are distinct only to ensure that a metric can be used solely for the type
//! for which it was defined (e.g. a counter metric cannot be used as a timer metric). See the
//! traits for more detailed examples.
13//!
14//! ## Initializing the Client
15//!
//! Metrics can be used without initializing a statsd client. In that case, invoking `with_client`
//! or the [`metric!`] macro is a no-op. Metrics are only collected once a client has been
//! configured.
19//!
20//! To initialize the client, either use [`set_client`] to pass a custom client, or use
21//! [`init`] to create a default client with known arguments:
22//!
23//! ```no_run
24//! # use std::collections::BTreeMap;
25//!
26//! relay_statsd::init("myprefix", "localhost:8125", BTreeMap::new(), 1.0, true);
27//! ```
28//!
29//! ## Macro Usage
30//!
31//! The recommended way to record metrics is by using the [`metric!`] macro. See the trait docs
32//! for more information on how to record each type of metric.
33//!
34//! ```
35//! use relay_statsd::{metric, CounterMetric};
36//!
37//! struct MyCounter;
38//!
39//! impl CounterMetric for MyCounter {
40//! fn name(&self) -> &'static str {
41//! "counter"
42//! }
43//! }
44//!
45//! metric!(counter(MyCounter) += 1);
46//! ```
47//!
48//! ## Manual Usage
49//!
50//! ```
51//! use relay_statsd::prelude::*;
52//!
53//! relay_statsd::with_client(|client| {
54//! client.count("mymetric", 1).ok();
55//! });
56//! ```
57//!
58//! [Metric Types]: https://github.com/statsd/statsd/blob/master/docs/metric_types.md
59use cadence::{Metric, MetricBuilder, StatsdClient};
60use parking_lot::RwLock;
61use rand::distributions::{Distribution, Uniform};
62use statsdproxy::cadence::StatsdProxyMetricSink;
63use statsdproxy::config::AggregateMetricsConfig;
64use std::collections::BTreeMap;
65use std::net::ToSocketAddrs;
66use std::ops::{Deref, DerefMut};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Maximum number of metric events that can be queued before we start dropping them.
///
/// Reported as the `maximum_capacity` field when sending a metric fails.
const METRICS_MAX_QUEUE_SIZE: usize = 100_000;
72
73/// Client configuration object to store globally.
74#[derive(Debug)]
75pub struct MetricsClient {
76 /// The raw statsd client.
77 pub statsd_client: StatsdClient,
78 /// Default tags to apply to every metric.
79 pub default_tags: BTreeMap<String, String>,
80 /// Global sample rate.
81 pub sample_rate: f32,
82 /// Receiver for external listeners.
83 ///
84 /// Only available when the client was initialized with `init_basic`.
85 pub rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
86}
87
88impl Deref for MetricsClient {
89 type Target = StatsdClient;
90
91 fn deref(&self) -> &StatsdClient {
92 &self.statsd_client
93 }
94}
95
96impl DerefMut for MetricsClient {
97 fn deref_mut(&mut self) -> &mut StatsdClient {
98 &mut self.statsd_client
99 }
100}
101
102impl MetricsClient {
103 /// Send a metric with the default tags defined on this `MetricsClient`.
104 #[inline(always)]
105 pub fn send_metric<'a, T>(&'a self, mut metric: MetricBuilder<'a, '_, T>)
106 where
107 T: Metric + From<String>,
108 {
109 if !self._should_send() {
110 return;
111 }
112
113 for (k, v) in &self.default_tags {
114 metric = metric.with_tag(k, v);
115 }
116
117 if let Err(error) = metric.try_send() {
118 relay_log::error!(
119 error = &error as &dyn std::error::Error,
120 maximum_capacity = METRICS_MAX_QUEUE_SIZE,
121 "Error sending a metric",
122 );
123 }
124 }
125
126 fn _should_send(&self) -> bool {
127 if self.sample_rate <= 0.0 {
128 false
129 } else if self.sample_rate >= 1.0 {
130 true
131 } else {
132 // Using thread local RNG and uniform distribution here because Rng::gen_range is
133 // "optimized for the case that only a single sample is made from the given range".
134 // See https://docs.rs/rand/0.7.3/rand/distributions/uniform/struct.Uniform.html for more
135 // details.
136 let mut rng = rand::thread_rng();
137 RNG_UNIFORM_DISTRIBUTION
138 .with(|uniform_dist| uniform_dist.sample(&mut rng) <= self.sample_rate)
139 }
140 }
141}
142
143static METRICS_CLIENT: RwLock<Option<Arc<MetricsClient>>> = RwLock::new(None);
144
145thread_local! {
146 static CURRENT_CLIENT: std::cell::RefCell<Option<Arc<MetricsClient>>> = METRICS_CLIENT.read().clone().into();
147 static RNG_UNIFORM_DISTRIBUTION: Uniform<f32> = Uniform::new(0.0, 1.0);
148}
149
150/// Internal prelude for the macro
151#[doc(hidden)]
152pub mod _pred {
153 pub use cadence::prelude::*;
154}
155
156/// The metrics prelude that is necessary to use the client.
157pub mod prelude {
158 pub use cadence::prelude::*;
159}
160
161/// Set a new statsd client.
162pub fn set_client(client: MetricsClient) {
163 *METRICS_CLIENT.write() = Some(Arc::new(client));
164 CURRENT_CLIENT.with(|cell| cell.replace(METRICS_CLIENT.read().clone()));
165}
166
167/// Set a test client for the period of the called function (only affects the current thread).
168// TODO: replace usages with `init_basic`
169pub fn with_capturing_test_client(f: impl FnOnce()) -> Vec<String> {
170 let (rx, sink) = cadence::SpyMetricSink::new();
171 let test_client = MetricsClient {
172 statsd_client: StatsdClient::from_sink("", sink),
173 default_tags: Default::default(),
174 sample_rate: 1.0,
175 rx: None,
176 };
177
178 CURRENT_CLIENT.with(|cell| {
179 let old_client = cell.replace(Some(Arc::new(test_client)));
180 f();
181 cell.replace(old_client);
182 });
183
184 rx.iter().map(|x| String::from_utf8(x).unwrap()).collect()
185}
186
187// Setup a simple metrics listener.
188//
189// Returns `None` if the global metrics client has already been configured.
190pub fn init_basic() -> Option<crossbeam_channel::Receiver<Vec<u8>>> {
191 CURRENT_CLIENT.with(|cell| {
192 if cell.borrow().is_none() {
193 // Setup basic observable metrics sink.
194 let (receiver, sink) = cadence::SpyMetricSink::new();
195 let test_client = MetricsClient {
196 statsd_client: StatsdClient::from_sink("", sink),
197 default_tags: Default::default(),
198 sample_rate: 1.0,
199 rx: Some(receiver.clone()),
200 };
201 cell.replace(Some(Arc::new(test_client)));
202 }
203 });
204
205 CURRENT_CLIENT.with(|cell| {
206 cell.borrow()
207 .as_deref()
208 .and_then(|client| match &client.rx {
209 Some(rx) => Some(rx.clone()),
210 None => {
211 relay_log::error!("Metrics client was already set up.");
212 None
213 }
214 })
215 })
216}
217
218/// Disable the client again.
219pub fn disable() {
220 *METRICS_CLIENT.write() = None;
221}
222
223/// Tell the metrics system to report to statsd.
224pub fn init<A: ToSocketAddrs>(
225 prefix: &str,
226 host: A,
227 default_tags: BTreeMap<String, String>,
228 sample_rate: f32,
229 aggregate: bool,
230) {
231 let addrs: Vec<_> = host.to_socket_addrs().unwrap().collect();
232 if !addrs.is_empty() {
233 relay_log::info!("reporting metrics to statsd at {}", addrs[0]);
234 }
235
236 // Normalize sample_rate
237 let sample_rate = sample_rate.clamp(0., 1.);
238 relay_log::debug!(
239 "metrics sample rate is set to {sample_rate}{}",
240 if sample_rate == 0.0 {
241 ", no metrics will be reported"
242 } else {
243 ""
244 }
245 );
246
247 let statsd_client = if aggregate {
248 let statsdproxy_sink = StatsdProxyMetricSink::new(move || {
249 let upstream = statsdproxy::middleware::upstream::Upstream::new(addrs[0])
250 .expect("failed to create statsdproxy metric sink");
251
252 statsdproxy::middleware::aggregate::AggregateMetrics::new(
253 AggregateMetricsConfig {
254 aggregate_gauges: true,
255 aggregate_counters: true,
256 flush_interval: Duration::from_millis(50),
257 flush_offset: 0,
258 max_map_size: None,
259 },
260 upstream,
261 )
262 });
263
264 StatsdClient::from_sink(prefix, statsdproxy_sink)
265 } else {
266 let statsdproxy_sink = StatsdProxyMetricSink::new(move || {
267 statsdproxy::middleware::upstream::Upstream::new(addrs[0])
268 .expect("failed to create statsdproxy metric sind")
269 });
270 StatsdClient::from_sink(prefix, statsdproxy_sink)
271 };
272
273 set_client(MetricsClient {
274 statsd_client,
275 default_tags,
276 sample_rate,
277 rx: None,
278 });
279}
280
281/// Invoke a callback with the current statsd client.
282///
283/// If statsd is not configured the callback is not invoked. For the most part
284/// the [`metric!`] macro should be used instead.
285#[inline(always)]
286pub fn with_client<F, R>(f: F) -> R
287where
288 F: FnOnce(&MetricsClient) -> R,
289 R: Default,
290{
291 CURRENT_CLIENT.with(|client| {
292 if let Some(client) = client.borrow().as_deref() {
293 f(client)
294 } else {
295 R::default()
296 }
297 })
298}
299
/// A metric that records timings.
///
/// A timing is the number of milliseconds that passed between a start and an end point, for
/// example the time taken to render a web page or the time a database call took to return.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, TimerMetric};
///
/// enum MyTimer {
///     ProcessA,
///     ProcessB,
/// }
///
/// impl TimerMetric for MyTimer {
///     fn name(&self) -> &'static str {
///         match self {
///             Self::ProcessA => "process_a",
///             Self::ProcessB => "process_b",
///         }
///     }
/// }
///
/// # fn process_a() {}
///
/// // measure time by explicitly setting a std::time::Duration
/// # use std::time::Instant;
/// let start_time = Instant::now();
/// process_a();
/// metric!(timer(MyTimer::ProcessA) = start_time.elapsed());
///
/// // provide tags to a timer
/// metric!(
///     timer(MyTimer::ProcessA) = start_time.elapsed(),
///     server = "server1",
///     host = "host1",
/// );
///
/// // measure time implicitly by enclosing a code block in a metric
/// metric!(timer(MyTimer::ProcessA), {
///     process_a();
/// });
///
/// // measure block and also provide tags
/// metric!(
///     timer(MyTimer::ProcessB),
///     server = "server1",
///     host = "host1",
///     {
///         process_a();
///     }
/// );
///
/// ```
pub trait TimerMetric {
    /// The name under which this timer is sent to statsd.
    fn name(&self) -> &'static str;
}
359
/// A metric that records counters.
///
/// A counter is a simple value that the client increments or decrements. Rates and averages
/// are derived by the server that receives the values. Typical uses are the number of logins
/// to a system or the number of requests received.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, CounterMetric};
///
/// enum MyCounter {
///     TotalRequests,
///     TotalBytes,
/// }
///
/// impl CounterMetric for MyCounter {
///     fn name(&self) -> &'static str {
///         match self {
///             Self::TotalRequests => "total_requests",
///             Self::TotalBytes => "total_bytes",
///         }
///     }
/// }
///
/// # let buffer = &[(), ()];
///
/// // add to the counter
/// metric!(counter(MyCounter::TotalRequests) += 1);
/// metric!(counter(MyCounter::TotalBytes) += buffer.len() as i64);
///
/// // add to the counter and provide tags
/// metric!(
///     counter(MyCounter::TotalRequests) += 1,
///     server = "s1",
///     host = "h1"
/// );
///
/// // subtract from the counter
/// metric!(counter(MyCounter::TotalRequests) -= 1);
///
/// // subtract from the counter and provide tags
/// metric!(
///     counter(MyCounter::TotalRequests) -= 1,
///     server = "s1",
///     host = "h1"
/// );
/// ```
pub trait CounterMetric {
    /// The name under which this counter is sent to statsd.
    fn name(&self) -> &'static str;
}
412
/// A metric that records histograms.
///
/// The distribution of histogram values is computed by the server and is often similar to
/// the one computed for timers. A histogram can be seen as a more general form of a timer
/// that is not limited to timing things.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, HistogramMetric};
///
/// struct QueueSize;
///
/// impl HistogramMetric for QueueSize {
///     fn name(&self) -> &'static str {
///         "queue_size"
///     }
/// }
///
/// # use std::collections::VecDeque;
/// let queue = VecDeque::new();
/// # let _hint: &VecDeque<()> = &queue;
///
/// // record a histogram value
/// metric!(histogram(QueueSize) = queue.len() as u64);
///
/// // record with tags
/// metric!(
///     histogram(QueueSize) = queue.len() as u64,
///     server = "server1",
///     host = "host1",
/// );
/// ```
pub trait HistogramMetric {
    /// The name under which this histogram is sent to statsd.
    fn name(&self) -> &'static str;
}
450
/// A metric that records sets.
///
/// A set counts the number of unique elements in a group, for example the unique visitors to
/// a site.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, SetMetric};
///
/// enum MySet {
///     UniqueProjects,
///     UniqueUsers,
/// }
///
/// impl SetMetric for MySet {
///     fn name(&self) -> &'static str {
///         match self {
///             MySet::UniqueProjects => "unique_projects",
///             MySet::UniqueUsers => "unique_users",
///         }
///     }
/// }
///
/// # use std::collections::HashSet;
/// let users = HashSet::new();
/// # let _hint: &HashSet<()> = &users;
///
/// // use a set metric
/// metric!(set(MySet::UniqueUsers) = users.len() as i64);
///
/// // use a set metric with tags
/// metric!(
///     set(MySet::UniqueUsers) = users.len() as i64,
///     server = "server1",
///     host = "host1",
/// );
/// ```
pub trait SetMetric {
    /// The name under which this set is sent to statsd.
    fn name(&self) -> &'static str;
}
493
/// A metric that records gauges.
///
/// A gauge is an instantaneous measurement whose value is determined by the client and stays
/// the same until the client changes it. Typical uses are the load average or the number of
/// active connections.
///
/// ## Example
///
/// ```
/// use relay_statsd::{metric, GaugeMetric};
///
/// struct QueueSize;
///
/// impl GaugeMetric for QueueSize {
///     fn name(&self) -> &'static str {
///         "queue_size"
///     }
/// }
///
/// # use std::collections::VecDeque;
/// let queue = VecDeque::new();
/// # let _hint: &VecDeque<()> = &queue;
///
/// // a simple gauge value
/// metric!(gauge(QueueSize) = queue.len() as u64);
///
/// // a gauge with tags
/// metric!(
///     gauge(QueueSize) = queue.len() as u64,
///     server = "server1",
///     host = "host1"
/// );
/// ```
pub trait GaugeMetric {
    /// The name under which this gauge is sent to statsd.
    fn name(&self) -> &'static str;
}
531
/// Emits a metric.
///
/// Supported forms, each optionally followed by `key = value` tags:
///
/// - `counter(id) += value` / `counter(id) -= value`: nothing is sent when `value` is `0`.
/// - `gauge(id) = value`, `histogram(id) = value`, `set(id) = value`.
/// - `timer(id) = duration`: sent as a histogram value in milliseconds.
/// - `timer(id), { block }`: runs the block, records the elapsed time and evaluates to the
///   value of the block. Tags go between the timer and the block, each followed by a comma.
///
/// Except for the counter value, which is always evaluated, value and tag expressions are
/// only evaluated when a client is configured.
///
/// See [crate-level documentation](self) for examples.
#[macro_export]
macro_rules! metric {
    // counter increment
    (counter($id:expr) += $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        match $value {
            value if value != 0 => {
                $crate::with_client(|client| {
                    use $crate::_pred::*;
                    client.send_metric(
                        client.count_with_tags(&$crate::CounterMetric::name(&$id), value)
                        $(.with_tag(stringify!($k), $v))*
                    )
                })
            },
            _ => {},
        };
    };

    // counter decrement
    (counter($id:expr) -= $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        match $value {
            value if value != 0 => {
                $crate::with_client(|client| {
                    use $crate::_pred::*;
                    client.send_metric(
                        client.count_with_tags(&$crate::CounterMetric::name(&$id), -value)
                        $(.with_tag(stringify!($k), $v))*
                    )
                })
            },
            _ => {},
        };
    };

    // gauge set
    (gauge($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.gauge_with_tags(&$crate::GaugeMetric::name(&$id), $value)
                $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // histogram
    (histogram($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.histogram_with_tags(&$crate::HistogramMetric::name(&$id), $value)
                $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // sets (count unique occurrences of a value per time interval)
    (set($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                client.set_with_tags(&$crate::SetMetric::name(&$id), $value)
                $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // timer value (duration)
    (timer($id:expr) = $value:expr $(, $k:ident = $v:expr)* $(,)?) => {
        $crate::with_client(|client| {
            use $crate::_pred::*;
            client.send_metric(
                // NOTE: cadence histograms support Duration out of the box and converts it to nanos,
                // but we want milliseconds for historical reasons.
                client.histogram_with_tags(&$crate::TimerMetric::name(&$id), $value.as_nanos() as f64 / 1e6)
                $(.with_tag(stringify!($k), $v))*
            )
        })
    };

    // timed block
    (timer($id:expr), $($k:ident = $v:expr,)* $block:block) => {{
        // Paths in a macro body are resolved at the call site, so use an absolute path that
        // keeps working when the caller has an item named `std` in scope.
        let now = ::std::time::Instant::now();
        let rv = {$block};
        $crate::metric!(timer($id) = now.elapsed() $(, $k = $v)*);
        rv
    }};
}
623
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use cadence::{NopMetricSink, StatsdClient};

    use crate::{
        set_client, with_capturing_test_client, with_client, GaugeMetric, MetricsClient,
        TimerMetric,
    };

    /// Gauges used to verify names and tags in captured output.
    enum TestGauges {
        Foo,
        Bar,
    }

    impl GaugeMetric for TestGauges {
        fn name(&self) -> &'static str {
            match self {
                Self::Foo => "foo",
                Self::Bar => "bar",
            }
        }
    }

    /// Timer used to verify the millisecond conversion.
    struct TestTimer;

    impl TimerMetric for TestTimer {
        fn name(&self) -> &'static str {
            "timer"
        }
    }

    /// Metrics emitted inside the closure are captured in order, including their tags.
    #[test]
    fn test_capturing_client() {
        let captured = with_capturing_test_client(|| {
            metric!(
                gauge(TestGauges::Foo) = 123,
                server = "server1",
                host = "host1"
            );
            metric!(
                gauge(TestGauges::Bar) = 456,
                server = "server2",
                host = "host2"
            );
        });

        let expected = [
            "foo:123|g|#server:server1,host:host1",
            "bar:456|g|#server:server2,host:host2",
        ];
        assert_eq!(captured, expected)
    }

    /// Setting the global client also replaces the client of the calling thread.
    #[test]
    fn current_client_is_global_client() {
        let describe_current = || with_client(|client| format!("{client:?}"));

        let before = describe_current();
        set_client(MetricsClient {
            statsd_client: StatsdClient::from_sink("", NopMetricSink),
            default_tags: Default::default(),
            sample_rate: 1.0,
            rx: None,
        });
        let after = describe_current();

        // After setting the global client, the current client must change:
        assert_ne!(before, after);
    }

    /// Timer values are reported in milliseconds; sub-millisecond precision is only kept
    /// for durations that are short enough.
    #[test]
    fn nanos_rounding_error() {
        // Captures a timer of `base` plus one nanosecond.
        let capture_timer = |base: Duration| {
            with_capturing_test_client(|| {
                metric!(timer(TestTimer) = base + Duration::from_nanos(1),);
            })
        };

        // for "short" durations, precision is preserved:
        let one_day = Duration::from_secs(60 * 60 * 24);
        assert_eq!(capture_timer(one_day), ["timer:86400000.000001|h"]); // h is for histogram, not hours

        // for very long durations, precision is lost:
        let one_year = Duration::from_secs(60 * 60 * 24 * 365);
        assert_eq!(capture_timer(one_year), ["timer:31536000000|h"]);
    }
}