relay_kafka/
statsd.rs

1//! Kafka producer metrics.
2//!
3//! ## Producer Name Tag
4//!
5//! Many metrics include a `producer_name` tag to identify the deployment or application instance
6//! producing messages. This tag is derived using the following fallback strategy:
7//!
8//! 1. **`client.id`** - If explicitly configured in Kafka config parameters
//! 2. **Config name** - The name of the secondary Kafka config (e.g., "profiles_cluster", "cluster1")
10//! 3. **`"unknown"`** - Final fallback for the default Kafka configuration
11//!
12//! Example configurations:
13//! ```yaml
14//! # Explicit client.id (highest priority)
15//! secondary_kafka_configs:
16//!   profiles_cluster:
17//!     - name: "client.id"
18//!       value: "relay-profiles-prod"
19//! # Results in: producer_name="relay-profiles-prod"
20//!
21//! # Config name fallback
22//! secondary_kafka_configs:
23//!   cluster1:
24//!     - name: "bootstrap.servers"
25//!       value: "kafka:9092"
26//! # Results in: producer_name="cluster1"
27//!
28//! # Default config fallback
29//! kafka:
30//!   - name: "bootstrap.servers"
31//!     value: "kafka:9092"
32//! # Results in: producer_name="unknown"
33//! ```
34
35use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric};
36
/// Counter metrics for the Kafka producer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum KafkaCounters {
    /// Number of messages that failed to be enqueued in the Kafka producer's memory buffer.
    ///
    /// These errors include, for example, _"UnknownTopic"_ errors when attempting to send a
    /// message to a topic that does not exist.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProducerEnqueueError,

    /// Number of messages that were written to the wrong partition because of configured rate
    /// limits.
    ///
    /// Each topic in Relay can optionally be configured with a per-partition-key rate limit. This
    /// rate limit does not drop messages, but instead disables semantic partitioning. Every time
    /// this happens for a message, this counter is incremented.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProducerPartitionKeyRateLimit,

    /// Number of successful message produce operations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProduceStatusSuccess,

    /// Number of failed message produce operations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `producer_name`: The configured producer name/deployment identifier.
    ProduceStatusError,
}
75
76impl CounterMetric for KafkaCounters {
77    fn name(&self) -> &'static str {
78        match self {
79            Self::ProducerEnqueueError => "producer.enqueue.error",
80            Self::ProducerPartitionKeyRateLimit => "producer.partition_key.rate_limit",
81            Self::ProduceStatusSuccess => "producer.produce_status.success",
82            Self::ProduceStatusError => "producer.produce_status.error",
83        }
84    }
85}
86
/// Histogram metrics for the Kafka producer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum KafkaHistograms {
    /// Size of emitted Kafka message in bytes.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    KafkaMessageSize,
}
96
97impl HistogramMetric for KafkaHistograms {
98    fn name(&self) -> &'static str {
99        match self {
100            Self::KafkaMessageSize => "kafka.message_size",
101        }
102    }
103}
/// Gauge metrics for the Kafka producer.
///
/// Most of these metrics are taken from the [`rdkafka::statistics`] module.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum KafkaGauges {
    /// The number of messages waiting to be sent to, or acknowledged by, the broker.
    ///
    /// See <https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ad4b3b7659cf9a79d3353810d6b625bb7>.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    /// - `producer_name`: The configured producer name/deployment identifier.
    InFlightCount,

    /// The current number of messages in producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageCount,

    /// The maximum number of messages allowed in the producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageCountMax,

    /// The current total size of messages in producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageSize,

    /// The maximum total size of messages allowed in the producer queues.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    MessageSizeMax,

    /// The total number of messages transmitted (produced) to all brokers.
    ///
    /// This metric is tagged with:
    /// - `producer_name`: The configured producer name/deployment identifier.
    TxMsgs,

    /// The number of requests awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    OutboundBufferRequests,

    /// The number of messages awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    OutboundBufferMessages,

    /// The number of connection attempts, including successful and failed attempts, and name
    /// resolution failures.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    Connects,

    /// The number of disconnections, whether triggered by the broker, the network, the load
    /// balancer, or something else.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    Disconnects,

    /// Average internal producer queue latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerIntLatencyAvg,

    /// 99th percentile internal producer queue latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerIntLatencyP99,

    /// Average output buffer latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerOutbufLatencyAvg,

    /// 99th percentile output buffer latency, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerOutbufLatencyP99,

    /// Average round-trip time to the broker, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerRttAvg,

    /// 99th percentile round-trip time to the broker, in milliseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerRttP99,

    /// Total number of requests sent to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerTx,

    /// Total number of bytes sent to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    /// - `producer_name`: The configured producer name/deployment identifier.
    BrokerTxBytes,
}
232
233impl GaugeMetric for KafkaGauges {
234    fn name(&self) -> &'static str {
235        match self {
236            KafkaGauges::InFlightCount => "kafka.in_flight_count",
237            KafkaGauges::MessageCount => "kafka.stats.message_count",
238            KafkaGauges::MessageCountMax => "kafka.stats.message_count_max",
239            KafkaGauges::MessageSize => "kafka.stats.message_size",
240            KafkaGauges::MessageSizeMax => "kafka.stats.message_size_max",
241            KafkaGauges::TxMsgs => "kafka.stats.txmsgs",
242            KafkaGauges::OutboundBufferRequests => "kafka.stats.broker.outbuf.requests",
243            KafkaGauges::OutboundBufferMessages => "kafka.stats.broker.outbuf.messages",
244            KafkaGauges::Connects => "kafka.stats.broker.connects",
245            KafkaGauges::Disconnects => "kafka.stats.broker.disconnects",
246            KafkaGauges::BrokerIntLatencyAvg => "kafka.stats.broker.int_latency.avg",
247            KafkaGauges::BrokerIntLatencyP99 => "kafka.stats.broker.int_latency.p99",
248            KafkaGauges::BrokerOutbufLatencyAvg => "kafka.stats.broker.outbuf_latency.avg",
249            KafkaGauges::BrokerOutbufLatencyP99 => "kafka.stats.broker.outbuf_latency.p99",
250            KafkaGauges::BrokerRttAvg => "kafka.stats.broker.rtt.avg",
251            KafkaGauges::BrokerRttP99 => "kafka.stats.broker.rtt.p99",
252            KafkaGauges::BrokerTx => "kafka.stats.broker.tx",
253            KafkaGauges::BrokerTxBytes => "kafka.stats.broker.txbytes",
254        }
255    }
256}