relay_kafka/producer/utils.rs

1use std::error::Error;
2
3use rdkafka::message::{Header, OwnedHeaders, ToBytes};
4use rdkafka::producer::{DeliveryResult, ProducerContext};
5use rdkafka::{ClientContext, Message};
6use relay_statsd::metric;
7
8use crate::statsd::{KafkaCounters, KafkaGauges};
9
/// A thin wrapper around [`OwnedHeaders`].
///
/// Unlike [`OwnedHeaders`], this will not allocate on creation.
/// Allocations are tuned for the use-case in a [`super::Producer`].
///
/// Internally `None` means "no header inserted yet"; the inner
/// [`OwnedHeaders`] is only allocated on first insertion.
pub struct KafkaHeaders(Option<OwnedHeaders>);
15
16impl KafkaHeaders {
17    pub fn new() -> Self {
18        Self(None)
19    }
20
21    pub fn insert<V>(&mut self, header: Header<'_, &V>)
22    where
23        V: ToBytes + ?Sized,
24    {
25        self.extend(Some(header));
26    }
27
28    pub fn into_inner(self) -> Option<OwnedHeaders> {
29        self.0
30    }
31}
32
33impl<'a, 'b, V> Extend<Header<'a, &'b V>> for KafkaHeaders
34where
35    V: ToBytes + ?Sized,
36{
37    fn extend<T: IntoIterator<Item = Header<'a, &'b V>>>(&mut self, iter: T) {
38        let mut iter = iter.into_iter();
39
40        // Probe if the iterator is empty, if it is empty, no need to do anything.
41        let Some(first) = iter.next() else {
42            return;
43        };
44
45        let mut headers = self.0.take().unwrap_or_else(|| {
46            // Get a size hint from the iterator, +2 for the already removed
47            // first element and reserving space for 1 extra header which is conditionally
48            // added by the `Producer` in this crate.
49            //
50            // This means we might allocate a little bit too much, but we never have to resize
51            // and allocate a second time, a good trade-off.
52            let size = iter.size_hint().0 + 2;
53            OwnedHeaders::new_with_capacity(size)
54        });
55        headers = headers.insert(first);
56        for remaining in iter {
57            headers = headers.insert(remaining);
58        }
59
60        self.0 = Some(headers);
61    }
62}
63
64impl<'a, 'b, V> FromIterator<Header<'a, &'b V>> for KafkaHeaders
65where
66    V: ToBytes + ?Sized,
67{
68    fn from_iter<I: IntoIterator<Item = Header<'a, &'b V>>>(iter: I) -> Self {
69        let mut c = Self::new();
70        c.extend(iter);
71        c
72    }
73}
74
/// Kafka client and producer context that logs statistics and producer errors.
#[derive(Debug)]
pub struct Context {
    /// Producer name for deployment identification.
    ///
    /// Attached as the `producer_name` tag to every metric emitted by this context.
    producer_name: String,
}
81
82impl Context {
83    pub fn new(producer_name: String) -> Self {
84        Self { producer_name }
85    }
86
87    pub fn producer_name(&self) -> &str {
88        &self.producer_name
89    }
90}
91
92impl ClientContext for Context {
93    /// Report client statistics as statsd metrics.
94    ///
95    /// This method is only called if `statistics.interval.ms` is configured.
96    fn stats(&self, statistics: rdkafka::Statistics) {
97        let producer_name = &self.producer_name;
98
99        relay_statsd::metric!(
100            gauge(KafkaGauges::MessageCount) = statistics.msg_cnt,
101            producer_name = producer_name
102        );
103        relay_statsd::metric!(
104            gauge(KafkaGauges::MessageCountMax) = statistics.msg_max,
105            producer_name = producer_name
106        );
107        relay_statsd::metric!(
108            gauge(KafkaGauges::MessageSize) = statistics.msg_size,
109            producer_name = producer_name
110        );
111        relay_statsd::metric!(
112            gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max,
113            producer_name = producer_name
114        );
115        relay_statsd::metric!(
116            gauge(KafkaGauges::TxMsgs) = statistics.txmsgs as u64,
117            producer_name = producer_name
118        );
119
120        for (_, broker) in statistics.brokers {
121            relay_statsd::metric!(
122                gauge(KafkaGauges::OutboundBufferRequests) = broker.outbuf_cnt as u64,
123                broker_name = &broker.name,
124                producer_name = producer_name
125            );
126            relay_statsd::metric!(
127                gauge(KafkaGauges::OutboundBufferMessages) = broker.outbuf_msg_cnt as u64,
128                broker_name = &broker.name,
129                producer_name = producer_name
130            );
131            if let Some(connects) = broker.connects {
132                relay_statsd::metric!(
133                    gauge(KafkaGauges::Connects) = connects as u64,
134                    broker_name = &broker.name,
135                    producer_name = producer_name
136                );
137            }
138            if let Some(disconnects) = broker.disconnects {
139                relay_statsd::metric!(
140                    gauge(KafkaGauges::Disconnects) = disconnects as u64,
141                    broker_name = &broker.name,
142                    producer_name = producer_name
143                );
144            }
145            if let Some(int_latency) = broker.int_latency {
146                relay_statsd::metric!(
147                    gauge(KafkaGauges::BrokerIntLatencyAvg) = (int_latency.avg / 1000) as u64,
148                    broker_name = &broker.name,
149                    producer_name = producer_name
150                );
151                relay_statsd::metric!(
152                    gauge(KafkaGauges::BrokerIntLatencyP99) = (int_latency.p99 / 1000) as u64,
153                    broker_name = &broker.name,
154                    producer_name = producer_name
155                );
156            }
157            if let Some(outbuf_latency) = broker.outbuf_latency {
158                relay_statsd::metric!(
159                    gauge(KafkaGauges::BrokerOutbufLatencyAvg) = (outbuf_latency.avg / 1000) as u64,
160                    broker_name = &broker.name,
161                    producer_name = producer_name
162                );
163                relay_statsd::metric!(
164                    gauge(KafkaGauges::BrokerOutbufLatencyP99) = (outbuf_latency.p99 / 1000) as u64,
165                    broker_name = &broker.name,
166                    producer_name = producer_name
167                );
168            }
169            if let Some(rtt) = broker.rtt {
170                relay_statsd::metric!(
171                    gauge(KafkaGauges::BrokerRttAvg) = (rtt.avg / 1000) as u64,
172                    broker_name = &broker.name,
173                    producer_name = producer_name
174                );
175                relay_statsd::metric!(
176                    gauge(KafkaGauges::BrokerRttP99) = (rtt.p99 / 1000) as u64,
177                    broker_name = &broker.name,
178                    producer_name = producer_name
179                );
180            }
181            relay_statsd::metric!(
182                gauge(KafkaGauges::BrokerTx) = broker.tx,
183                broker_name = &broker.name,
184                producer_name = producer_name
185            );
186            relay_statsd::metric!(
187                gauge(KafkaGauges::BrokerTxBytes) = broker.txbytes,
188                broker_name = &broker.name,
189                producer_name = producer_name
190            );
191        }
192    }
193}
194
195impl ProducerContext for Context {
196    type DeliveryOpaque = ();
197
198    /// This method is called after attempting to send a message to Kafka.
199    /// It's called asynchronously for every message, so we want to handle errors explicitly here.
200    fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
201        // TODO: any `Accepted` outcomes (e.g. spans) should be logged here instead of on the caller side,
202        // such that we do not over-report in the error case.
203
204        match result {
205            Ok(message) => {
206                metric!(
207                    counter(KafkaCounters::ProduceStatusSuccess) += 1,
208                    topic = message.topic(),
209                    producer_name = &self.producer_name
210                );
211            }
212            Err((error, message)) => {
213                relay_log::error!(
214                    error = error as &dyn Error,
215                    payload_len = message.payload_len(),
216                    tags.topic = message.topic(),
217                    "failed to produce message to Kafka (delivery callback)",
218                );
219
220                metric!(
221                    counter(KafkaCounters::ProduceStatusError) += 1,
222                    topic = message.topic(),
223                    producer_name = &self.producer_name
224                );
225            }
226        }
227    }
228}
229
/// The wrapper type around the kafka [`rdkafka::producer::ThreadedProducer`] with our own
/// [`Context`], so deliveries and client statistics are reported through this module.
pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<Context>;