//! Producer utilities (`relay_kafka/producer/utils.rs`).

1use std::error::Error;
2
3use rdkafka::message::{Header, OwnedHeaders, ToBytes};
4use rdkafka::producer::{DeliveryResult, ProducerContext};
5use rdkafka::{ClientContext, Message};
6use relay_statsd::metric;
7
8use crate::statsd::{KafkaCounters, KafkaGauges};
9
10/// A thin wrapper around [`OwnedHeaders`].
11///
12/// Unlike [`OwnedHeaders`], this will not allocate on creation.
13/// Allocations are tuned for the use-case in a [`super::Producer`].
14pub struct KafkaHeaders(Option<OwnedHeaders>);
15
16impl KafkaHeaders {
17    pub fn new() -> Self {
18        Self(None)
19    }
20
21    pub fn insert<V>(&mut self, header: Header<'_, &V>)
22    where
23        V: ToBytes + ?Sized,
24    {
25        self.extend(Some(header));
26    }
27
28    pub fn into_inner(self) -> Option<OwnedHeaders> {
29        self.0
30    }
31}
32
33impl<'a, 'b, V> Extend<Header<'a, &'b V>> for KafkaHeaders
34where
35    V: ToBytes + ?Sized,
36{
37    fn extend<T: IntoIterator<Item = Header<'a, &'b V>>>(&mut self, iter: T) {
38        let mut iter = iter.into_iter();
39
40        // Probe if the iterator is empty, if it is empty, no need to do anything.
41        let Some(first) = iter.next() else {
42            return;
43        };
44
45        let mut headers = self.0.take().unwrap_or_else(|| {
46            // Get a size hint from the iterator, +2 for the already removed
47            // first element and reserving space for 1 extra header which is conditionally
48            // added by the `Producer` in this crate.
49            //
50            // This means we might allocate a little bit too much, but we never have to resize
51            // and allocate a second time, a good trade-off.
52            let size = iter.size_hint().0 + 2;
53            OwnedHeaders::new_with_capacity(size)
54        });
55        headers = headers.insert(first);
56        for remaining in iter {
57            headers = headers.insert(remaining);
58        }
59
60        self.0 = Some(headers);
61    }
62}
63
64impl<'a, 'b, V> FromIterator<Header<'a, &'b V>> for KafkaHeaders
65where
66    V: ToBytes + ?Sized,
67{
68    fn from_iter<I: IntoIterator<Item = Header<'a, &'b V>>>(iter: I) -> Self {
69        let mut c = Self::new();
70        c.extend(iter);
71        c
72    }
73}
74
/// Kafka client and producer context that logs statistics and producer errors.
///
/// Serves as both the [`ClientContext`] (statsd gauges from librdkafka
/// statistics) and the [`ProducerContext`] (delivery-failure logging and
/// counters) for the producer in this crate.
#[derive(Debug)]
pub struct Context;
78
79impl ClientContext for Context {
80    /// Report client statistics as statsd metrics.
81    ///
82    /// This method is only called if `statistics.interval.ms` is configured.
83    fn stats(&self, statistics: rdkafka::Statistics) {
84        relay_statsd::metric!(gauge(KafkaGauges::MessageCount) = statistics.msg_cnt);
85        relay_statsd::metric!(gauge(KafkaGauges::MessageCountMax) = statistics.msg_max);
86        relay_statsd::metric!(gauge(KafkaGauges::MessageSize) = statistics.msg_size);
87        relay_statsd::metric!(gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max);
88
89        for (_, broker) in statistics.brokers {
90            relay_statsd::metric!(
91                gauge(KafkaGauges::OutboundBufferRequests) = broker.outbuf_cnt as u64,
92                broker_name = &broker.name
93            );
94            relay_statsd::metric!(
95                gauge(KafkaGauges::OutboundBufferMessages) = broker.outbuf_msg_cnt as u64,
96                broker_name = &broker.name
97            );
98            if let Some(connects) = broker.connects {
99                relay_statsd::metric!(
100                    gauge(KafkaGauges::Connects) = connects as u64,
101                    broker_name = &broker.name
102                );
103            }
104            if let Some(disconnects) = broker.disconnects {
105                relay_statsd::metric!(
106                    gauge(KafkaGauges::Disconnects) = disconnects as u64,
107                    broker_name = &broker.name
108                );
109            }
110            if let Some(int_latency) = broker.int_latency {
111                relay_statsd::metric!(
112                    gauge(KafkaGauges::ProducerQueueLatency) = int_latency.max as u64,
113                    broker_name = &broker.name
114                );
115            }
116            if let Some(outbuf_latency) = broker.outbuf_latency {
117                relay_statsd::metric!(
118                    gauge(KafkaGauges::RequestQueueLatency) = outbuf_latency.max as u64,
119                    broker_name = &broker.name
120                );
121            }
122        }
123    }
124}
125
126impl ProducerContext for Context {
127    type DeliveryOpaque = ();
128
129    /// This method is called after attempting to send a message to Kafka.
130    /// It's called asynchronously for every message, so we want to handle errors explicitly here.
131    fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
132        // TODO: any `Accepted` outcomes (e.g. spans) should be logged here instead of on the caller side,
133        // such that we do not over-report in the error case.
134
135        if let Err((error, message)) = result {
136            relay_log::error!(
137                error = error as &dyn Error,
138                payload_len = message.payload_len(),
139                tags.topic = message.topic(),
140                "failed to produce message to Kafka (delivery callback)",
141            );
142
143            metric!(
144                counter(KafkaCounters::ProcessingProduceError) += 1,
145                topic = message.topic()
146            );
147        }
148    }
149}
150
/// The wrapper type around the kafka [`rdkafka::producer::ThreadedProducer`] with our own
/// [`Context`].
///
/// Via [`Context`], this producer reports librdkafka statistics as statsd
/// gauges and logs/counts delivery failures.
pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<Context>;