//! Kafka producer that publishes messages to the configured Kafka topics.

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::producer::{BaseRecord, Producer as _};
use relay_statsd::metric;
use thiserror::Error;

use crate::KafkaTopicConfig;
use crate::config::{KafkaParams, KafkaTopic};
use crate::debounced::Debounced;
use crate::limits::KafkaRateLimits;
use crate::producer::utils::KafkaHeaders;
use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

mod utils;
use utils::{Context, ThreadedProducer};

#[cfg(debug_assertions)]
mod schemas;

/// Interval in which debounced producer metrics are reported, in seconds.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Timeout for fetching topic metadata from the broker during validation.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka partition key of a message.
pub type Key = u128;

/// Errors produced by the Kafka producer.
#[derive(Error, Debug)]
pub enum ClientError {
    /// Failed to send a message to Kafka.
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    /// No producer is configured for the requested Kafka topic.
    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    /// Failed to create a Kafka producer because the configuration is invalid.
    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    /// Failed to serialize the message as MessagePack.
    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    /// Failed to serialize the message as JSON.
    #[error("failed to serialize json message")]
    InvalidJson(#[source] serde_json::Error),

    /// The payload failed schema validation (debug builds only).
    #[cfg(debug_assertions)]
    #[error("schema validation failed")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    /// No Kafka configuration exists for the requested topic.
    #[error("no kafka configuration for topic")]
    MissingTopic,

    /// Failed to fetch topic metadata from the Kafka broker.
    #[error("failed to fetch metadata from Kafka")]
    MetadataFetchError(rdkafka::error::KafkaError),

    /// The broker reported an error for the topic during validation.
    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    /// The encoding buffer was too small for the protobuf payload.
    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}

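/// A message that can be serialized and sent to Kafka via [`KafkaClient`].
///
/// A minimal sketch of an implementation, assuming a hypothetical
/// `HealthCheck` type that is not part of this crate:
///
/// ```ignore
/// struct HealthCheck {
///     payload: Vec<u8>,
/// }
///
/// impl Message for HealthCheck {
///     fn key(&self) -> Option<Key> {
///         None // no stable partition key; the producer assigns one
///     }
///
///     fn variant(&self) -> &'static str {
///         "health_check"
///     }
///
///     fn headers(&self) -> Option<&BTreeMap<String, String>> {
///         None
///     }
///
///     fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
///         Ok(SerializationOutput::Json(Cow::Borrowed(self.payload.as_slice())))
///     }
/// }
/// ```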
pub trait Message {
    /// Returns the partition key of the message, if it has one.
    fn key(&self) -> Option<Key>;

    /// Returns the message type, used as the `variant` tag in metrics.
    fn variant(&self) -> &'static str;

    /// Returns the headers to attach to the Kafka message, if any.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary wire format.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError>;
}

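/// The serialized payload of a [`Message`], tagged with its wire format.
///
/// A sketch of constructing the JSON variant with `serde_json`, assuming some
/// serializable `event` value:
///
/// ```ignore
/// let bytes = serde_json::to_vec(&event).map_err(ClientError::InvalidJson)?;
/// let output = SerializationOutput::Json(Cow::Owned(bytes));
/// ```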
#[derive(Debug, Clone)]
pub enum SerializationOutput<'a> {
    /// The payload is serialized as JSON.
    Json(Cow<'a, [u8]>),

    /// The payload is serialized as MessagePack.
    MsgPack(Cow<'a, [u8]>),

    /// The payload is serialized as protobuf.
    Protobuf(Cow<'a, [u8]>),
}

impl SerializationOutput<'_> {
    /// Returns the serialized payload as a byte slice, regardless of format.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            SerializationOutput::Json(cow) => cow,
            SerializationOutput::MsgPack(cow) => cow,
            SerializationOutput::Protobuf(cow) => cow,
        }
    }
}

/// The set of producers backing a single logical topic.
///
/// A logical topic may be sharded over multiple physical topics, in which
/// case messages are distributed across them by partition key.
struct TopicProducers {
    producers: Vec<TopicProducer>,
}

impl TopicProducers {
    fn new() -> Self {
        Self {
            producers: Vec::new(),
        }
    }

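    /// Selects the producer shard for the given partition key.
    ///
    /// The key is reduced modulo the number of backing producers, so a fixed
    /// key always maps to the same shard:
    ///
    /// ```ignore
    /// // with 3 producers, key 7 selects shard 7 % 3 == 1
    /// assert_eq!((7u128 % 3) as usize, 1);
    /// ```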
    fn select(&self, key: u128) -> Option<&TopicProducer> {
        debug_assert!(!self.producers.is_empty());

        if self.producers.is_empty() {
            return None;
        } else if self.producers.len() == 1 {
            return self.producers.first();
        }

        let select = (key % self.producers.len() as u128) as usize;
        self.producers.get(select)
    }

    /// Validates that every backing topic exists and is healthy by fetching
    /// its metadata from the broker.
    fn validate_topic(&self) -> Result<(), ClientError> {
        for tp in &self.producers {
            let client = tp.producer.client();
            let metadata = client
                .fetch_metadata(Some(&tp.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
                .map_err(ClientError::MetadataFetchError)?;

            for topic in metadata.topics() {
                if let Some(error) = topic.error() {
                    return Err(ClientError::TopicError(topic.name().to_owned(), error));
                }
            }
        }

        Ok(())
    }
}

/// A single physical Kafka topic together with the producer that writes to it.
struct TopicProducer {
    pub topic_name: String,
    pub producer: Arc<ThreadedProducer>,
    pub rate_limiter: Option<KafkaRateLimits>,
}

/// Producer for a logical topic, handling shard selection, per-key rate
/// limiting, and debounced metrics reporting.
struct Producer {
    topic_producers: TopicProducers,
    metrics: Debounced,
    next_key: AtomicU32,
}

impl Producer {
    fn new(topic_producers: TopicProducers) -> Self {
        Self {
            topic_producers,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
            next_key: AtomicU32::new(0),
        }
    }
}

impl Producer {
    /// Sends the payload to the topic shard selected by `key` and returns the
    /// name of the physical topic the message was produced to.
    fn send(
        &self,
        key: Option<Key>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let now = Instant::now();

        let mut key = key.unwrap_or_else(|| self.next_key());

        let Some(TopicProducer {
            topic_name,
            producer,
            rate_limiter,
        }) = self.topic_producers.select(key)
        else {
            return Err(ClientError::MissingTopic);
        };

        let producer_name = producer.context().producer_name();

        metric!(
            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
            variant = variant,
            topic = topic_name,
            producer_name = producer_name
        );

        let mut headers = headers
            .unwrap_or(&BTreeMap::new())
            .iter()
            .map(|(key, value)| Header {
                key,
                value: Some(value),
            })
            .collect::<KafkaHeaders>();

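        // Rate limiting: when a partition key exceeds its configured limit,
        // the message is not dropped. Instead it is "reshuffled": tagged with
        // a `sentry-reshuffled` header and re-keyed from the internal counter
        // so traffic spreads across partitions instead of hot-spotting one.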
        if let Some(limiter) = &rate_limiter {
            let is_limited = limiter.try_increment(now, key, 1) < 1;

            if is_limited {
                metric!(
                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
                    variant = variant,
                    topic = topic_name,
                    producer_name = producer_name
                );

                headers.insert(Header {
                    key: "sentry-reshuffled",
                    value: Some("1"),
                });

                key = self.next_key();
            }
        }

        let key = u128::to_be_bytes(key);
        let mut record = BaseRecord::to(topic_name).payload(payload).key(&key);
        if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
        }

        self.metrics.debounce(now, || {
            metric!(
                gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
        });

        producer.send(record).map_err(|(error, _message)| {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                tags.variant = variant,
                tags.topic = topic_name,
                "error sending kafka message",
            );
            metric!(
                counter(KafkaCounters::ProducerEnqueueError) += 1,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
            ClientError::SendFailed(error)
        })?;

        Ok(topic_name)
    }

    /// Returns the next partition key from a relaxed atomic counter.
    ///
    /// The counter wraps around at `u32::MAX`; keys only need to be evenly
    /// distributed, not unique.
    fn next_key(&self) -> u128 {
        self.next_key.fetch_add(1, Ordering::Relaxed) as u128
    }
}

impl fmt::Debug for Producer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic_names: Vec<&String> = self
            .topic_producers
            .producers
            .iter()
            .map(|tp| &tp.topic_name)
            .collect();
        f.debug_struct("Producer")
            .field("topic_names", &topic_names)
            .field("producers", &"<ThreadedProducers>")
            .finish_non_exhaustive()
    }
}

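/// Kafka client holding one producer per configured topic.
///
/// A sketch of typical usage; `topic_config` stands for a [`KafkaTopicConfig`]
/// taken from the application's configuration, `message` for a value
/// implementing [`Message`], and `KafkaTopic::Events` for one of the
/// configured topic variants:
///
/// ```ignore
/// let client = KafkaClient::builder()
///     .add_kafka_topic_config(KafkaTopic::Events, &topic_config, true)?
///     .build();
///
/// let topic_name = client.send_message(KafkaTopic::Events, &message)?;
/// ```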
#[derive(Debug)]
pub struct KafkaClient {
    producers: HashMap<KafkaTopic, Producer>,
    #[cfg(debug_assertions)]
    schema_validator: schemas::Validator,
}

impl KafkaClient {
    /// Returns a builder for the Kafka client.
    pub fn builder() -> KafkaClientBuilder {
        KafkaClientBuilder::default()
    }

    /// Serializes and sends the message to the given topic, returning the
    /// name of the physical topic it was produced to.
    ///
    /// In debug builds, JSON payloads are additionally validated against the
    /// topic's schema.
    pub fn send_message(
        &self,
        topic: KafkaTopic,
        message: &impl Message,
    ) -> Result<&str, ClientError> {
        let serialized = message.serialize()?;

        #[cfg(debug_assertions)]
        if let SerializationOutput::Json(ref bytes) = serialized {
            self.schema_validator
                .validate_message_schema(topic, bytes)
                .map_err(ClientError::SchemaValidationFailed)?;
        }

        self.send(
            topic,
            message.key(),
            message.headers(),
            message.variant(),
            serialized.as_bytes(),
        )
    }

    /// Sends an already serialized payload to the given topic.
    pub fn send(
        &self,
        topic: KafkaTopic,
        key: Option<Key>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let producer = self.producers.get(&topic).ok_or_else(|| {
            relay_log::error!(
                "attempted to send message to {topic:?} using an unconfigured kafka producer",
            );
            ClientError::InvalidTopicName
        })?;

        producer.send(key, headers, variant, payload)
    }
}

/// Builder for a [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    producers: HashMap<KafkaTopic, Producer>,
}

impl KafkaClientBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a producer for the given topic, reusing the underlying Kafka
    /// producer across topics that share the same named configuration.
    ///
    /// If `validate_topic` is set, the topic's metadata is fetched from the
    /// broker to verify that the topic exists and is healthy.
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        topic_config: &KafkaTopicConfig<'_>,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut topic_producers = TopicProducers::new();

        for params in topic_config.topics() {
            let KafkaParams {
                topic_name,
                config_name,
                params: config_params,
                key_rate_limit,
            } = params;

            let rate_limiter = key_rate_limit.map(|limit| {
                KafkaRateLimits::new(
                    limit.limit_per_window,
                    Duration::from_secs(limit.window_secs),
                )
            });

            let config_name = config_name.map(str::to_string);

            let threaded_producer = if let Some(producer) = self.reused_producers.get(&config_name)
            {
                Arc::clone(producer)
            } else {
                let mut client_config = ClientConfig::new();
                for config_p in *config_params {
                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
                }

                // Name the producer after its `client.id` setting, falling
                // back to the config name or "unknown".
                let producer_name = config_params
                    .iter()
                    .find(|p| p.name == "client.id")
                    .map(|p| p.value.clone())
                    .or_else(|| config_name.clone())
                    .unwrap_or_else(|| "unknown".to_owned());

                let producer = Arc::new(
                    client_config
                        .create_with_context(Context::new(producer_name))
                        .map_err(ClientError::InvalidConfig)?,
                );

                self.reused_producers
                    .insert(config_name, Arc::clone(&producer));

                producer
            };

            topic_producers.producers.push(TopicProducer {
                topic_name: topic_name.clone(),
                producer: threaded_producer,
                rate_limiter,
            });
        }

        let producer = Producer::new(topic_producers);
        if validate_topic {
            producer.topic_producers.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes the builder and returns the finished [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(debug_assertions)]
            schema_validator: schemas::Validator::default(),
        }
    }
}

impl fmt::Debug for KafkaClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KafkaClientBuilder")
            .field("reused_producers", &"<CachedProducers>")
            .field("producers", &self.producers)
            .finish()
    }
}