1use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use rdkafka::message::{Header, OwnedHeaders};
11use rdkafka::producer::{BaseRecord, Producer as _};
12use rdkafka::ClientConfig;
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::config::{KafkaParams, KafkaTopic};
17use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
18
19mod utils;
20use utils::{Context, ThreadedProducer};
21
22#[cfg(feature = "schemas")]
23mod schemas;
24
/// Interval, in seconds, passed to `Debounced::new` for every producer's metrics debouncer.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Timeout handed to `fetch_metadata` when a topic is validated.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);
27
28#[derive(Error, Debug)]
30pub enum ClientError {
31 #[error("failed to send kafka message")]
33 SendFailed(#[source] rdkafka::error::KafkaError),
34
35 #[error("failed to find producer for the requested kafka topic")]
37 InvalidTopicName,
38
39 #[error("failed to create kafka producer: invalid kafka config")]
41 InvalidConfig(#[source] rdkafka::error::KafkaError),
42
43 #[error("failed to serialize kafka message")]
45 InvalidMsgPack(#[source] rmp_serde::encode::Error),
46
47 #[error("failed to serialize json message")]
49 InvalidJson(#[source] serde_json::Error),
50
51 #[cfg(feature = "schemas")]
53 #[error("failed to run schema validation on message")]
54 SchemaValidationFailed(#[source] schemas::SchemaError),
55
56 #[error("invalid kafka shard")]
58 InvalidShard,
59
60 #[error("failed to fetch the metadata of Kafka")]
62 MetadataFetchError(rdkafka::error::KafkaError),
63
64 #[error("failed to validate the topic with name {0}: {1:?}")]
66 TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
67}
68
/// A message that can be sent through [`KafkaClient::send_message`].
pub trait Message {
    /// Returns the 16-byte key that is attached to the Kafka record.
    fn key(&self) -> [u8; 16];

    /// Returns a static label for the kind of message.
    ///
    /// The client uses it as the `variant` tag on emitted metrics and error logs.
    fn variant(&self) -> &'static str;

    /// Returns the headers to attach to the Kafka record, or `None` to send it without headers.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into the bytes used as the record payload.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] when serialization fails.
    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
}
87
/// A Kafka producer bound to a single topic name.
struct Producer {
    /// Name of the Kafka topic that records are sent to.
    topic_name: String,
    /// Underlying producer; the builder may hand the same instance to several topics.
    producer: Arc<ThreadedProducer>,
    /// Debouncer that limits how often the in-flight gauge is reported.
    metrics: Debounced,
}
97
98impl Producer {
99 fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
100 Self {
101 topic_name,
102 producer,
103 metrics: Debounced::new(REPORT_FREQUENCY_SECS),
104 }
105 }
106
107 fn validate_topic(&self) -> Result<(), ClientError> {
109 let client = self.producer.client();
110 let metadata = client
111 .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
112 .map_err(ClientError::MetadataFetchError)?;
113
114 for topic in metadata.topics() {
115 if let Some(error) = topic.error() {
116 return Err(ClientError::TopicError(topic.name().to_string(), error));
117 }
118 }
119
120 Ok(())
121 }
122}
123
124impl fmt::Debug for Producer {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("Producer")
127 .field("topic_name", &self.topic_name)
128 .field("producer", &"<ThreadedProducer>")
129 .finish_non_exhaustive()
130 }
131}
132
/// Holds the configured Kafka producers and routes messages to them by topic.
#[derive(Debug)]
pub struct KafkaClient {
    /// Producers keyed by the topic they were configured for.
    producers: HashMap<KafkaTopic, Producer>,
    /// Validator applied to serialized messages in `send_message`.
    #[cfg(feature = "schemas")]
    schema_validator: schemas::Validator,
}
140
141impl KafkaClient {
142 pub fn builder() -> KafkaClientBuilder {
144 KafkaClientBuilder::default()
145 }
146
147 pub fn send_message(
151 &self,
152 topic: KafkaTopic,
153 message: &impl Message,
154 ) -> Result<&str, ClientError> {
155 let serialized = message.serialize()?;
156 #[cfg(feature = "schemas")]
157 self.schema_validator
158 .validate_message_schema(topic, &serialized)
159 .map_err(ClientError::SchemaValidationFailed)?;
160 let key = message.key();
161 self.send(
162 topic,
163 &key,
164 message.headers(),
165 message.variant(),
166 &serialized,
167 )
168 }
169
170 pub fn send(
174 &self,
175 topic: KafkaTopic,
176 key: &[u8; 16],
177 headers: Option<&BTreeMap<String, String>>,
178 variant: &str,
179 payload: &[u8],
180 ) -> Result<&str, ClientError> {
181 let producer = self.producers.get(&topic).ok_or_else(|| {
182 relay_log::error!(
183 "attempted to send message to {topic:?} using an unconfigured kafka producer",
184 );
185 ClientError::InvalidTopicName
186 })?;
187 producer.send(key, headers, variant, payload)
188 }
189}
190
/// Builder that collects per-topic producers and turns them into a [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    /// Underlying producers created so far, keyed by their optional config name, so that
    /// topics using the same config name share one producer.
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    /// Per-topic producers that are moved into the client by `build`.
    producers: HashMap<KafkaTopic, Producer>,
}
197
198impl KafkaClientBuilder {
199 pub fn new() -> Self {
201 Self::default()
202 }
203
204 pub fn add_kafka_topic_config(
211 mut self,
212 topic: KafkaTopic,
213 params: &KafkaParams,
214 validate_topic: bool,
215 ) -> Result<Self, ClientError> {
216 let mut client_config = ClientConfig::new();
217
218 let KafkaParams {
219 topic_name,
220 config_name,
221 params,
222 } = params;
223
224 let config_name = config_name.map(str::to_string);
225
226 if let Some(producer) = self.reused_producers.get(&config_name) {
227 let producer = Producer::new((*topic_name).to_string(), Arc::clone(producer));
228 if validate_topic {
229 producer.validate_topic()?;
230 }
231 self.producers.insert(topic, producer);
232 return Ok(self);
233 }
234
235 for config_p in *params {
236 client_config.set(config_p.name.as_str(), config_p.value.as_str());
237 }
238
239 let producer = Arc::new(
240 client_config
241 .create_with_context(Context)
242 .map_err(ClientError::InvalidConfig)?,
243 );
244
245 self.reused_producers
246 .insert(config_name, Arc::clone(&producer));
247
248 let producer = Producer::new((*topic_name).to_string(), producer);
249 if validate_topic {
250 producer.validate_topic()?;
251 }
252 self.producers.insert(topic, producer);
253
254 Ok(self)
255 }
256
257 pub fn build(self) -> KafkaClient {
259 KafkaClient {
260 producers: self.producers,
261 #[cfg(feature = "schemas")]
262 schema_validator: schemas::Validator::default(),
263 }
264 }
265}
266
267impl fmt::Debug for KafkaClientBuilder {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 f.debug_struct("KafkaClientBuilder")
270 .field("reused_producers", &"<CachedProducers>")
271 .field("producers", &self.producers)
272 .finish()
273 }
274}
275
276impl Producer {
277 fn send(
279 &self,
280 key: &[u8; 16],
281 headers: Option<&BTreeMap<String, String>>,
282 variant: &str,
283 payload: &[u8],
284 ) -> Result<&str, ClientError> {
285 metric!(
286 histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
287 variant = variant
288 );
289
290 let topic_name = self.topic_name.as_str();
291 let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
292
293 if let Some(headers) = headers {
295 let mut kafka_headers = OwnedHeaders::new();
296 for (key, value) in headers {
297 kafka_headers = kafka_headers.insert(Header {
298 key,
299 value: Some(value),
300 });
301 }
302 record = record.headers(kafka_headers);
303 }
304
305 self.metrics.debounce(|| {
306 metric!(
307 gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
308 variant = variant,
309 topic = topic_name
310 );
311 });
312
313 self.producer
314 .send(record)
315 .map(|_| topic_name)
316 .map_err(|(error, _message)| {
317 relay_log::error!(
318 error = &error as &dyn std::error::Error,
319 tags.variant = variant,
320 tags.topic = topic_name,
321 "error sending kafka message",
322 );
323 metric!(
324 counter(KafkaCounters::ProducerEnqueueError) += 1,
325 variant = variant,
326 topic = topic_name
327 );
328 ClientError::SendFailed(error)
329 })
330 }
331}
332
/// Debouncer that lets a callback run at most once per interval.
struct Debounced {
    /// Shifted timestamp (seconds since `instant` plus `interval`) of the last activation.
    last_activation: AtomicU64,
    /// Minimum number of seconds between two activations.
    interval: u64,
    /// Reference point from which elapsed seconds are measured.
    instant: Instant,
}

impl Debounced {
    /// Creates a debouncer that activates at most once every `interval` seconds.
    pub fn new(interval: u64) -> Self {
        let instant = Instant::now();
        Self {
            last_activation: AtomicU64::new(0),
            interval,
            instant,
        }
    }

    /// Runs `f` if at least `interval` seconds have passed since the last activation.
    ///
    /// Returns `true` when `f` was invoked and `false` otherwise, including the case where the
    /// compare-and-swap on the last activation timestamp fails.
    fn debounce(&self, f: impl FnOnce()) -> bool {
        let interval = self.interval;
        // The timestamp is shifted by one interval so that the very first call, compared
        // against the initial value of 0, already counts as due.
        let shifted_now = self.instant.elapsed().as_secs() + interval;
        let last = self.last_activation.load(Ordering::Relaxed);

        let due = shifted_now.saturating_sub(last) >= interval;
        if due
            && self
                .last_activation
                .compare_exchange(last, shifted_now, Ordering::SeqCst, Ordering::Acquire)
                .is_ok()
        {
            f();
            true
        } else {
            false
        }
    }
}
372
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Asserts that the debouncer fires on the next call and then stays quiet for ten more.
    fn assert_fires_exactly_once(debounced: &Debounced) {
        assert!(debounced.debounce(|| {}));
        let mut remaining = 10;
        while remaining > 0 {
            assert!(!debounced.debounce(|| {}));
            remaining -= 1;
        }
    }

    /// The debouncer fires immediately, is silent within the interval, and fires again once
    /// the interval has passed.
    #[test]
    fn test_debounce() {
        let d = Debounced::new(1);

        assert_fires_exactly_once(&d);

        thread::sleep(Duration::from_secs(1));

        assert_fires_exactly_once(&d);
    }
}
395}