relay_kafka/producer/
utils.rs1use std::error::Error;
2
3use rdkafka::message::{Header, OwnedHeaders, ToBytes};
4use rdkafka::producer::{DeliveryResult, ProducerContext};
5use rdkafka::{ClientContext, Message};
6use relay_statsd::metric;
7
8use crate::statsd::{KafkaCounters, KafkaGauges};
9
10pub struct KafkaHeaders(Option<OwnedHeaders>);
15
16impl KafkaHeaders {
17 pub fn new() -> Self {
18 Self(None)
19 }
20
21 pub fn insert<V>(&mut self, header: Header<'_, &V>)
22 where
23 V: ToBytes + ?Sized,
24 {
25 self.extend(Some(header));
26 }
27
28 pub fn into_inner(self) -> Option<OwnedHeaders> {
29 self.0
30 }
31}
32
33impl<'a, 'b, V> Extend<Header<'a, &'b V>> for KafkaHeaders
34where
35 V: ToBytes + ?Sized,
36{
37 fn extend<T: IntoIterator<Item = Header<'a, &'b V>>>(&mut self, iter: T) {
38 let mut iter = iter.into_iter();
39
40 let Some(first) = iter.next() else {
42 return;
43 };
44
45 let mut headers = self.0.take().unwrap_or_else(|| {
46 let size = iter.size_hint().0 + 2;
53 OwnedHeaders::new_with_capacity(size)
54 });
55 headers = headers.insert(first);
56 for remaining in iter {
57 headers = headers.insert(remaining);
58 }
59
60 self.0 = Some(headers);
61 }
62}
63
64impl<'a, 'b, V> FromIterator<Header<'a, &'b V>> for KafkaHeaders
65where
66 V: ToBytes + ?Sized,
67{
68 fn from_iter<I: IntoIterator<Item = Header<'a, &'b V>>>(iter: I) -> Self {
69 let mut c = Self::new();
70 c.extend(iter);
71 c
72 }
73}
74
/// Producer context that carries the name of the producer it belongs to.
///
/// The name is attached as the `producer_name` tag to the metrics emitted by this context's
/// `ClientContext` and `ProducerContext` implementations.
#[derive(Debug)]
pub struct Context {
    /// Name of the producer this context was created for.
    producer_name: String,
}

impl Context {
    /// Creates a context for the producer called `producer_name`.
    pub fn new(producer_name: String) -> Self {
        Context { producer_name }
    }

    /// Returns the name of the producer this context belongs to.
    pub fn producer_name(&self) -> &str {
        self.producer_name.as_str()
    }
}
91
92impl ClientContext for Context {
93 fn stats(&self, statistics: rdkafka::Statistics) {
97 let producer_name = &self.producer_name;
98
99 relay_statsd::metric!(
100 gauge(KafkaGauges::MessageCount) = statistics.msg_cnt,
101 producer_name = producer_name
102 );
103 relay_statsd::metric!(
104 gauge(KafkaGauges::MessageCountMax) = statistics.msg_max,
105 producer_name = producer_name
106 );
107 relay_statsd::metric!(
108 gauge(KafkaGauges::MessageSize) = statistics.msg_size,
109 producer_name = producer_name
110 );
111 relay_statsd::metric!(
112 gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max,
113 producer_name = producer_name
114 );
115 relay_statsd::metric!(
116 gauge(KafkaGauges::TxMsgs) = statistics.txmsgs as u64,
117 producer_name = producer_name
118 );
119
120 for (_, broker) in statistics.brokers {
121 relay_statsd::metric!(
122 gauge(KafkaGauges::OutboundBufferRequests) = broker.outbuf_cnt as u64,
123 broker_name = &broker.name,
124 producer_name = producer_name
125 );
126 relay_statsd::metric!(
127 gauge(KafkaGauges::OutboundBufferMessages) = broker.outbuf_msg_cnt as u64,
128 broker_name = &broker.name,
129 producer_name = producer_name
130 );
131 if let Some(connects) = broker.connects {
132 relay_statsd::metric!(
133 gauge(KafkaGauges::Connects) = connects as u64,
134 broker_name = &broker.name,
135 producer_name = producer_name
136 );
137 }
138 if let Some(disconnects) = broker.disconnects {
139 relay_statsd::metric!(
140 gauge(KafkaGauges::Disconnects) = disconnects as u64,
141 broker_name = &broker.name,
142 producer_name = producer_name
143 );
144 }
145 if let Some(int_latency) = broker.int_latency {
146 relay_statsd::metric!(
147 gauge(KafkaGauges::BrokerIntLatencyAvg) = (int_latency.avg / 1000) as u64,
148 broker_name = &broker.name,
149 producer_name = producer_name
150 );
151 relay_statsd::metric!(
152 gauge(KafkaGauges::BrokerIntLatencyP99) = (int_latency.p99 / 1000) as u64,
153 broker_name = &broker.name,
154 producer_name = producer_name
155 );
156 }
157 if let Some(outbuf_latency) = broker.outbuf_latency {
158 relay_statsd::metric!(
159 gauge(KafkaGauges::BrokerOutbufLatencyAvg) = (outbuf_latency.avg / 1000) as u64,
160 broker_name = &broker.name,
161 producer_name = producer_name
162 );
163 relay_statsd::metric!(
164 gauge(KafkaGauges::BrokerOutbufLatencyP99) = (outbuf_latency.p99 / 1000) as u64,
165 broker_name = &broker.name,
166 producer_name = producer_name
167 );
168 }
169 if let Some(rtt) = broker.rtt {
170 relay_statsd::metric!(
171 gauge(KafkaGauges::BrokerRttAvg) = (rtt.avg / 1000) as u64,
172 broker_name = &broker.name,
173 producer_name = producer_name
174 );
175 relay_statsd::metric!(
176 gauge(KafkaGauges::BrokerRttP99) = (rtt.p99 / 1000) as u64,
177 broker_name = &broker.name,
178 producer_name = producer_name
179 );
180 }
181 relay_statsd::metric!(
182 gauge(KafkaGauges::BrokerTx) = broker.tx,
183 broker_name = &broker.name,
184 producer_name = producer_name
185 );
186 relay_statsd::metric!(
187 gauge(KafkaGauges::BrokerTxBytes) = broker.txbytes,
188 broker_name = &broker.name,
189 producer_name = producer_name
190 );
191 }
192 }
193}
194
195impl ProducerContext for Context {
196 type DeliveryOpaque = ();
197
198 fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
201 match result {
205 Ok(message) => {
206 metric!(
207 counter(KafkaCounters::ProduceStatusSuccess) += 1,
208 topic = message.topic(),
209 producer_name = &self.producer_name
210 );
211 }
212 Err((error, message)) => {
213 relay_log::error!(
214 error = error as &dyn Error,
215 payload_len = message.payload_len(),
216 tags.topic = message.topic(),
217 "failed to produce message to Kafka (delivery callback)",
218 );
219
220 metric!(
221 counter(KafkaCounters::ProduceStatusError) += 1,
222 topic = message.topic(),
223 producer_name = &self.producer_name
224 );
225 }
226 }
227 }
228}
229
230pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<Context>;