use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::producer::{BaseRecord, Producer as _};
use relay_statsd::metric;
use thiserror::Error;

use crate::KafkaTopicConfig;
use crate::config::{KafkaParams, KafkaTopic};
use crate::debounced::Debounced;
use crate::limits::KafkaRateLimits;
use crate::producer::utils::KafkaHeaders;
use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

mod utils;
use utils::{Context, ThreadedProducer};

#[cfg(debug_assertions)]
mod schemas;

const REPORT_FREQUENCY_SECS: u64 = 1;
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka message key used to select a producer shard and its partition.
pub type Key = u128;

/// Kafka producer errors.
#[derive(Error, Debug)]
pub enum ClientError {
    /// Failed to send a kafka message.
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    /// Failed to find a producer configured for the requested kafka topic.
    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    /// Failed to create a kafka producer because of an invalid configuration.
    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    /// Failed to serialize the message into MessagePack.
    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    /// Failed to serialize the message into JSON.
    #[error("failed to serialize json message")]
    InvalidJson(#[source] serde_json::Error),

    /// Failed to run schema validation on the message.
    #[cfg(debug_assertions)]
    #[error("failed to run schema validation on message")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    /// No kafka configuration exists for the topic.
    #[error("no kafka configuration for topic")]
    MissingTopic,

    /// Failed to fetch metadata from Kafka.
    #[error("failed to fetch metadata from Kafka")]
    MetadataFetchError(rdkafka::error::KafkaError),

    /// The topic failed broker-side validation.
    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    /// Failed to encode a protobuf message because the buffer is too small.
    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}

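/// A single message pushed to the Kafka producer.
///
/// A minimal sketch of an implementation (illustrative only; `EventMessage`
/// and its fields are hypothetical and not part of this crate):
///
/// ```ignore
/// struct EventMessage {
///     payload: Vec<u8>,
///     routing_key: Key,
/// }
///
/// impl Message for EventMessage {
///     fn key(&self) -> Option<Key> {
///         Some(self.routing_key)
///     }
///
///     fn variant(&self) -> &'static str {
///         "event"
///     }
///
///     fn headers(&self) -> Option<&BTreeMap<String, String>> {
///         None
///     }
///
///     fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
///         Ok(SerializationOutput::Json(Cow::Borrowed(&self.payload)))
///     }
/// }
/// ```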
pub trait Message {
    /// Returns the partitioning key for this kafka message, if any.
    fn key(&self) -> Option<Key>;

    /// Returns the type of the message, used as the `variant` tag in metrics.
    fn variant(&self) -> &'static str;

    /// Returns the headers to attach to the kafka message, if any.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary format.
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization fails.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError>;
}

/// The serialized output of a [`Message`].
#[derive(Debug, Clone)]
pub enum SerializationOutput<'a> {
    /// The message is serialized as JSON.
    Json(Cow<'a, [u8]>),

    /// The message is serialized as MessagePack.
    MsgPack(Cow<'a, [u8]>),

    /// The message is serialized as protobuf.
    Protobuf(Cow<'a, [u8]>),
}

impl SerializationOutput<'_> {
    /// Returns the serialized payload as raw bytes, regardless of the format.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            SerializationOutput::Json(cow) => cow,
            SerializationOutput::MsgPack(cow) => cow,
            SerializationOutput::Protobuf(cow) => cow,
        }
    }
}

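/// Producers for all shards of a logical topic.
///
/// A logical topic can be backed by multiple physical Kafka topics; messages
/// are sharded across them deterministically by message key.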
struct TopicProducers {
    /// One producer per physical topic shard.
    producers: Vec<TopicProducer>,
}

impl TopicProducers {
    fn new() -> Self {
        Self {
            producers: Vec::new(),
        }
    }

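    /// Selects the producer shard for the given message key.
    ///
    /// The mapping is a plain modulo: with three shards, keys 0, 1, 2, and 3
    /// select shards 0, 1, 2, and 0 again, so equal keys always land on the
    /// same shard.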
    fn select(&self, key: u128) -> Option<&TopicProducer> {
        debug_assert!(!self.producers.is_empty());

        if self.producers.is_empty() {
            return None;
        } else if self.producers.len() == 1 {
            return self.producers.first();
        }

        let select = (key % self.producers.len() as u128) as usize;
        self.producers.get(select)
    }

    /// Validates the configured topics by fetching their metadata from the
    /// Kafka brokers within [`KAFKA_FETCH_METADATA_TIMEOUT`].
    fn validate_topic(&self) -> Result<(), ClientError> {
        for tp in &self.producers {
            let client = tp.producer.client();
            let metadata = client
                .fetch_metadata(Some(&tp.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
                .map_err(ClientError::MetadataFetchError)?;

            for topic in metadata.topics() {
                if let Some(error) = topic.error() {
                    return Err(ClientError::TopicError(topic.name().to_owned(), error));
                }
            }
        }

        Ok(())
    }
}

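/// A producer bound to a single physical Kafka topic, together with an
/// optional per-key rate limiter for that topic.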
struct TopicProducer {
    pub topic_name: String,
    pub producer: Arc<ThreadedProducer>,
    pub rate_limiter: Option<KafkaRateLimits>,
}

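/// A producer for a logical topic, including metrics debouncing and the
/// fallback key generator used when messages carry no explicit key.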
struct Producer {
    /// The producers for all shards of this topic.
    topic_producers: TopicProducers,
    /// Debouncer for the in-flight metrics report.
    metrics: Debounced,
    /// Sequence used to generate fallback keys for unkeyed messages.
    next_key: AtomicU32,
}

impl Producer {
    fn new(topic_producers: TopicProducers) -> Self {
        Self {
            topic_producers,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
            next_key: AtomicU32::new(0),
        }
    }
}

impl Producer {
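    /// Sends the payload to the selected Kafka topic shard.
    ///
    /// Messages without a key are assigned a sequentially generated one. If a
    /// key exceeds its configured rate limit, the message is re-keyed and
    /// marked with a `sentry-reshuffled` header instead of being dropped.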
    fn send(
        &self,
        key: Option<Key>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let now = Instant::now();

        // Fall back to a generated key to distribute unkeyed messages evenly
        // across partitions.
        let mut key = key.unwrap_or_else(|| self.next_key());

        let Some(TopicProducer {
            topic_name,
            producer,
            rate_limiter,
        }) = self.topic_producers.select(key)
        else {
            return Err(ClientError::MissingTopic);
        };

        let producer_name = producer.context().producer_name();

        metric!(
            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
            variant = variant,
            topic = topic_name,
            producer_name = producer_name
        );

        let mut headers = headers
            .unwrap_or(&BTreeMap::new())
            .iter()
            .map(|(key, value)| Header {
                key,
                value: Some(value),
            })
            .collect::<KafkaHeaders>();

        // If the key is over its rate limit, do not drop the message: mark it
        // as reshuffled and move it to another partition by generating a new
        // key.
        if let Some(limiter) = &rate_limiter {
            let is_limited = limiter.try_increment(now, key, 1) < 1;

            if is_limited {
                metric!(
                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
                    variant = variant,
                    topic = topic_name,
                    producer_name = producer_name
                );

                headers.insert(Header {
                    key: "sentry-reshuffled",
                    value: Some("1"),
                });

                key = self.next_key();
            }
        }

        let key = u128::to_be_bytes(key);
        let mut record = BaseRecord::to(topic_name).payload(payload).key(&key);
        if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
        }

        self.metrics.debounce(now, || {
            metric!(
                gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
        });

        producer.send(record).map_err(|(error, _message)| {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                tags.variant = variant,
                tags.topic = topic_name,
                "error sending kafka message",
            );
            metric!(
                counter(KafkaCounters::ProducerEnqueueError) += 1,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
            ClientError::SendFailed(error)
        })?;

        Ok(topic_name)
    }

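    /// Returns the next fallback key from a relaxed atomic counter.
    ///
    /// The counter wraps around at `u32::MAX`; since keys are only used
    /// modulo the number of producer shards, wrap-around simply restarts the
    /// round-robin cycle.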
    fn next_key(&self) -> u128 {
        self.next_key.fetch_add(1, Ordering::Relaxed) as u128
    }
}

impl fmt::Debug for Producer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic_names: Vec<&String> = self
            .topic_producers
            .producers
            .iter()
            .map(|tp| &tp.topic_name)
            .collect();
        f.debug_struct("Producer")
            .field("topic_names", &topic_names)
            .field("producers", &"<ThreadedProducers>")
            .finish_non_exhaustive()
    }
}

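/// Kafka client wrapping the configured producers for all topics.
///
/// A minimal usage sketch (illustrative only; `topic_config` and `message`
/// are assumed to exist elsewhere, and the topic variant is hypothetical):
///
/// ```ignore
/// let client = KafkaClient::builder()
///     .add_kafka_topic_config(KafkaTopic::Events, &topic_config, true)?
///     .build();
///
/// let topic_name = client.send_message(KafkaTopic::Events, &message)?;
/// ```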
#[derive(Debug)]
pub struct KafkaClient {
    producers: HashMap<KafkaTopic, Producer>,
    #[cfg(debug_assertions)]
    schema_validator: schemas::Validator,
}

impl KafkaClient {
    /// Returns a builder for [`KafkaClient`].
    pub fn builder() -> KafkaClientBuilder {
        KafkaClientBuilder::default()
    }

    /// Sends the message to the given Kafka topic.
    ///
    /// In debug builds, JSON payloads are additionally validated against the
    /// topic's schema before they are sent.
    pub fn send_message(
        &self,
        topic: KafkaTopic,
        message: &impl Message,
    ) -> Result<&str, ClientError> {
        let serialized = message.serialize()?;

        #[cfg(debug_assertions)]
        if let SerializationOutput::Json(ref bytes) = serialized {
            self.schema_validator
                .validate_message_schema(topic, bytes)
                .map_err(ClientError::SchemaValidationFailed)?;
        }

        self.send(
            topic,
            message.key(),
            message.headers(),
            message.variant(),
            serialized.as_bytes(),
        )
    }

    /// Sends the payload to the given Kafka topic.
    ///
    /// Prefer [`KafkaClient::send_message`] for anything that implements
    /// [`Message`]; this lower-level variant accepts a pre-serialized payload.
    pub fn send(
        &self,
        topic: KafkaTopic,
        key: Option<Key>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let producer = self.producers.get(&topic).ok_or_else(|| {
            relay_log::error!(
                "attempted to send message to {topic:?} using an unconfigured kafka producer",
            );
            ClientError::InvalidTopicName
        })?;

        producer.send(key, headers, variant, payload)
    }
}

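/// Builder for [`KafkaClient`], reusing producers across topics that share a
/// Kafka configuration.
///
/// A construction sketch (illustrative only; the topic variants and configs
/// are hypothetical):
///
/// ```ignore
/// let client = KafkaClientBuilder::new()
///     .add_kafka_topic_config(KafkaTopic::Events, &events_config, true)?
///     .add_kafka_topic_config(KafkaTopic::Outcomes, &outcomes_config, false)?
///     .build();
/// ```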
#[derive(Default)]
pub struct KafkaClientBuilder {
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    producers: HashMap<KafkaTopic, Producer>,
}

impl KafkaClientBuilder {
    /// Creates an empty KafkaClientBuilder.
    pub fn new() -> Self {
        Self::default()
    }

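    /// Adds a topic configuration to the builder.
    ///
    /// Underlying producers are cached by config name, so topics that share a
    /// Kafka configuration reuse the same producer. When `validate_topic` is
    /// set, the topic is checked against broker metadata before the builder
    /// returns.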
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        topic_config: &KafkaTopicConfig<'_>,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut topic_producers = TopicProducers::new();

        for params in topic_config.topics() {
            let KafkaParams {
                topic_name,
                config_name,
                params: config_params,
                key_rate_limit,
            } = params;

            let rate_limiter = key_rate_limit.map(|limit| {
                KafkaRateLimits::new(
                    limit.limit_per_window,
                    Duration::from_secs(limit.window_secs),
                )
            });

            let config_name = config_name.map(str::to_string);

            let threaded_producer = if let Some(producer) = self.reused_producers.get(&config_name)
            {
                Arc::clone(producer)
            } else {
                let mut client_config = ClientConfig::new();
                for config_p in *config_params {
                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
                }

                let producer_name = config_params
                    .iter()
                    .find(|p| p.name == "client.id")
                    .map(|p| p.value.clone())
                    .or_else(|| config_name.clone())
                    .unwrap_or_else(|| "unknown".to_owned());

                let producer = Arc::new(
                    client_config
                        .create_with_context(Context::new(producer_name))
                        .map_err(ClientError::InvalidConfig)?,
                );

                self.reused_producers
                    .insert(config_name, Arc::clone(&producer));

                producer
            };

            topic_producers.producers.push(TopicProducer {
                topic_name: topic_name.clone(),
                producer: threaded_producer,
                rate_limiter,
            });
        }

        let producer = Producer::new(topic_producers);
        if validate_topic {
            producer.topic_producers.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes the builder and returns the [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(debug_assertions)]
            schema_validator: schemas::Validator::default(),
        }
    }
}

impl fmt::Debug for KafkaClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KafkaClientBuilder")
            .field("reused_producers", &"<CachedProducers>")
            .field("producers", &self.producers)
            .finish()
    }
}