relay_kafka/statsd.rs
1use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric};
2
3pub enum KafkaCounters {
4 /// Number of producer errors occurred after an envelope was already enqueued for sending to
5 /// Kafka.
6 ///
7 /// These errors include, for example, _"MessageTooLarge"_ errors when the broker does not
8 /// accept the requests over a certain size, which is usually due to invalid or inconsistent
9 /// broker/producer configurations.
10 ///
11 /// This metric is tagged with:
12 /// - `topic`: The Kafka topic being produced to.
13 ProcessingProduceError,
14
15 /// Number of messages that failed to be enqueued in the Kafka producer's memory buffer.
16 ///
17 /// These errors include, for example, _"UnknownTopic"_ errors when attempting to send a
18 /// message a topic that does not exist.
19 ///
20 /// This metric is tagged with:
21 /// - `topic`: The Kafka topic being produced to.
22 /// - `variant`: The Kafka message variant.
23 ProducerEnqueueError,
24
25 /// Number of messages that were written to the wrong partition because of configured rate limits.
26 ///
27 /// Each topic in Relay can optionally be configured with a per-partition-key rate limit. This
28 /// rate limit does not drop messages, but instead disables semantic partitioning. Everytime
29 /// this happens for a message, this counter is incremented.
30 ProducerPartitionKeyRateLimit,
31}
32
33impl CounterMetric for KafkaCounters {
34 fn name(&self) -> &'static str {
35 match self {
36 Self::ProcessingProduceError => "processing.produce.error",
37 Self::ProducerEnqueueError => "producer.enqueue.error",
38 Self::ProducerPartitionKeyRateLimit => "producer.partition_key.rate_limit",
39 }
40 }
41}
42
43pub enum KafkaHistograms {
44 /// Size of emitted kafka message in bytes, tagged by message type.
45 KafkaMessageSize,
46}
47
48impl HistogramMetric for KafkaHistograms {
49 fn name(&self) -> &'static str {
50 match self {
51 Self::KafkaMessageSize => "kafka.message_size",
52 }
53 }
54}
55/// Gauge metrics for the Kafka producer.
56///
57/// Most of these metrics are taken from the [`rdkafka::statistics`] module.
58pub enum KafkaGauges {
59 /// The number of messages waiting to be sent to, or acknowledged by, the broker.
60 ///
61 /// See <https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ad4b3b7659cf9a79d3353810d6b625bb7>.
62 ///
63 /// This metric is tagged with:
64 /// - `topic`: The Kafka topic being produced to.
65 /// - `variant`: The Kafka message variant.
66 InFlightCount,
67
68 /// The current number of messages in producer queues.
69 MessageCount,
70
71 /// The maximum number of messages allowed in the producer queues.
72 MessageCountMax,
73
74 /// The current total size of messages in producer queues.
75 MessageSize,
76
77 /// The maximum total size of messages allowed in the producer queues.
78 MessageSizeMax,
79
80 /// The number of requests awaiting transmission to the broker.
81 ///
82 /// This metric is tagged with:
83 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
84 OutboundBufferRequests,
85
86 /// The number of messages awaiting transmission to the broker.
87 ///
88 /// This metric is tagged with:
89 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
90 OutboundBufferMessages,
91
92 /// The number of connection attempts, including successful and failed attempts, and name resolution failures.
93 ///
94 /// This metric is tagged with:
95 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
96 Connects,
97
98 /// The number of disconnections, whether triggered by the broker, the network, the load balancer, or something else.
99 ///
100 /// This metric is tagged with:
101 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
102 Disconnects,
103
104 /// Maximum internal producer queue latency, in microseconds.
105 ///
106 /// This metric is tagged with:
107 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
108 ProducerQueueLatency,
109
110 /// Maximum internal request queue latency, in microseconds.
111 ///
112 /// This metric is tagged with:
113 /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
114 RequestQueueLatency,
115}
116
117impl GaugeMetric for KafkaGauges {
118 fn name(&self) -> &'static str {
119 match self {
120 KafkaGauges::InFlightCount => "kafka.in_flight_count",
121 KafkaGauges::MessageCount => "kafka.stats.message_count",
122 KafkaGauges::MessageCountMax => "kafka.stats.message_count_max",
123 KafkaGauges::MessageSize => "kafka.stats.message_size",
124 KafkaGauges::MessageSizeMax => "kafka.stats.message_size_max",
125 KafkaGauges::OutboundBufferRequests => "kafka.stats.broker.outbuf.requests",
126 KafkaGauges::OutboundBufferMessages => "kafka.stats.broker.outbuf.messages",
127 KafkaGauges::Connects => "kafka.stats.broker.connects",
128 KafkaGauges::Disconnects => "kafka.stats.broker.disconnects",
129 KafkaGauges::ProducerQueueLatency => "kafka.stats.broker.int_latency",
130 KafkaGauges::RequestQueueLatency => "kafka.stats.broker.outbuf_latency",
131 }
132 }
133}