// relay_kafka/producer/utils.rs
use std::error::Error;
2
3use rdkafka::message::{Header, OwnedHeaders, ToBytes};
4use rdkafka::producer::{DeliveryResult, ProducerContext};
5use rdkafka::{ClientContext, Message};
6use relay_statsd::metric;
7
8use crate::statsd::{KafkaCounters, KafkaGauges};
9
/// A lazily-allocated wrapper around [`OwnedHeaders`].
///
/// The inner `OwnedHeaders` is only allocated once the first header is
/// inserted; an empty `KafkaHeaders` holds `None` and costs nothing.
pub struct KafkaHeaders(Option<OwnedHeaders>);
15
16impl KafkaHeaders {
17 pub fn new() -> Self {
18 Self(None)
19 }
20
21 pub fn insert<V>(&mut self, header: Header<'_, &V>)
22 where
23 V: ToBytes + ?Sized,
24 {
25 self.extend(Some(header));
26 }
27
28 pub fn into_inner(self) -> Option<OwnedHeaders> {
29 self.0
30 }
31}
32
impl<'a, 'b, V> Extend<Header<'a, &'b V>> for KafkaHeaders
where
    V: ToBytes + ?Sized,
{
    fn extend<T: IntoIterator<Item = Header<'a, &'b V>>>(&mut self, iter: T) {
        let mut iter = iter.into_iter();

        // An empty iterator must not allocate: bail out before touching
        // `self.0` so an empty `KafkaHeaders` stays `None`.
        let Some(first) = iter.next() else {
            return;
        };

        // Reuse already-allocated headers if present; otherwise size the new
        // allocation from the iterator's lower bound. The `+ 2` covers `first`
        // (already pulled off the iterator above) plus one spare slot —
        // presumably for a header appended later by the caller (TODO confirm).
        let mut headers = self.0.take().unwrap_or_else(|| {
            let size = iter.size_hint().0 + 2;
            OwnedHeaders::new_with_capacity(size)
        });
        // `OwnedHeaders::insert` consumes and returns the collection, so we
        // rebind on every insertion.
        headers = headers.insert(first);
        for remaining in iter {
            headers = headers.insert(remaining);
        }

        self.0 = Some(headers);
    }
}
63
64impl<'a, 'b, V> FromIterator<Header<'a, &'b V>> for KafkaHeaders
65where
66 V: ToBytes + ?Sized,
67{
68 fn from_iter<I: IntoIterator<Item = Header<'a, &'b V>>>(iter: I) -> Self {
69 let mut c = Self::new();
70 c.extend(iter);
71 c
72 }
73}
74
/// Kafka client context used by the producer.
///
/// Forwards librdkafka statistics callbacks to statsd gauges (see the
/// [`ClientContext`] impl) and delivery failures to logs and counters
/// (see the [`ProducerContext`] impl).
#[derive(Debug)]
pub struct Context;
78
79impl ClientContext for Context {
80 fn stats(&self, statistics: rdkafka::Statistics) {
84 relay_statsd::metric!(gauge(KafkaGauges::MessageCount) = statistics.msg_cnt);
85 relay_statsd::metric!(gauge(KafkaGauges::MessageCountMax) = statistics.msg_max);
86 relay_statsd::metric!(gauge(KafkaGauges::MessageSize) = statistics.msg_size);
87 relay_statsd::metric!(gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max);
88
89 for (_, broker) in statistics.brokers {
90 relay_statsd::metric!(
91 gauge(KafkaGauges::OutboundBufferRequests) = broker.outbuf_cnt as u64,
92 broker_name = &broker.name
93 );
94 relay_statsd::metric!(
95 gauge(KafkaGauges::OutboundBufferMessages) = broker.outbuf_msg_cnt as u64,
96 broker_name = &broker.name
97 );
98 if let Some(connects) = broker.connects {
99 relay_statsd::metric!(
100 gauge(KafkaGauges::Connects) = connects as u64,
101 broker_name = &broker.name
102 );
103 }
104 if let Some(disconnects) = broker.disconnects {
105 relay_statsd::metric!(
106 gauge(KafkaGauges::Disconnects) = disconnects as u64,
107 broker_name = &broker.name
108 );
109 }
110 if let Some(int_latency) = broker.int_latency {
111 relay_statsd::metric!(
112 gauge(KafkaGauges::ProducerQueueLatency) = int_latency.max as u64,
113 broker_name = &broker.name
114 );
115 }
116 if let Some(outbuf_latency) = broker.outbuf_latency {
117 relay_statsd::metric!(
118 gauge(KafkaGauges::RequestQueueLatency) = outbuf_latency.max as u64,
119 broker_name = &broker.name
120 );
121 }
122 }
123 }
124}
125
126impl ProducerContext for Context {
127 type DeliveryOpaque = ();
128
129 fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
132 if let Err((error, message)) = result {
136 relay_log::error!(
137 error = error as &dyn Error,
138 payload_len = message.payload_len(),
139 tags.topic = message.topic(),
140 "failed to produce message to Kafka (delivery callback)",
141 );
142
143 metric!(
144 counter(KafkaCounters::ProcessingProduceError) += 1,
145 topic = message.topic()
146 );
147 }
148 }
149}
150
/// A Kafka producer that polls on a background thread, wired to [`Context`]
/// for statistics and delivery callbacks.
pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<Context>;