1use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::{Duration, Instant};
9
10use rdkafka::ClientConfig;
11use rdkafka::message::Header;
12use rdkafka::producer::{BaseRecord, Producer as _};
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::KafkaTopicConfig;
17use crate::config::{KafkaParams, KafkaTopic};
18use crate::debounced::Debounced;
19use crate::limits::KafkaRateLimits;
20use crate::producer::utils::KafkaHeaders;
21use crate::statsd::{KafkaCounters, KafkaDistributions, KafkaGauges};
22
23mod utils;
24use utils::{Context, ThreadedProducer};
25
26#[cfg(debug_assertions)]
27mod schemas;
28
/// Interval, in seconds, at which debounced producer gauge metrics are emitted.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Timeout used when fetching topic metadata during topic validation.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Partitioning key attached to Kafka messages, serialized big-endian on send.
pub type Key = u128;
34
/// Errors that can occur while creating producers or sending messages.
#[derive(Error, Debug)]
pub enum ClientError {
    /// Enqueueing the message into the rdkafka producer failed.
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    /// No producer is registered for the requested topic.
    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    /// The supplied Kafka client configuration was rejected by rdkafka.
    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    /// MessagePack serialization of the payload failed.
    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    /// JSON serialization of the payload failed.
    #[error("failed to serialize json message")]
    InvalidJson(#[from] serde_json::Error),

    /// The payload did not match the topic's schema (debug builds only).
    #[cfg(debug_assertions)]
    #[error("schema validation failed")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    /// The topic has no producers configured.
    #[error("no kafka configuration for topic")]
    MissingTopic,

    /// Fetching topic metadata from the broker failed.
    #[error("failed to fetch the metadata of Kafka")]
    MetadataFetchError(rdkafka::error::KafkaError),

    /// The broker reported an error for the named topic during validation.
    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    /// The protobuf output buffer was too small for the encoded message.
    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}
80
/// A message that can be produced to Kafka via [`KafkaClient`].
pub trait Message {
    /// Returns the partitioning key for this message, if it has one.
    ///
    /// Messages without a key are distributed round-robin by the producer.
    fn key(&self) -> Option<Key>;

    /// Returns a static name identifying the message type, used as the
    /// `variant` tag on emitted metrics.
    fn variant(&self) -> &'static str;

    /// Returns the Kafka headers to attach to the message, if any.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into one of the supported wire formats.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError>;
}
99
/// The serialized payload of a [`Message`] in one of the supported wire formats.
#[derive(Debug, Clone)]
pub enum SerializationOutput<'a> {
    /// JSON-encoded payload.
    Json(Cow<'a, [u8]>),

    /// MessagePack-encoded payload.
    MsgPack(Cow<'a, [u8]>),

    /// Protobuf-encoded payload.
    Protobuf(Cow<'a, [u8]>),
}
112
113impl SerializationOutput<'_> {
114 pub fn as_bytes(&self) -> &[u8] {
116 match self {
117 SerializationOutput::Json(cow) => cow,
118 SerializationOutput::MsgPack(cow) => cow,
119 SerializationOutput::Protobuf(cow) => cow,
120 }
121 }
122}
123
/// The set of physical-topic producers backing one logical [`KafkaTopic`].
struct TopicProducers {
    /// One entry per configured physical topic.
    producers: Vec<TopicProducer>,
}
128
129impl TopicProducers {
130 fn new() -> Self {
131 Self {
132 producers: Vec::new(),
133 }
134 }
135
136 fn select(&self, key: u128) -> Option<&TopicProducer> {
137 debug_assert!(!self.producers.is_empty());
138
139 if self.producers.is_empty() {
140 return None;
141 } else if self.producers.len() == 1 {
142 return self.producers.first();
143 }
144
145 let select = (key % self.producers.len() as u128) as usize;
146 self.producers.get(select)
147 }
148
149 fn validate_topic(&self) -> Result<(), ClientError> {
151 for tp in &self.producers {
152 let client = tp.producer.client();
153 let metadata = client
154 .fetch_metadata(Some(&tp.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
155 .map_err(ClientError::MetadataFetchError)?;
156
157 for topic in metadata.topics() {
158 if let Some(error) = topic.error() {
159 return Err(ClientError::TopicError(topic.name().to_owned(), error));
160 }
161 }
162 }
163
164 Ok(())
165 }
166}
167
/// A single physical Kafka topic paired with the producer that serves it.
struct TopicProducer {
    /// Name of the physical Kafka topic.
    pub topic_name: String,
    /// Underlying producer; may be shared with other topics using the same
    /// client configuration.
    pub producer: Arc<ThreadedProducer>,
    /// Optional per-partition-key rate limiter for this topic.
    pub rate_limiter: Option<KafkaRateLimits>,
}
173
/// Produces messages for one logical topic, distributing them over its
/// physical topic producers.
struct Producer {
    /// The physical producers messages are distributed across.
    topic_producers: TopicProducers,
    /// Debouncer limiting how often gauge metrics are reported.
    metrics: Debounced,
    /// Counter used to generate keys for messages without an explicit one.
    next_key: AtomicU32,
}
182
183impl Producer {
184 fn new(topic_producers: TopicProducers) -> Self {
185 Self {
186 topic_producers,
187 metrics: Debounced::new(REPORT_FREQUENCY_SECS),
188 next_key: AtomicU32::new(0),
189 }
190 }
191}
192
193impl Producer {
194 fn send(
196 &self,
197 key: Option<Key>,
198 headers: Option<&BTreeMap<String, String>>,
199 variant: &str,
200 payload: &[u8],
201 ) -> Result<&str, ClientError> {
202 let now = Instant::now();
203
204 let mut key = key.unwrap_or_else(|| self.next_key());
207
208 let Some(TopicProducer {
209 topic_name,
210 producer,
211 rate_limiter,
212 }) = self.topic_producers.select(key)
213 else {
214 return Err(ClientError::MissingTopic);
215 };
216
217 relay_log::configure_scope(|s| s.set_tag("topic", topic_name));
218
219 let producer_name = producer.context().producer_name();
220
221 metric!(
222 distribution(KafkaDistributions::KafkaMessageSize) = payload.len() as u64,
223 variant = variant,
224 topic = topic_name,
225 producer_name = producer_name
226 );
227
228 let mut headers = headers
229 .unwrap_or(&BTreeMap::new())
230 .iter()
231 .map(|(key, value)| Header {
232 key,
233 value: Some(value),
234 })
235 .collect::<KafkaHeaders>();
236
237 if let Some(limiter) = &rate_limiter {
241 let is_limited = limiter.try_increment(now, key, 1) < 1;
242
243 if is_limited {
244 metric!(
245 counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
246 variant = variant,
247 topic = topic_name,
248 producer_name = producer_name
249 );
250
251 headers.insert(Header {
252 key: "sentry-reshuffled",
253 value: Some("1"),
254 });
255
256 key = self.next_key();
258 }
259 }
260
261 let key = u128::to_be_bytes(key);
262 let mut record = BaseRecord::to(topic_name).payload(payload).key(&key);
263 if let Some(headers) = headers.into_inner() {
264 record = record.headers(headers);
265 }
266
267 self.metrics.debounce(now, || {
268 metric!(
269 gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
270 variant = variant,
271 topic = topic_name,
272 producer_name = producer_name
273 );
274 });
275
276 producer.send(record).map_err(|(error, _message)| {
277 metric!(
278 counter(KafkaCounters::ProducerEnqueueError) += 1,
279 variant = variant,
280 topic = topic_name,
281 producer_name = producer_name
282 );
283 ClientError::SendFailed(error)
284 })?;
285
286 Ok(topic_name)
287 }
288
289 fn next_key(&self) -> u128 {
298 self.next_key.fetch_add(1, Ordering::Relaxed) as u128
301 }
302}
303
304impl fmt::Debug for Producer {
305 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306 let topic_names: Vec<&String> = self
307 .topic_producers
308 .producers
309 .iter()
310 .map(|tp| &tp.topic_name)
311 .collect();
312 f.debug_struct("Producer")
313 .field("topic_names", &topic_names)
314 .field("producers", &"<ThreadedProducers>")
315 .finish_non_exhaustive()
316 }
317}
318
/// A client capable of sending messages to Kafka, holding one [`Producer`]
/// per configured [`KafkaTopic`].
#[derive(Debug)]
pub struct KafkaClient {
    producers: HashMap<KafkaTopic, Producer>,
    /// Validates JSON payloads against topic schemas; debug builds only.
    #[cfg(debug_assertions)]
    schema_validator: schemas::Validator,
}
326
327impl KafkaClient {
328 pub fn builder() -> KafkaClientBuilder {
330 KafkaClientBuilder::default()
331 }
332
333 pub fn send_message(
337 &self,
338 topic: KafkaTopic,
339 message: &impl Message,
340 ) -> Result<&str, ClientError> {
341 let serialized = message.serialize()?;
342
343 #[cfg(debug_assertions)]
344 if let SerializationOutput::Json(ref bytes) = serialized {
345 self.schema_validator
346 .validate_message_schema(topic, bytes)
347 .map_err(ClientError::SchemaValidationFailed)?;
348 }
349 self.send(
350 topic,
351 message.key(),
352 message.headers(),
353 message.variant(),
354 serialized.as_bytes(),
355 )
356 }
357
358 fn send(
362 &self,
363 topic: KafkaTopic,
364 key: Option<Key>,
365 headers: Option<&BTreeMap<String, String>>,
366 variant: &str,
367 payload: &[u8],
368 ) -> Result<&str, ClientError> {
369 let producer = self
370 .producers
371 .get(&topic)
372 .ok_or_else(|| ClientError::InvalidTopicName)?;
373
374 producer.send(key, headers, variant, payload)
375 }
376}
377
/// Builder for [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    /// Producers shared across topics, keyed by their optional config name.
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    /// Per-topic producers accumulated so far.
    producers: HashMap<KafkaTopic, Producer>,
}
384
impl KafkaClientBuilder {
    /// Creates an empty builder; equivalent to `KafkaClientBuilder::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers producers for all physical topics of `topic`.
    ///
    /// Producers whose configs share the same `config_name` reuse a single
    /// underlying rdkafka client. When `validate_topic` is set, topic
    /// metadata is fetched eagerly and the call fails if any topic reports a
    /// broker-side error.
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        topic_config: &KafkaTopicConfig<'_>,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut topic_producers = TopicProducers::new();

        // One TopicProducer per configured physical topic.
        for params in topic_config.topics() {
            let KafkaParams {
                topic_name,
                config_name,
                params: config_params,
                key_rate_limit,
            } = params;

            let rate_limiter = key_rate_limit.map(|limit| {
                KafkaRateLimits::new(
                    limit.limit_per_window,
                    Duration::from_secs(limit.window_secs),
                )
            });

            let config_name = config_name.map(str::to_owned);

            // Reuse a cached producer for this config name, or create (and
            // cache) a new one.
            let threaded_producer = if let Some(producer) = self.reused_producers.get(&config_name)
            {
                Arc::clone(producer)
            } else {
                let mut client_config = ClientConfig::new();
                for config_p in *config_params {
                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
                }

                // Metrics name: prefer the explicit `client.id` parameter,
                // fall back to the config name, then "unknown".
                let producer_name = config_params
                    .iter()
                    .find(|p| p.name == "client.id")
                    .map(|p| p.value.clone())
                    .or_else(|| config_name.clone())
                    .unwrap_or_else(|| "unknown".to_owned());

                let producer = Arc::new(
                    client_config
                        .create_with_context(Context::new(producer_name))
                        .map_err(ClientError::InvalidConfig)?,
                );

                // Cache so later topics with the same config share the client.
                self.reused_producers
                    .insert(config_name, Arc::clone(&producer));

                producer
            };

            topic_producers.producers.push(TopicProducer {
                topic_name: topic_name.clone(),
                producer: threaded_producer,
                rate_limiter,
            });
        }

        let producer = Producer::new(topic_producers);
        if validate_topic {
            // Fail fast on missing or broken topics before accepting traffic.
            producer.topic_producers.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes the builder and returns the finished [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(debug_assertions)]
            schema_validator: schemas::Validator::default(),
        }
    }
}
480
481impl fmt::Debug for KafkaClientBuilder {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483 f.debug_struct("KafkaClientBuilder")
484 .field("reused_producers", &"<CachedProducers>")
485 .field("producers", &self.producers)
486 .finish()
487 }
488}