Skip to main content

relay_kafka/producer/
mod.rs

1//! This module contains the Kafka producer related code.
2
3use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::{Duration, Instant};
9
10use rdkafka::ClientConfig;
11use rdkafka::message::Header;
12use rdkafka::producer::{BaseRecord, Producer as _};
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::KafkaTopicConfig;
17use crate::config::{KafkaParams, KafkaTopic};
18use crate::debounced::Debounced;
19use crate::limits::KafkaRateLimits;
20use crate::producer::utils::KafkaHeaders;
21use crate::statsd::{KafkaCounters, KafkaDistributions, KafkaGauges};
22
23mod utils;
24use utils::{Context, ThreadedProducer};
25
26#[cfg(debug_assertions)]
27mod schemas;
28
/// Interval, in seconds, at which debounced producer metrics (e.g. in-flight counts)
/// are reported via `Debounced`.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Maximum time to wait for topic metadata from the broker during topic validation.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Key type used for partitioning.
pub type Key = u128;
34
/// Kafka producer errors.
#[derive(Error, Debug)]
pub enum ClientError {
    /// Failed to send a kafka message.
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    /// Failed to find configured producer for the requested kafka topic.
    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    /// Failed to create a kafka producer because of an invalid configuration.
    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    /// Failed to serialize the message as MessagePack.
    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    /// Failed to serialize the json message using serde.
    #[error("failed to serialize json message")]
    InvalidJson(#[from] serde_json::Error),

    /// Failed to run schema validation on message.
    ///
    /// Only exists in debug builds, where JSON payloads are validated against
    /// their topic schema before producing.
    #[cfg(debug_assertions)]
    #[error("schema validation failed")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    /// No producer is configured for the topic, so a shard cannot be selected.
    #[error("no kafka configuration for topic")]
    MissingTopic,

    /// Failed to fetch the metadata of Kafka.
    #[error("failed to fetch the metadata of Kafka")]
    MetadataFetchError(rdkafka::error::KafkaError),

    /// Failed to validate the topic.
    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    /// Failed to encode the protobuf into the buffer
    /// because the buffer is too small.
    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}
80
/// Describes the type which can be sent using kafka producer provided by this crate.
pub trait Message {
    /// Returns the partitioning key for this kafka message, if any.
    ///
    /// The key is used both to select a producer shard and as the Kafka
    /// partition key; messages without a key get a generated one.
    fn key(&self) -> Option<Key>;

    /// Returns the type of the message.
    ///
    /// Used as the `variant` tag on emitted statsd metrics.
    fn variant(&self) -> &'static str;

    /// Return the list of headers to be provided when payload is sent to Kafka.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary format.
    ///
    /// # Errors
    /// Returns the [`ClientError::InvalidMsgPack`], [`ClientError::InvalidJson`] or
    /// [`ClientError::ProtobufEncodingFailed`] if the serialization failed.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError>;
}
99
/// The output of serializing a message for kafka.
///
/// Each variant wraps the serialized bytes; `Cow` allows implementations to
/// return either borrowed or owned buffers.
#[derive(Debug, Clone)]
pub enum SerializationOutput<'a> {
    /// Serialized as Json.
    Json(Cow<'a, [u8]>),

    /// Serialized as MsgPack.
    MsgPack(Cow<'a, [u8]>),

    /// Serialized as Protobuf.
    Protobuf(Cow<'a, [u8]>),
}
112
113impl SerializationOutput<'_> {
114    /// Return the serialized bytes.
115    pub fn as_bytes(&self) -> &[u8] {
116        match self {
117            SerializationOutput::Json(cow) => cow,
118            SerializationOutput::MsgPack(cow) => cow,
119            SerializationOutput::Protobuf(cow) => cow,
120        }
121    }
122}
123
/// The set of producer shards configured for a single logical topic.
struct TopicProducers {
    /// All configured producers.
    ///
    /// Order matters: shard selection is `key % producers.len()`, so indices
    /// must match the configuration order.
    producers: Vec<TopicProducer>,
}
128
129impl TopicProducers {
130    fn new() -> Self {
131        Self {
132            producers: Vec::new(),
133        }
134    }
135
136    fn select(&self, key: u128) -> Option<&TopicProducer> {
137        debug_assert!(!self.producers.is_empty());
138
139        if self.producers.is_empty() {
140            return None;
141        } else if self.producers.len() == 1 {
142            return self.producers.first();
143        }
144
145        let select = (key % self.producers.len() as u128) as usize;
146        self.producers.get(select)
147    }
148
149    /// Validates the topic by fetching the metadata of the topic directly from Kafka.
150    fn validate_topic(&self) -> Result<(), ClientError> {
151        for tp in &self.producers {
152            let client = tp.producer.client();
153            let metadata = client
154                .fetch_metadata(Some(&tp.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
155                .map_err(ClientError::MetadataFetchError)?;
156
157            for topic in metadata.topics() {
158                if let Some(error) = topic.error() {
159                    return Err(ClientError::TopicError(topic.name().to_owned(), error));
160                }
161            }
162        }
163
164        Ok(())
165    }
166}
167
/// A single producer shard: one Kafka topic bound to one producer instance.
struct TopicProducer {
    /// Name of the Kafka topic this shard produces to.
    pub topic_name: String,
    /// Underlying producer; shared between topics that use the same broker config.
    pub producer: Arc<ThreadedProducer>,
    /// Optional per-partition-key rate limiter for this shard.
    pub rate_limiter: Option<KafkaRateLimits>,
}
173
/// Single kafka producer config with assigned topic.
struct Producer {
    /// Topic to producer and rate limiter mappings for sharding.
    topic_producers: TopicProducers,
    /// Debouncer for metrics.
    metrics: Debounced,
    /// Monotonically increasing counter used to generate partition keys for
    /// messages without one; see [`Producer::next_key`]. Wrapping on overflow
    /// is fine, only inequality with the previous key matters.
    next_key: AtomicU32,
}
182
impl Producer {
    /// Creates a new [`Producer`] over the given shard set, with metrics
    /// reporting debounced to [`REPORT_FREQUENCY_SECS`].
    fn new(topic_producers: TopicProducers) -> Self {
        Self {
            topic_producers,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
            next_key: AtomicU32::new(0),
        }
    }
}
192
193impl Producer {
194    /// Sends the payload to the correct producer for the current topic.
195    fn send(
196        &self,
197        key: Option<Key>,
198        headers: Option<&BTreeMap<String, String>>,
199        variant: &str,
200        payload: &[u8],
201    ) -> Result<&str, ClientError> {
202        let now = Instant::now();
203
204        // Always generate a key to force a slightly more equal distribution across partitions,
205        // see also the documentation for `Self::next_key`.
206        let mut key = key.unwrap_or_else(|| self.next_key());
207
208        let Some(TopicProducer {
209            topic_name,
210            producer,
211            rate_limiter,
212        }) = self.topic_producers.select(key)
213        else {
214            return Err(ClientError::MissingTopic);
215        };
216
217        relay_log::configure_scope(|s| s.set_tag("topic", topic_name));
218
219        let producer_name = producer.context().producer_name();
220
221        metric!(
222            distribution(KafkaDistributions::KafkaMessageSize) = payload.len() as u64,
223            variant = variant,
224            topic = topic_name,
225            producer_name = producer_name
226        );
227
228        let mut headers = headers
229            .unwrap_or(&BTreeMap::new())
230            .iter()
231            .map(|(key, value)| Header {
232                key,
233                value: Some(value),
234            })
235            .collect::<KafkaHeaders>();
236
237        // Always rate limit if there is a rate limiter defined.
238        // Defining a rate limiter for a topic which does not have a consistent routing key is just
239        // a misconfiguration.
240        if let Some(limiter) = &rate_limiter {
241            let is_limited = limiter.try_increment(now, key, 1) < 1;
242
243            if is_limited {
244                metric!(
245                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
246                    variant = variant,
247                    topic = topic_name,
248                    producer_name = producer_name
249                );
250
251                headers.insert(Header {
252                    key: "sentry-reshuffled",
253                    value: Some("1"),
254                });
255
256                // Force a 'random' partition, instead the originally assigned partition.
257                key = self.next_key();
258            }
259        }
260
261        let key = u128::to_be_bytes(key);
262        let mut record = BaseRecord::to(topic_name).payload(payload).key(&key);
263        if let Some(headers) = headers.into_inner() {
264            record = record.headers(headers);
265        }
266
267        self.metrics.debounce(now, || {
268            metric!(
269                gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
270                variant = variant,
271                topic = topic_name,
272                producer_name = producer_name
273            );
274        });
275
276        producer.send(record).map_err(|(error, _message)| {
277            metric!(
278                counter(KafkaCounters::ProducerEnqueueError) += 1,
279                variant = variant,
280                topic = topic_name,
281                producer_name = producer_name
282            );
283            ClientError::SendFailed(error)
284        })?;
285
286        Ok(topic_name)
287    }
288
289    /// Returns a newly generated key.
290    ///
291    /// Keys are created in sequence, they can be used as a Kafka partition key to force an equal
292    /// distribution across partitions.
293    ///
294    /// Producing a Kafka message without a 'random' key, produces slightly uneven batches to
295    /// partitions. We've seen that this pattern does not play well with our Arroyo consumers
296    /// and leads to higher partition lags.
297    fn next_key(&self) -> u128 {
298        // Overflowing is fine, as it would wrap. The only use for the atomic is to generate
299        // a different key to the one before.
300        self.next_key.fetch_add(1, Ordering::Relaxed) as u128
301    }
302}
303
304impl fmt::Debug for Producer {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        let topic_names: Vec<&String> = self
307            .topic_producers
308            .producers
309            .iter()
310            .map(|tp| &tp.topic_name)
311            .collect();
312        f.debug_struct("Producer")
313            .field("topic_names", &topic_names)
314            .field("producers", &"<ThreadedProducers>")
315            .finish_non_exhaustive()
316    }
317}
318
/// Keeps all the configured Kafka producers and responsible for the routing of the messages.
#[derive(Debug)]
pub struct KafkaClient {
    /// One [`Producer`] per configured logical topic.
    producers: HashMap<KafkaTopic, Producer>,
    /// Validates JSON payloads against topic schemas; debug builds only.
    #[cfg(debug_assertions)]
    schema_validator: schemas::Validator,
}
326
327impl KafkaClient {
328    /// Returns the [`KafkaClientBuilder`]
329    pub fn builder() -> KafkaClientBuilder {
330        KafkaClientBuilder::default()
331    }
332
333    /// Sends message to the provided Kafka topic.
334    ///
335    /// Returns the name of the Kafka topic to which the message was produced.
336    pub fn send_message(
337        &self,
338        topic: KafkaTopic,
339        message: &impl Message,
340    ) -> Result<&str, ClientError> {
341        let serialized = message.serialize()?;
342
343        #[cfg(debug_assertions)]
344        if let SerializationOutput::Json(ref bytes) = serialized {
345            self.schema_validator
346                .validate_message_schema(topic, bytes)
347                .map_err(ClientError::SchemaValidationFailed)?;
348        }
349        self.send(
350            topic,
351            message.key(),
352            message.headers(),
353            message.variant(),
354            serialized.as_bytes(),
355        )
356    }
357
358    /// Sends the payload to the correct producer for the current topic.
359    ///
360    /// Returns the name of the Kafka topic to which the message was produced.
361    fn send(
362        &self,
363        topic: KafkaTopic,
364        key: Option<Key>,
365        headers: Option<&BTreeMap<String, String>>,
366        variant: &str,
367        payload: &[u8],
368    ) -> Result<&str, ClientError> {
369        let producer = self
370            .producers
371            .get(&topic)
372            .ok_or_else(|| ClientError::InvalidTopicName)?;
373
374        producer.send(key, headers, variant, payload)
375    }
376}
377
/// Helper structure responsible for building the actual [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    /// Producers cached by broker config name, so topics sharing a config
    /// reuse the same underlying producer instance.
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    /// Fully built per-topic producers, keyed by logical topic.
    producers: HashMap<KafkaTopic, Producer>,
}
384
impl KafkaClientBuilder {
    /// Creates an empty KafkaClientBuilder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds topic configuration to the current [`KafkaClientBuilder`], which in return assigns
    /// a dedicated producer to the topic which will be used to send the messages.
    ///
    /// Producers are cached per broker config name; topics sharing a config reuse
    /// the same underlying producer. When `validate_topic` is set, the topic's
    /// metadata is fetched from the broker to confirm the topic exists.
    ///
    /// # Errors
    /// Returns [`ClientError::InvalidConfig`] error if the provided configuration is wrong and
    /// the producer could not be created.
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        topic_config: &KafkaTopicConfig<'_>,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut topic_producers = TopicProducers::new();

        // Process each shard configuration (one KafkaParams per shard)
        // We must preserve the original order from the configuration
        // because hash-based routing depends on shard index positions
        for params in topic_config.topics() {
            let KafkaParams {
                topic_name,
                config_name,
                params: config_params,
                key_rate_limit,
            } = params;

            // Build an optional per-key rate limiter from the shard's limit config.
            let rate_limiter = key_rate_limit.map(|limit| {
                KafkaRateLimits::new(
                    limit.limit_per_window,
                    Duration::from_secs(limit.window_secs),
                )
            });

            let config_name = config_name.map(str::to_owned);

            // Get or create producer for this broker config
            let threaded_producer = if let Some(producer) = self.reused_producers.get(&config_name)
            {
                Arc::clone(producer)
            } else {
                let mut client_config = ClientConfig::new();
                for config_p in *config_params {
                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
                }

                // Extract producer name from client.id, fallback to config name, then "unknown"
                let producer_name = config_params
                    .iter()
                    .find(|p| p.name == "client.id")
                    .map(|p| p.value.clone())
                    .or_else(|| config_name.clone())
                    .unwrap_or_else(|| "unknown".to_owned());

                let producer = Arc::new(
                    client_config
                        .create_with_context(Context::new(producer_name))
                        .map_err(ClientError::InvalidConfig)?,
                );

                // Cache the producer so later topics with the same config reuse it.
                self.reused_producers
                    .insert(config_name, Arc::clone(&producer));

                producer
            };

            topic_producers.producers.push(TopicProducer {
                topic_name: topic_name.clone(),
                producer: threaded_producer,
                rate_limiter,
            });
        }

        let producer = Producer::new(topic_producers);
        if validate_topic {
            producer.topic_producers.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes self and returns the built [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(debug_assertions)]
            schema_validator: schemas::Validator::default(),
        }
    }
}
480
481impl fmt::Debug for KafkaClientBuilder {
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483        f.debug_struct("KafkaClientBuilder")
484            .field("reused_producers", &"<CachedProducers>")
485            .field("producers", &self.producers)
486            .finish()
487    }
488}