1use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use rdkafka::ClientConfig;
11use rdkafka::message::{Header, OwnedHeaders};
12use rdkafka::producer::{BaseRecord, Producer as _};
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::config::{KafkaParams, KafkaTopic};
17use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
18
19mod utils;
20use utils::{Context, ThreadedProducer};
21
22#[cfg(feature = "schemas")]
23mod schemas;
24
/// Minimum number of seconds between two reports of a producer's in-flight gauge.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// How long to wait for broker metadata when validating a topic.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_millis(30_000);
27
28#[derive(Error, Debug)]
30pub enum ClientError {
31 #[error("failed to send kafka message")]
33 SendFailed(#[source] rdkafka::error::KafkaError),
34
35 #[error("failed to find producer for the requested kafka topic")]
37 InvalidTopicName,
38
39 #[error("failed to create kafka producer: invalid kafka config")]
41 InvalidConfig(#[source] rdkafka::error::KafkaError),
42
43 #[error("failed to serialize kafka message")]
45 InvalidMsgPack(#[source] rmp_serde::encode::Error),
46
47 #[error("failed to serialize json message")]
49 InvalidJson(#[source] serde_json::Error),
50
51 #[cfg(feature = "schemas")]
53 #[error("failed to run schema validation on message")]
54 SchemaValidationFailed(#[source] schemas::SchemaError),
55
56 #[error("invalid kafka shard")]
58 InvalidShard,
59
60 #[error("failed to fetch the metadata of Kafka")]
62 MetadataFetchError(rdkafka::error::KafkaError),
63
64 #[error("failed to validate the topic with name {0}: {1:?}")]
66 TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
67
68 #[error("failed to encode protobuf because the buffer is too small")]
71 ProtobufEncodingFailed,
72}
73
74pub trait Message {
76 fn key(&self) -> [u8; 16];
78
79 fn variant(&self) -> &'static str;
81
82 fn headers(&self) -> Option<&BTreeMap<String, String>>;
84
85 fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
91}
92
93struct Producer {
95 topic_name: String,
97 producer: Arc<ThreadedProducer>,
99 metrics: Debounced,
101}
102
103impl Producer {
104 fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
105 Self {
106 topic_name,
107 producer,
108 metrics: Debounced::new(REPORT_FREQUENCY_SECS),
109 }
110 }
111
112 fn validate_topic(&self) -> Result<(), ClientError> {
114 let client = self.producer.client();
115 let metadata = client
116 .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
117 .map_err(ClientError::MetadataFetchError)?;
118
119 for topic in metadata.topics() {
120 if let Some(error) = topic.error() {
121 return Err(ClientError::TopicError(topic.name().to_string(), error));
122 }
123 }
124
125 Ok(())
126 }
127}
128
129impl fmt::Debug for Producer {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 f.debug_struct("Producer")
132 .field("topic_name", &self.topic_name)
133 .field("producer", &"<ThreadedProducer>")
134 .finish_non_exhaustive()
135 }
136}
137
138#[derive(Debug)]
140pub struct KafkaClient {
141 producers: HashMap<KafkaTopic, Producer>,
142 #[cfg(feature = "schemas")]
143 schema_validator: schemas::Validator,
144}
145
146impl KafkaClient {
147 pub fn builder() -> KafkaClientBuilder {
149 KafkaClientBuilder::default()
150 }
151
152 pub fn send_message(
156 &self,
157 topic: KafkaTopic,
158 message: &impl Message,
159 ) -> Result<&str, ClientError> {
160 let serialized = message.serialize()?;
161 #[cfg(feature = "schemas")]
162 self.schema_validator
163 .validate_message_schema(topic, &serialized)
164 .map_err(ClientError::SchemaValidationFailed)?;
165 let key = message.key();
166 self.send(
167 topic,
168 &key,
169 message.headers(),
170 message.variant(),
171 &serialized,
172 )
173 }
174
175 pub fn send(
179 &self,
180 topic: KafkaTopic,
181 key: &[u8; 16],
182 headers: Option<&BTreeMap<String, String>>,
183 variant: &str,
184 payload: &[u8],
185 ) -> Result<&str, ClientError> {
186 let producer = self.producers.get(&topic).ok_or_else(|| {
187 relay_log::error!(
188 "attempted to send message to {topic:?} using an unconfigured kafka producer",
189 );
190 ClientError::InvalidTopicName
191 })?;
192 producer.send(key, headers, variant, payload)
193 }
194}
195
196#[derive(Default)]
198pub struct KafkaClientBuilder {
199 reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
200 producers: HashMap<KafkaTopic, Producer>,
201}
202
203impl KafkaClientBuilder {
204 pub fn new() -> Self {
206 Self::default()
207 }
208
209 pub fn add_kafka_topic_config(
216 mut self,
217 topic: KafkaTopic,
218 params: &KafkaParams,
219 validate_topic: bool,
220 ) -> Result<Self, ClientError> {
221 let mut client_config = ClientConfig::new();
222
223 let KafkaParams {
224 topic_name,
225 config_name,
226 params,
227 } = params;
228
229 let config_name = config_name.map(str::to_string);
230
231 if let Some(producer) = self.reused_producers.get(&config_name) {
232 let producer = Producer::new((*topic_name).to_string(), Arc::clone(producer));
233 if validate_topic {
234 producer.validate_topic()?;
235 }
236 self.producers.insert(topic, producer);
237 return Ok(self);
238 }
239
240 for config_p in *params {
241 client_config.set(config_p.name.as_str(), config_p.value.as_str());
242 }
243
244 let producer = Arc::new(
245 client_config
246 .create_with_context(Context)
247 .map_err(ClientError::InvalidConfig)?,
248 );
249
250 self.reused_producers
251 .insert(config_name, Arc::clone(&producer));
252
253 let producer = Producer::new((*topic_name).to_string(), producer);
254 if validate_topic {
255 producer.validate_topic()?;
256 }
257 self.producers.insert(topic, producer);
258
259 Ok(self)
260 }
261
262 pub fn build(self) -> KafkaClient {
264 KafkaClient {
265 producers: self.producers,
266 #[cfg(feature = "schemas")]
267 schema_validator: schemas::Validator::default(),
268 }
269 }
270}
271
272impl fmt::Debug for KafkaClientBuilder {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 f.debug_struct("KafkaClientBuilder")
275 .field("reused_producers", &"<CachedProducers>")
276 .field("producers", &self.producers)
277 .finish()
278 }
279}
280
281impl Producer {
282 fn send(
284 &self,
285 key: &[u8; 16],
286 headers: Option<&BTreeMap<String, String>>,
287 variant: &str,
288 payload: &[u8],
289 ) -> Result<&str, ClientError> {
290 metric!(
291 histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
292 variant = variant
293 );
294
295 let topic_name = self.topic_name.as_str();
296 let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
297
298 if let Some(headers) = headers {
300 let mut kafka_headers = OwnedHeaders::new();
301 for (key, value) in headers {
302 kafka_headers = kafka_headers.insert(Header {
303 key,
304 value: Some(value),
305 });
306 }
307 record = record.headers(kafka_headers);
308 }
309
310 self.metrics.debounce(|| {
311 metric!(
312 gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
313 variant = variant,
314 topic = topic_name
315 );
316 });
317
318 self.producer
319 .send(record)
320 .map(|_| topic_name)
321 .map_err(|(error, _message)| {
322 relay_log::error!(
323 error = &error as &dyn std::error::Error,
324 tags.variant = variant,
325 tags.topic = topic_name,
326 "error sending kafka message",
327 );
328 metric!(
329 counter(KafkaCounters::ProducerEnqueueError) += 1,
330 variant = variant,
331 topic = topic_name
332 );
333 ClientError::SendFailed(error)
334 })
335 }
336}
337
/// Runs a callback at most once per `interval` seconds, measured in whole seconds since creation.
///
/// Safe to call from several threads: when callers race, only the one whose compare-exchange
/// succeeds runs its callback.
struct Debounced {
    /// Shifted timestamp (seconds since `instant`, plus `interval`) of the last activation;
    /// initially `0`.
    last_activation: AtomicU64,
    /// Minimum number of seconds between two activations.
    interval: u64,
    /// Reference point for measuring elapsed time (set at creation).
    instant: Instant,
}

impl Debounced {
    /// Creates a debouncer whose first call to [`Debounced::debounce`] fires immediately.
    pub fn new(interval: u64) -> Self {
        let instant = Instant::now();
        Self {
            interval,
            instant,
            last_activation: AtomicU64::new(0),
        }
    }

    /// Runs `f` and returns `true` if at least `interval` seconds passed since the last
    /// activation; otherwise returns `false` without running `f`.
    fn debounce(&self, f: impl FnOnce()) -> bool {
        // Offsetting by `interval` makes the very first call fire, since the last activation
        // starts at 0.
        let current = self.instant.elapsed().as_secs() + self.interval;
        let last = self.last_activation.load(Ordering::Relaxed);

        let due = current.saturating_sub(last) >= self.interval;
        // Only one racing caller can move `last_activation` from `last` to `current`.
        let won = due
            && self
                .last_activation
                .compare_exchange(last, current, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

        if won {
            f();
        }
        won
    }
}
377
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::thread;

    use super::*;

    /// Calls `debounce` once and asserts that the callback ran exactly when `debounce` reported
    /// that it fired; returns that result.
    fn fire(d: &Debounced) -> bool {
        let calls = Cell::new(0u32);
        let fired = d.debounce(|| calls.set(calls.get() + 1));
        assert_eq!(calls.get(), u32::from(fired));
        fired
    }

    #[test]
    fn test_debounce() {
        let d = Debounced::new(1);

        assert!(fire(&d));
        for _ in 0..10 {
            assert!(!fire(&d));
        }

        thread::sleep(Duration::from_secs(1));

        assert!(fire(&d));
        for _ in 0..10 {
            assert!(!fire(&d));
        }
    }
}