relay_kafka/statsd.rs
//! Kafka producer metrics.
//!
//! ## Producer Name Tag
//!
//! Many metrics include a `producer_name` tag that identifies the deployment or application
//! instance producing the messages. The tag value is chosen using the following fallback order:
//!
//! 1. **`client.id`**: the value of the `client.id` parameter, if it is explicitly set in the
//!    Kafka config parameters.
//! 2. **Config name**: the name of the secondary Kafka config (e.g. `"profiles_cluster"`,
//!    `"cluster1"`).
//! 3. **`"unknown"`**: the final fallback, used for the default Kafka configuration.
//!
//! Example configurations (each snippet below is an independent example):
//! ```yaml
//! # Explicit client.id (highest priority)
//! secondary_kafka_configs:
//!   profiles_cluster:
//!     - name: "client.id"
//!       value: "relay-profiles-prod"
//! # Results in: producer_name="relay-profiles-prod"
//!
//! # Config name fallback
//! secondary_kafka_configs:
//!   cluster1:
//!     - name: "bootstrap.servers"
//!       value: "kafka:9092"
//! # Results in: producer_name="cluster1"
//!
//! # Default config fallback
//! kafka:
//!   - name: "bootstrap.servers"
//!     value: "kafka:9092"
//! # Results in: producer_name="unknown"
//! ```
34
35use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric};
36
/// Counter metrics for the Kafka producer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KafkaCounters {
    /// Number of messages that failed to be enqueued in the Kafka producer's memory buffer.
    ///
    /// These errors include, for example, _"UnknownTopic"_ errors when attempting to send a
    /// message to a topic that does not exist.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProducerEnqueueError,

    /// Number of messages that were written to the wrong partition because of configured rate limits.
    ///
    /// Each topic in Relay can optionally be configured with a per-partition-key rate limit. This
    /// rate limit does not drop messages, but instead disables semantic partitioning. Every time
    /// this happens for a message, this counter is incremented.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProducerPartitionKeyRateLimit,

    /// Number of successful message produce operations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProduceStatusSuccess,

    /// Number of failed message produce operations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProduceStatusError,
}
75
76impl CounterMetric for KafkaCounters {
77 fn name(&self) -> &'static str {
78 match self {
79 Self::ProducerEnqueueError => "producer.enqueue.error",
80 Self::ProducerPartitionKeyRateLimit => "producer.partition_key.rate_limit",
81 Self::ProduceStatusSuccess => "producer.produce_status.success",
82 Self::ProduceStatusError => "producer.produce_status.error",
83 }
84 }
85}
86
/// Histogram metrics for the Kafka producer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KafkaHistograms {
    /// Size of an emitted Kafka message, in bytes.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    KafkaMessageSize,
}
96
97impl HistogramMetric for KafkaHistograms {
98 fn name(&self) -> &'static str {
99 match self {
100 Self::KafkaMessageSize => "kafka.message_size",
101 }
102 }
103}
/// Gauge metrics for the Kafka producer.
///
/// Most of these metrics are derived from the statistics that librdkafka reports, see the
/// [`rdkafka::statistics`] module.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KafkaGauges {
    /// The number of messages waiting to be sent to, or acknowledged by, the broker.
    ///
    /// See <https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ad4b3b7659cf9a79d3353810d6b625bb7>.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    InFlightCount,

    /// The current number of messages in producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageCount,

    /// The maximum number of messages allowed in the producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageCountMax,

    /// The current total size of messages in producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageSize,

    /// The maximum total size of messages allowed in the producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageSizeMax,

    /// The total number of messages transmitted (produced) to all brokers.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    TxMsgs,

    /// The number of requests awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    OutboundBufferRequests,

    /// The number of messages awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    OutboundBufferMessages,

    /// The number of connection attempts, including successful and failed attempts, and name
    /// resolution failures.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    Connects,

    /// The number of disconnections, whether triggered by the broker, the network, the load
    /// balancer, or something else.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    Disconnects,

    /// Average internal producer queue latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerIntLatencyAvg,

    /// 99th percentile internal producer queue latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerIntLatencyP99,

    /// Average output buffer latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerOutbufLatencyAvg,

    /// 99th percentile output buffer latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerOutbufLatencyP99,

    /// Average round-trip time to the broker, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerRttAvg,

    /// 99th percentile round-trip time to the broker, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerRttP99,

    /// Total number of requests sent to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerTx,

    /// Total number of bytes sent to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form HOSTNAME:PORT/ID.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerTxBytes,
}
232
233impl GaugeMetric for KafkaGauges {
234 fn name(&self) -> &'static str {
235 match self {
236 KafkaGauges::InFlightCount => "kafka.in_flight_count",
237 KafkaGauges::MessageCount => "kafka.stats.message_count",
238 KafkaGauges::MessageCountMax => "kafka.stats.message_count_max",
239 KafkaGauges::MessageSize => "kafka.stats.message_size",
240 KafkaGauges::MessageSizeMax => "kafka.stats.message_size_max",
241 KafkaGauges::TxMsgs => "kafka.stats.txmsgs",
242 KafkaGauges::OutboundBufferRequests => "kafka.stats.broker.outbuf.requests",
243 KafkaGauges::OutboundBufferMessages => "kafka.stats.broker.outbuf.messages",
244 KafkaGauges::Connects => "kafka.stats.broker.connects",
245 KafkaGauges::Disconnects => "kafka.stats.broker.disconnects",
246 KafkaGauges::BrokerIntLatencyAvg => "kafka.stats.broker.int_latency.avg",
247 KafkaGauges::BrokerIntLatencyP99 => "kafka.stats.broker.int_latency.p99",
248 KafkaGauges::BrokerOutbufLatencyAvg => "kafka.stats.broker.outbuf_latency.avg",
249 KafkaGauges::BrokerOutbufLatencyP99 => "kafka.stats.broker.outbuf_latency.p99",
250 KafkaGauges::BrokerRttAvg => "kafka.stats.broker.rtt.avg",
251 KafkaGauges::BrokerRttP99 => "kafka.stats.broker.rtt.p99",
252 KafkaGauges::BrokerTx => "kafka.stats.broker.tx",
253 KafkaGauges::BrokerTxBytes => "kafka.stats.broker.txbytes",
254 }
255 }
256}