relay_kafka/producer/
mod.rs

1//! This module contains the Kafka producer related code.
2
3use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::{Duration, Instant};
9
10use rdkafka::ClientConfig;
11use rdkafka::message::Header;
12use rdkafka::producer::{BaseRecord, Producer as _};
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::KafkaTopicConfig;
17use crate::config::{KafkaParams, KafkaTopic};
18use crate::debounced::Debounced;
19use crate::limits::KafkaRateLimits;
20use crate::producer::utils::KafkaHeaders;
21use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
22
23mod utils;
24use utils::{Context, ThreadedProducer};
25
26#[cfg(debug_assertions)]
27mod schemas;
28
/// Interval, in seconds, given to the metrics debouncer that throttles the in-flight gauge.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Maximum time a single metadata request may take while validating a topic.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_millis(30_000);

/// Key type used for partitioning.
///
/// It selects the producer shard and is sent, big-endian encoded, as the Kafka record key.
pub type Key = u128;
34
35/// Kafka producer errors.
36#[derive(Error, Debug)]
37pub enum ClientError {
38    /// Failed to send a kafka message.
39    #[error("failed to send kafka message")]
40    SendFailed(#[source] rdkafka::error::KafkaError),
41
42    /// Failed to find configured producer for the requested kafka topic.
43    #[error("failed to find producer for the requested kafka topic")]
44    InvalidTopicName,
45
46    /// Failed to create a kafka producer because of the invalid configuration.
47    #[error("failed to create kafka producer: invalid kafka config")]
48    InvalidConfig(#[source] rdkafka::error::KafkaError),
49
50    /// Failed to serialize the message.
51    #[error("failed to serialize kafka message")]
52    InvalidMsgPack(#[source] rmp_serde::encode::Error),
53
54    /// Failed to serialize the json message using serde.
55    #[error("failed to serialize json message")]
56    InvalidJson(#[source] serde_json::Error),
57
58    /// Failed to run schema validation on message.
59    #[cfg(debug_assertions)]
60    #[error("failed to run schema validation on message")]
61    SchemaValidationFailed(#[source] schemas::SchemaError),
62
63    /// Configuration is wrong and it cannot be used to identify the number of a shard.
64    #[error("no kafka configuration for topic")]
65    MissingTopic,
66
67    /// Failed to fetch the metadata of Kafka.
68    #[error("failed to fetch the metadata of Kafka")]
69    MetadataFetchError(rdkafka::error::KafkaError),
70
71    /// Failed to validate the topic.
72    #[error("failed to validate the topic with name {0}: {1:?}")]
73    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
74
75    /// Failed to encode the protobuf into the buffer
76    /// because the buffer is too small.
77    #[error("failed to encode protobuf because the buffer is too small")]
78    ProtobufEncodingFailed,
79}
80
81/// Describes the type which can be sent using kafka producer provided by this crate.
82pub trait Message {
83    /// Returns the partitioning key for this kafka message determining.
84    fn key(&self) -> Option<Key>;
85
86    /// Returns the type of the message.
87    fn variant(&self) -> &'static str;
88
89    /// Return the list of headers to be provided when payload is sent to Kafka.
90    fn headers(&self) -> Option<&BTreeMap<String, String>>;
91
92    /// Serializes the message into its binary format.
93    ///
94    /// # Errors
95    /// Returns the [`ClientError::InvalidMsgPack`], [`ClientError::InvalidJson`] or [`ClientError::ProtobufEncodingFailed`]  if the
96    /// serialization failed.
97    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError>;
98}
99
/// A serialized Kafka message payload, tagged with the encoding that produced it.
#[derive(Debug, Clone)]
pub enum SerializationOutput<'a> {
    /// Serialized as Json.
    Json(Cow<'a, [u8]>),

    /// Serialized as MsgPack.
    MsgPack(Cow<'a, [u8]>),

    /// Serialized as Protobuf.
    Protobuf(Cow<'a, [u8]>),
}

impl SerializationOutput<'_> {
    /// Borrows the raw payload bytes, whatever encoding produced them.
    pub fn as_bytes(&self) -> &[u8] {
        // Every variant holds the same payload type, so one irrefutable or-pattern covers all.
        let (Self::Json(payload) | Self::MsgPack(payload) | Self::Protobuf(payload)) = self;
        payload
    }
}
123
124struct TopicProducers {
125    /// All configured producers.
126    producers: Vec<TopicProducer>,
127}
128
129impl TopicProducers {
130    fn new() -> Self {
131        Self {
132            producers: Vec::new(),
133        }
134    }
135
136    fn select(&self, key: u128) -> Option<&TopicProducer> {
137        debug_assert!(!self.producers.is_empty());
138
139        if self.producers.is_empty() {
140            return None;
141        } else if self.producers.len() == 1 {
142            return self.producers.first();
143        }
144
145        let select = (key % self.producers.len() as u128) as usize;
146        self.producers.get(select)
147    }
148
149    /// Validates the topic by fetching the metadata of the topic directly from Kafka.
150    fn validate_topic(&self) -> Result<(), ClientError> {
151        for tp in &self.producers {
152            let client = tp.producer.client();
153            let metadata = client
154                .fetch_metadata(Some(&tp.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
155                .map_err(ClientError::MetadataFetchError)?;
156
157            for topic in metadata.topics() {
158                if let Some(error) = topic.error() {
159                    return Err(ClientError::TopicError(topic.name().to_owned(), error));
160                }
161            }
162        }
163
164        Ok(())
165    }
166}
167
168struct TopicProducer {
169    pub topic_name: String,
170    pub producer: Arc<ThreadedProducer>,
171    pub rate_limiter: Option<KafkaRateLimits>,
172}
173
174/// Single kafka producer config with assigned topic.
175struct Producer {
176    /// Topic to producer and rate limiter mappings for sharding.
177    topic_producers: TopicProducers,
178    /// Debouncer for metrics.
179    metrics: Debounced,
180    next_key: AtomicU32,
181}
182
183impl Producer {
184    fn new(topic_producers: TopicProducers) -> Self {
185        Self {
186            topic_producers,
187            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
188            next_key: AtomicU32::new(0),
189        }
190    }
191}
192
impl Producer {
    /// Sends the payload to the correct producer for the current topic.
    ///
    /// A shard is picked using `key`, or a freshly generated key when `key` is `None`. The
    /// method then records the message size metric and applies the shard's per-key rate
    /// limiter, if any. Finally it hands the record to the shard's producer.
    ///
    /// Returns the name of the Kafka topic the message was sent to.
    ///
    /// # Errors
    /// - [`ClientError::MissingTopic`] if no shard could be selected for the key.
    /// - [`ClientError::SendFailed`] if the producer rejected the record.
    fn send(
        &self,
        key: Option<Key>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        // One timestamp shared by the rate limiter and the metrics debouncer.
        let now = Instant::now();

        // Always generate a key to force a slightly more equal distribution across partitions,
        // see also the documentation for `Self::next_key`.
        let mut key = key.unwrap_or_else(|| self.next_key());

        // The shard is chosen from this key. A later reshuffle below only replaces the record
        // key, not the selected shard.
        let Some(TopicProducer {
            topic_name,
            producer,
            rate_limiter,
        }) = self.topic_producers.select(key)
        else {
            return Err(ClientError::MissingTopic);
        };

        let producer_name = producer.context().producer_name();

        metric!(
            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
            variant = variant,
            topic = topic_name,
            producer_name = producer_name
        );

        // An absent header map is treated as an empty one.
        let mut headers = headers
            .unwrap_or(&BTreeMap::new())
            .iter()
            .map(|(key, value)| Header {
                key,
                value: Some(value),
            })
            .collect::<KafkaHeaders>();

        // Always rate limit if there is a rate limiter defined.
        // Defining a rate limiter for a topic which does not have a consistent routing key is just
        // a misconfiguration.
        if let Some(limiter) = &rate_limiter {
            // Requesting 1 unit; any result below 1 is treated as "this key is limited".
            let is_limited = limiter.try_increment(now, key, 1) < 1;

            if is_limited {
                metric!(
                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
                    variant = variant,
                    topic = topic_name,
                    producer_name = producer_name
                );

                // Tag the record so it is recognizable as reshuffled.
                headers.insert(Header {
                    key: "sentry-reshuffled",
                    value: Some("1"),
                });

                // Force a 'random' partition instead of the originally assigned partition.
                key = self.next_key();
            }
        }

        // The record key is the big-endian encoding of the (possibly reshuffled) u128 key.
        let key = u128::to_be_bytes(key);
        let mut record = BaseRecord::to(topic_name).payload(payload).key(&key);
        // Headers are attached only if `into_inner` yields some.
        if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
        }

        // Report the in-flight gauge at most as often as the debouncer allows.
        self.metrics.debounce(now, || {
            metric!(
                gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
        });

        // On failure, log and count the error before converting it to `SendFailed`.
        producer.send(record).map_err(|(error, _message)| {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                tags.variant = variant,
                tags.topic = topic_name,
                "error sending kafka message",
            );
            metric!(
                counter(KafkaCounters::ProducerEnqueueError) += 1,
                variant = variant,
                topic = topic_name,
                producer_name = producer_name
            );
            ClientError::SendFailed(error)
        })?;

        Ok(topic_name)
    }

    /// Returns a newly generated key.
    ///
    /// Keys are created in sequence, they can be used as a Kafka partition key to force an equal
    /// distribution across partitions.
    ///
    /// Producing a Kafka message without a 'random' key, produces slightly uneven batches to
    /// partitions. We've seen that this pattern does not play well with our Arroyo consumers
    /// and leads to higher partition lags.
    fn next_key(&self) -> u128 {
        // Overflowing is fine, as it would wrap. The only use for the atomic is to generate
        // a different key to the one before.
        self.next_key.fetch_add(1, Ordering::Relaxed) as u128
    }
}
307
308impl fmt::Debug for Producer {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        let topic_names: Vec<&String> = self
311            .topic_producers
312            .producers
313            .iter()
314            .map(|tp| &tp.topic_name)
315            .collect();
316        f.debug_struct("Producer")
317            .field("topic_names", &topic_names)
318            .field("producers", &"<ThreadedProducers>")
319            .finish_non_exhaustive()
320    }
321}
322
323/// Keeps all the configured Kafka producers and responsible for the routing of the messages.
324#[derive(Debug)]
325pub struct KafkaClient {
326    producers: HashMap<KafkaTopic, Producer>,
327    #[cfg(debug_assertions)]
328    schema_validator: schemas::Validator,
329}
330
331impl KafkaClient {
332    /// Returns the [`KafkaClientBuilder`]
333    pub fn builder() -> KafkaClientBuilder {
334        KafkaClientBuilder::default()
335    }
336
337    /// Sends message to the provided Kafka topic.
338    ///
339    /// Returns the name of the Kafka topic to which the message was produced.
340    pub fn send_message(
341        &self,
342        topic: KafkaTopic,
343        message: &impl Message,
344    ) -> Result<&str, ClientError> {
345        let serialized = message.serialize()?;
346
347        #[cfg(debug_assertions)]
348        if let SerializationOutput::Json(ref bytes) = serialized {
349            self.schema_validator
350                .validate_message_schema(topic, bytes)
351                .map_err(ClientError::SchemaValidationFailed)?;
352        }
353
354        self.send(
355            topic,
356            message.key(),
357            message.headers(),
358            message.variant(),
359            serialized.as_bytes(),
360        )
361    }
362
363    /// Sends the payload to the correct producer for the current topic.
364    ///
365    /// Returns the name of the Kafka topic to which the message was produced.
366    pub fn send(
367        &self,
368        topic: KafkaTopic,
369        key: Option<Key>,
370        headers: Option<&BTreeMap<String, String>>,
371        variant: &str,
372        payload: &[u8],
373    ) -> Result<&str, ClientError> {
374        let producer = self.producers.get(&topic).ok_or_else(|| {
375            relay_log::error!(
376                "attempted to send message to {topic:?} using an unconfigured kafka producer",
377            );
378            ClientError::InvalidTopicName
379        })?;
380
381        producer.send(key, headers, variant, payload)
382    }
383}
384
385/// Helper structure responsible for building the actual [`KafkaClient`].
386#[derive(Default)]
387pub struct KafkaClientBuilder {
388    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
389    producers: HashMap<KafkaTopic, Producer>,
390}
391
392impl KafkaClientBuilder {
393    /// Creates an empty KafkaClientBuilder.
394    pub fn new() -> Self {
395        Self::default()
396    }
397
398    /// Adds topic configuration to the current [`KafkaClientBuilder`], which in return assigns
399    /// dedicates producer to the topic which can will be used to send the messages.
400    ///
401    /// # Errors
402    /// Returns [`ClientError::InvalidConfig`] error if the provided configuration is wrong and
403    /// the producer could not be created.
404    pub fn add_kafka_topic_config(
405        mut self,
406        topic: KafkaTopic,
407        topic_config: &KafkaTopicConfig<'_>,
408        validate_topic: bool,
409    ) -> Result<Self, ClientError> {
410        let mut topic_producers = TopicProducers::new();
411
412        // Process each shard configuration (one KafkaParams per shard)
413        // We must preserve the original order from the configuration
414        // because hash-based routing depends on shard index positions
415        for params in topic_config.topics() {
416            let KafkaParams {
417                topic_name,
418                config_name,
419                params: config_params,
420                key_rate_limit,
421            } = params;
422
423            let rate_limiter = key_rate_limit.map(|limit| {
424                KafkaRateLimits::new(
425                    limit.limit_per_window,
426                    Duration::from_secs(limit.window_secs),
427                )
428            });
429
430            let config_name = config_name.map(str::to_string);
431
432            // Get or create producer for this broker config
433            let threaded_producer = if let Some(producer) = self.reused_producers.get(&config_name)
434            {
435                Arc::clone(producer)
436            } else {
437                let mut client_config = ClientConfig::new();
438                for config_p in *config_params {
439                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
440                }
441
442                // Extract producer name from client.id, fallback to config name, then "unknown"
443                let producer_name = config_params
444                    .iter()
445                    .find(|p| p.name == "client.id")
446                    .map(|p| p.value.clone())
447                    .or_else(|| config_name.clone())
448                    .unwrap_or_else(|| "unknown".to_owned());
449
450                let producer = Arc::new(
451                    client_config
452                        .create_with_context(Context::new(producer_name))
453                        .map_err(ClientError::InvalidConfig)?,
454                );
455
456                self.reused_producers
457                    .insert(config_name, Arc::clone(&producer));
458
459                producer
460            };
461
462            topic_producers.producers.push(TopicProducer {
463                topic_name: topic_name.clone(),
464                producer: threaded_producer,
465                rate_limiter,
466            });
467        }
468
469        let producer = Producer::new(topic_producers);
470        if validate_topic {
471            producer.topic_producers.validate_topic()?;
472        }
473        self.producers.insert(topic, producer);
474
475        Ok(self)
476    }
477
478    /// Consumes self and returns the built [`KafkaClient`].
479    pub fn build(self) -> KafkaClient {
480        KafkaClient {
481            producers: self.producers,
482            #[cfg(debug_assertions)]
483            schema_validator: schemas::Validator::default(),
484        }
485    }
486}
487
488impl fmt::Debug for KafkaClientBuilder {
489    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490        f.debug_struct("KafkaClientBuilder")
491            .field("reused_producers", &"<CachedProducers>")
492            .field("producers", &self.producers)
493            .finish()
494    }
495}