use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};

use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::producer::{BaseRecord, Producer as _};
use relay_statsd::metric;
use thiserror::Error;

use crate::config::{KafkaParams, KafkaTopic};
use crate::debounced::Debounced;
use crate::limits::KafkaRateLimits;
use crate::producer::utils::KafkaHeaders;
use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

mod utils;
use utils::{Context, ThreadedProducer};

#[cfg(feature = "schemas")]
mod schemas;

/// Interval, in seconds, at which producer metrics such as the in-flight count are reported.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Timeout for fetching topic metadata from the Kafka brokers.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka producer errors.
#[derive(Error, Debug)]
pub enum ClientError {
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    #[error("failed to serialize json message")]
    InvalidJson(#[source] serde_json::Error),

    #[cfg(feature = "schemas")]
    #[error("failed to run schema validation on message")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    #[error("invalid kafka shard")]
    InvalidShard,

    #[error("failed to fetch Kafka metadata")]
    MetadataFetchError(rdkafka::error::KafkaError),

    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}

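/// A message that can be produced to a Kafka topic.
///
/// Below is a minimal sketch of an implementor. The `Heartbeat` type and its
/// static payload are illustrative only and not part of this crate:
///
/// ```ignore
/// use std::borrow::Cow;
/// use std::collections::BTreeMap;
///
/// struct Heartbeat {
///     id: [u8; 16],
/// }
///
/// impl Message for Heartbeat {
///     // Partition key; returning `None` lets the partitioner pick a partition.
///     fn key(&self) -> Option<[u8; 16]> {
///         Some(self.id)
///     }
///
///     // Name used to tag metrics and logs for this message type.
///     fn variant(&self) -> &'static str {
///         "heartbeat"
///     }
///
///     // No custom Kafka headers for this message type.
///     fn headers(&self) -> Option<&BTreeMap<String, String>> {
///         None
///     }
///
///     // Borrows a static payload; real implementations typically serialize here.
///     fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
///         Ok(Cow::Borrowed(&b"{}"[..]))
///     }
/// }
/// ```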
pub trait Message {
    /// Returns the partition key for this message, if any.
    fn key(&self) -> Option<[u8; 16]>;

    /// Returns the message type, used to tag metrics and logs.
    fn variant(&self) -> &'static str;

    /// Returns custom Kafka headers to attach to the message, if any.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary wire format.
    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
}

/// A producer bound to a single Kafka topic.
struct Producer {
    /// The name of the topic this producer writes to.
    topic_name: String,
    /// The underlying rdkafka producer, shared between topics with the same config.
    producer: Arc<ThreadedProducer>,
    /// Debouncer for periodically reported producer metrics.
    metrics: Debounced,
    /// Optional rate limiter applied per partition key.
    rate_limiter: Option<KafkaRateLimits>,
}

impl Producer {
    fn new(
        topic_name: String,
        producer: Arc<ThreadedProducer>,
        rate_limiter: Option<KafkaRateLimits>,
    ) -> Self {
        Self {
            topic_name,
            producer,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
            rate_limiter,
        }
    }

    /// Validates the topic by fetching its metadata from the brokers and
    /// checking for topic-level errors.
    fn validate_topic(&self) -> Result<(), ClientError> {
        let client = self.producer.client();
        let metadata = client
            .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
            .map_err(ClientError::MetadataFetchError)?;

        for topic in metadata.topics() {
            if let Some(error) = topic.error() {
                return Err(ClientError::TopicError(topic.name().to_string(), error));
            }
        }

        Ok(())
    }
}

impl fmt::Debug for Producer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("topic_name", &self.topic_name)
            .field("producer", &"<ThreadedProducer>")
            .finish_non_exhaustive()
    }
}

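/// A Kafka client capable of producing messages to multiple configured topics.
///
/// Constructed via [`KafkaClient::builder`]. An end-to-end sketch, where
/// `topic` and `params` are assumed to come from configuration and `message`
/// implements [`Message`]:
///
/// ```ignore
/// let client = KafkaClient::builder()
///     .add_kafka_topic_config(topic, &params, false)?
///     .build();
/// let topic_name = client.send_message(topic, &message)?;
/// ```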
#[derive(Debug)]
pub struct KafkaClient {
    producers: HashMap<KafkaTopic, Producer>,
    #[cfg(feature = "schemas")]
    schema_validator: schemas::Validator,
}

impl KafkaClient {
    /// Returns a builder for [`KafkaClient`].
    pub fn builder() -> KafkaClientBuilder {
        KafkaClientBuilder::default()
    }

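    /// Serializes and sends a message to the given Kafka topic, returning the
    /// topic name on success.
    ///
    /// With the `schemas` feature enabled, the serialized payload is validated
    /// against the topic schema before it is sent. A usage sketch, assuming
    /// `client` is a built [`KafkaClient`] and `event` implements [`Message`]:
    ///
    /// ```ignore
    /// let topic_name = client.send_message(KafkaTopic::Events, &event)?;
    /// ```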
    pub fn send_message(
        &self,
        topic: KafkaTopic,
        message: &impl Message,
    ) -> Result<&str, ClientError> {
        let serialized = message.serialize()?;
        #[cfg(feature = "schemas")]
        self.schema_validator
            .validate_message_schema(topic, &serialized)
            .map_err(ClientError::SchemaValidationFailed)?;

        self.send(
            topic,
            message.key(),
            message.headers(),
            message.variant(),
            &serialized,
        )
    }

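    /// Sends an already serialized payload to the given Kafka topic, returning
    /// the topic name on success.
    ///
    /// Unlike [`KafkaClient::send_message`], this lower-level variant performs
    /// no serialization and no schema validation. A sketch, with a hypothetical
    /// pre-serialized `payload`:
    ///
    /// ```ignore
    /// client.send(KafkaTopic::Events, Some(key), None, "event", &payload)?;
    /// ```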
    pub fn send(
        &self,
        topic: KafkaTopic,
        key: Option<[u8; 16]>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let producer = self.producers.get(&topic).ok_or_else(|| {
            relay_log::error!(
                "attempted to send message to {topic:?} using an unconfigured kafka producer",
            );
            ClientError::InvalidTopicName
        })?;

        producer.send(key, headers, variant, payload)
    }
}

/// Builder for [`KafkaClient`], caching producers across topics that share a
/// Kafka configuration.
#[derive(Default)]
pub struct KafkaClientBuilder {
    /// Producers cached by Kafka config name; topics sharing a config share one producer.
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    /// Fully configured producers, keyed by topic.
    producers: HashMap<KafkaTopic, Producer>,
}

impl KafkaClientBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self::default()
    }

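    /// Adds a topic and its Kafka configuration to the builder, creating a
    /// producer for it or reusing a cached one with the same `config_name`.
    /// When `validate_topic` is set, the topic is checked against broker
    /// metadata before it is accepted.
    ///
    /// A sketch of the reuse behavior; the topics and `params` are assumed to
    /// come from configuration and to share the same `config_name`, so the
    /// underlying rdkafka producer is created only once:
    ///
    /// ```ignore
    /// let builder = KafkaClientBuilder::new()
    ///     .add_kafka_topic_config(KafkaTopic::Events, &params, true)?
    ///     .add_kafka_topic_config(KafkaTopic::Outcomes, &params, true)?;
    /// ```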
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        params: &KafkaParams,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut client_config = ClientConfig::new();

        let KafkaParams {
            topic_name,
            config_name,
            params,
            key_rate_limit,
        } = params;

        let rate_limiter = key_rate_limit.map(|limit| {
            KafkaRateLimits::new(
                limit.limit_per_window,
                Duration::from_secs(limit.window_secs),
            )
        });

        let config_name = config_name.map(str::to_string);

        // Reuse a cached producer if one was already created for this config.
        if let Some(producer) = self.reused_producers.get(&config_name) {
            let producer = Producer::new(
                (*topic_name).to_string(),
                Arc::clone(producer),
                rate_limiter,
            );
            if validate_topic {
                producer.validate_topic()?;
            }
            self.producers.insert(topic, producer);
            return Ok(self);
        }

        // Otherwise, build a new rdkafka producer from the configured parameters.
        for config_p in *params {
            client_config.set(config_p.name.as_str(), config_p.value.as_str());
        }

        let producer = Arc::new(
            client_config
                .create_with_context(Context)
                .map_err(ClientError::InvalidConfig)?,
        );

        // Cache the new producer so later topics with the same config reuse it.
        self.reused_producers
            .insert(config_name, Arc::clone(&producer));

        let producer = Producer::new((*topic_name).to_string(), producer, rate_limiter);
        if validate_topic {
            producer.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes the builder and returns the configured [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(feature = "schemas")]
            schema_validator: schemas::Validator::default(),
        }
    }
}

impl fmt::Debug for KafkaClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KafkaClientBuilder")
            .field("reused_producers", &"<CachedProducers>")
            .field("producers", &self.producers)
            .finish()
    }
}

impl Producer {
    /// Sends the payload to this producer's topic, applying the optional
    /// per-partition-key rate limit.
    fn send(
        &self,
        key: Option<[u8; 16]>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let now = Instant::now();
        let topic_name = self.topic_name.as_str();

        metric!(
            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
            variant = variant,
            topic = topic_name,
        );

        let mut headers = headers
            .unwrap_or(&BTreeMap::new())
            .iter()
            .map(|(key, value)| Header {
                key,
                value: Some(value),
            })
            .collect::<KafkaHeaders>();

        // Apply the optional per-key rate limit: when a key exceeds its quota,
        // drop the key so the message is spread across partitions instead of
        // pinned to one, and mark it so consumers can tell it was reshuffled.
        let key = match (key, self.rate_limiter.as_ref()) {
            (Some(key), Some(limiter)) => {
                let is_limited = limiter.try_increment(now, key, 1) < 1;

                if is_limited {
                    metric!(
                        counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
                        variant = variant,
                        topic = topic_name,
                    );

                    headers.insert(Header {
                        key: "sentry-reshuffled",
                        value: Some("1"),
                    });

                    None
                } else {
                    Some(key)
                }
            }
            (key, _) => key,
        };

        let mut record = BaseRecord::to(topic_name).payload(payload);
        if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
        }
        if let Some(key) = key.as_ref() {
            record = record.key(key);
        }

        // Report the in-flight count at most once per REPORT_FREQUENCY_SECS.
        self.metrics.debounce(now, || {
            metric!(
                gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
                variant = variant,
                topic = topic_name
            );
        });

        self.producer.send(record).map_err(|(error, _message)| {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                tags.variant = variant,
                tags.topic = topic_name,
                "error sending kafka message",
            );
            metric!(
                counter(KafkaCounters::ProducerEnqueueError) += 1,
                variant = variant,
                topic = topic_name
            );
            ClientError::SendFailed(error)
        })?;

        Ok(topic_name)
    }
}
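
#[cfg(test)]
mod tests {
    use super::*;

    // A sketch of the rate-limiter contract assumed by `Producer::send`,
    // namely that `try_increment` returns the number of admitted units, so a
    // value below the requested amount marks the key as limited. The exact
    // return values asserted here are assumptions about `KafkaRateLimits`.
    #[test]
    fn rate_limited_keys_are_detected() {
        // One message per key per 60-second window.
        let limiter = KafkaRateLimits::new(1, Duration::from_secs(60));
        let now = Instant::now();
        let key = [0u8; 16];

        // The first message for the key fits into the window.
        assert_eq!(limiter.try_increment(now, key, 1), 1);
        // The second one is limited; `send` would then strip the key and
        // attach the `sentry-reshuffled` header instead.
        assert_eq!(limiter.try_increment(now, key, 1), 0);
    }
}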