relay_kafka/
statsd.rs

1use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric};
2
/// Counter metrics for producing messages to Kafka.
pub enum KafkaCounters {
    /// Number of producer errors that occurred after an envelope was already enqueued for
    /// sending to Kafka.
    ///
    /// These errors include, for example, _"MessageTooLarge"_ errors when the broker does not
    /// accept the requests over a certain size, which is usually due to invalid or inconsistent
    /// broker/producer configurations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    ProcessingProduceError,

    /// Number of messages that failed to be enqueued in the Kafka producer's memory buffer.
    ///
    /// These errors include, for example, _"UnknownTopic"_ errors when attempting to send a
    /// message to a topic that does not exist.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    ProducerEnqueueError,

    /// Number of messages that were written to the wrong partition because of configured rate
    /// limits.
    ///
    /// Each topic in Relay can optionally be configured with a per-partition-key rate limit. This
    /// rate limit does not drop messages, but instead disables semantic partitioning. Every time
    /// this happens for a message, this counter is incremented.
    ProducerPartitionKeyRateLimit,
}
32
33impl CounterMetric for KafkaCounters {
34    fn name(&self) -> &'static str {
35        match self {
36            Self::ProcessingProduceError => "processing.produce.error",
37            Self::ProducerEnqueueError => "producer.enqueue.error",
38            Self::ProducerPartitionKeyRateLimit => "producer.partition_key.rate_limit",
39        }
40    }
41}
42
/// Histogram metrics for messages emitted to Kafka.
pub enum KafkaHistograms {
    /// Size of the emitted Kafka message in bytes, tagged by message type.
    KafkaMessageSize,
}
47
48impl HistogramMetric for KafkaHistograms {
49    fn name(&self) -> &'static str {
50        match self {
51            Self::KafkaMessageSize => "kafka.message_size",
52        }
53    }
54}
/// Gauge metrics for the Kafka producer.
///
/// Most of these metrics are taken from the [`rdkafka::statistics`] module.
pub enum KafkaGauges {
    /// The number of messages waiting to be sent to, or acknowledged by, the broker.
    ///
    /// See <https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ad4b3b7659cf9a79d3353810d6b625bb7>.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    InFlightCount,

    /// The current number of messages in the producer queues.
    MessageCount,

    /// The maximum number of messages allowed in the producer queues.
    MessageCountMax,

    /// The current total size of messages in the producer queues.
    MessageSize,

    /// The maximum total size of messages allowed in the producer queues.
    MessageSizeMax,

    /// The number of requests awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    OutboundBufferRequests,

    /// The number of messages awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    OutboundBufferMessages,

    /// The number of connection attempts, including successful and failed attempts, and name
    /// resolution failures.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    Connects,

    /// The number of disconnections, whether triggered by the broker, the network, the load
    /// balancer, or something else.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    Disconnects,

    /// Maximum internal producer queue latency, in microseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    ProducerQueueLatency,

    /// Maximum internal request queue latency, in microseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    RequestQueueLatency,
}
116
117impl GaugeMetric for KafkaGauges {
118    fn name(&self) -> &'static str {
119        match self {
120            KafkaGauges::InFlightCount => "kafka.in_flight_count",
121            KafkaGauges::MessageCount => "kafka.stats.message_count",
122            KafkaGauges::MessageCountMax => "kafka.stats.message_count_max",
123            KafkaGauges::MessageSize => "kafka.stats.message_size",
124            KafkaGauges::MessageSizeMax => "kafka.stats.message_size_max",
125            KafkaGauges::OutboundBufferRequests => "kafka.stats.broker.outbuf.requests",
126            KafkaGauges::OutboundBufferMessages => "kafka.stats.broker.outbuf.messages",
127            KafkaGauges::Connects => "kafka.stats.broker.connects",
128            KafkaGauges::Disconnects => "kafka.stats.broker.disconnects",
129            KafkaGauges::ProducerQueueLatency => "kafka.stats.broker.int_latency",
130            KafkaGauges::RequestQueueLatency => "kafka.stats.broker.outbuf_latency",
131        }
132    }
133}