relay_kafka/producer/mod.rs

//! This module contains the Kafka producer-related code.

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};

use rdkafka::ClientConfig;
use rdkafka::message::Header;
use rdkafka::producer::{BaseRecord, Producer as _};
use relay_statsd::metric;
use thiserror::Error;

use crate::config::{KafkaParams, KafkaTopic};
use crate::debounced::Debounced;
use crate::limits::KafkaRateLimits;
use crate::producer::utils::KafkaHeaders;
use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

mod utils;
use utils::{Context, ThreadedProducer};

#[cfg(feature = "schemas")]
mod schemas;

const REPORT_FREQUENCY_SECS: u64 = 1;
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka producer errors.
#[derive(Error, Debug)]
pub enum ClientError {
    /// Failed to send a kafka message.
    #[error("failed to send kafka message")]
    SendFailed(#[source] rdkafka::error::KafkaError),

    /// Failed to find a configured producer for the requested kafka topic.
    #[error("failed to find producer for the requested kafka topic")]
    InvalidTopicName,

    /// Failed to create a kafka producer because of an invalid configuration.
    #[error("failed to create kafka producer: invalid kafka config")]
    InvalidConfig(#[source] rdkafka::error::KafkaError),

    /// Failed to serialize the message.
    #[error("failed to serialize kafka message")]
    InvalidMsgPack(#[source] rmp_serde::encode::Error),

    /// Failed to serialize the JSON message using serde.
    #[error("failed to serialize json message")]
    InvalidJson(#[source] serde_json::Error),

    /// Failed to run schema validation on the message.
    #[cfg(feature = "schemas")]
    #[error("failed to run schema validation on message")]
    SchemaValidationFailed(#[source] schemas::SchemaError),

    /// The configuration is invalid and cannot be used to determine the shard number.
    #[error("invalid kafka shard")]
    InvalidShard,

    /// Failed to fetch metadata from Kafka.
    #[error("failed to fetch the metadata of Kafka")]
    MetadataFetchError(rdkafka::error::KafkaError),

    /// Failed to validate the topic.
    #[error("failed to validate the topic with name {0}: {1:?}")]
    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),

    /// Failed to encode the protobuf into the buffer
    /// because the buffer is too small.
    #[error("failed to encode protobuf because the buffer is too small")]
    ProtobufEncodingFailed,
}

/// Describes a type that can be sent using the Kafka producer provided by this crate.
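///
/// # Example
///
/// A minimal sketch of a JSON-serialized message type. `MyEvent` and its fields
/// are illustrative and not part of this crate:
///
/// ```ignore
/// use std::borrow::Cow;
/// use std::collections::BTreeMap;
///
/// #[derive(serde::Serialize)]
/// struct MyEvent {
///     id: [u8; 16],
///     name: String,
/// }
///
/// impl Message for MyEvent {
///     fn key(&self) -> Option<[u8; 16]> {
///         // Messages with the same id land on the same partition.
///         Some(self.id)
///     }
///
///     fn variant(&self) -> &'static str {
///         "my_event"
///     }
///
///     fn headers(&self) -> Option<&BTreeMap<String, String>> {
///         None
///     }
///
///     fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
///         serde_json::to_vec(self)
///             .map(Cow::Owned)
///             .map_err(ClientError::InvalidJson)
///     }
/// }
/// ```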
pub trait Message {
    /// Returns the partitioning key for this kafka message, which determines the
    /// partition the message is routed to.
    fn key(&self) -> Option<[u8; 16]>;

    /// Returns the type of the message.
    fn variant(&self) -> &'static str;

    /// Returns the list of headers to be provided when the payload is sent to Kafka.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary format.
    ///
    /// # Errors
    /// Returns [`ClientError::InvalidMsgPack`] or [`ClientError::InvalidJson`] if the
    /// serialization failed.
    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
}

/// Single kafka producer config with assigned topic.
struct Producer {
    /// Kafka topic name.
    topic_name: String,
    /// Real kafka producer.
    producer: Arc<ThreadedProducer>,
    /// Debouncer for metrics.
    metrics: Debounced,
    /// Optional rate limits to apply.
    rate_limiter: Option<KafkaRateLimits>,
}

impl Producer {
    fn new(
        topic_name: String,
        producer: Arc<ThreadedProducer>,
        rate_limiter: Option<KafkaRateLimits>,
    ) -> Self {
        Self {
            topic_name,
            producer,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
            rate_limiter,
        }
    }

    /// Validates the topic by fetching its metadata directly from Kafka.
    fn validate_topic(&self) -> Result<(), ClientError> {
        let client = self.producer.client();
        let metadata = client
            .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
            .map_err(ClientError::MetadataFetchError)?;

        for topic in metadata.topics() {
            if let Some(error) = topic.error() {
                return Err(ClientError::TopicError(topic.name().to_string(), error));
            }
        }

        Ok(())
    }
}

impl fmt::Debug for Producer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("topic_name", &self.topic_name)
            .field("producer", &"<ThreadedProducer>")
            .finish_non_exhaustive()
    }
}

/// Keeps all the configured kafka producers and is responsible for routing the messages.
#[derive(Debug)]
pub struct KafkaClient {
    producers: HashMap<KafkaTopic, Producer>,
    #[cfg(feature = "schemas")]
    schema_validator: schemas::Validator,
}

impl KafkaClient {
    /// Returns the [`KafkaClientBuilder`].
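    ///
    /// # Example
    ///
    /// A sketch of wiring up a client and producing a message. The topic, the
    /// `params` value, and `message` are illustrative and depend on the caller's
    /// configuration:
    ///
    /// ```ignore
    /// let client = KafkaClient::builder()
    ///     .add_kafka_topic_config(KafkaTopic::Events, &params, false)?
    ///     .build();
    ///
    /// // Serializes the message, optionally validates its schema, and produces it.
    /// let topic_name = client.send_message(KafkaTopic::Events, &message)?;
    /// ```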
    pub fn builder() -> KafkaClientBuilder {
        KafkaClientBuilder::default()
    }

    /// Sends a message to the provided kafka topic.
    ///
    /// Returns the name of the kafka topic to which the message was produced.
    pub fn send_message(
        &self,
        topic: KafkaTopic,
        message: &impl Message,
    ) -> Result<&str, ClientError> {
        let serialized = message.serialize()?;
        #[cfg(feature = "schemas")]
        self.schema_validator
            .validate_message_schema(topic, &serialized)
            .map_err(ClientError::SchemaValidationFailed)?;

        self.send(
            topic,
            message.key(),
            message.headers(),
            message.variant(),
            &serialized,
        )
    }

    /// Sends the payload to the correct producer for the current topic.
    ///
    /// Returns the name of the kafka topic to which the message was produced.
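    ///
    /// # Example
    ///
    /// A sketch of producing a pre-serialized payload without a partition key or
    /// headers; the topic, variant, and payload are illustrative:
    ///
    /// ```ignore
    /// let payload = br#"{"message":"hello"}"#;
    /// let topic_name = client.send(KafkaTopic::Events, None, None, "event", payload)?;
    /// ```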
    pub fn send(
        &self,
        topic: KafkaTopic,
        key: Option<[u8; 16]>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let producer = self.producers.get(&topic).ok_or_else(|| {
            relay_log::error!(
                "attempted to send message to {topic:?} using an unconfigured kafka producer",
            );
            ClientError::InvalidTopicName
        })?;

        producer.send(key, headers, variant, payload)
    }
}

/// Helper structure responsible for building the actual [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    producers: HashMap<KafkaTopic, Producer>,
}

impl KafkaClientBuilder {
    /// Creates an empty [`KafkaClientBuilder`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a topic configuration to the current [`KafkaClientBuilder`], which in turn
    /// assigns a dedicated producer to the topic that will be used to send the messages.
    ///
    /// # Errors
    /// Returns [`ClientError::InvalidConfig`] if the provided configuration is wrong and
    /// the producer could not be created.
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        params: &KafkaParams,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let mut client_config = ClientConfig::new();

        let KafkaParams {
            topic_name,
            config_name,
            params,
            key_rate_limit,
        } = params;

        let rate_limiter = key_rate_limit.map(|limit| {
            KafkaRateLimits::new(
                limit.limit_per_window,
                Duration::from_secs(limit.window_secs),
            )
        });

        let config_name = config_name.map(str::to_string);

        // Reuse an already-created producer if one exists for the same config to
        // avoid opening duplicate connections to the same cluster.
        if let Some(producer) = self.reused_producers.get(&config_name) {
            let producer = Producer::new(
                (*topic_name).to_string(),
                Arc::clone(producer),
                rate_limiter,
            );
            if validate_topic {
                producer.validate_topic()?;
            }
            self.producers.insert(topic, producer);
            return Ok(self);
        }

        for config_p in *params {
            client_config.set(config_p.name.as_str(), config_p.value.as_str());
        }

        let producer = Arc::new(
            client_config
                .create_with_context(Context)
                .map_err(ClientError::InvalidConfig)?,
        );

        // Cache the new producer so subsequent topics with the same config reuse it.
        self.reused_producers
            .insert(config_name, Arc::clone(&producer));

        let producer = Producer::new((*topic_name).to_string(), producer, rate_limiter);
        if validate_topic {
            producer.validate_topic()?;
        }
        self.producers.insert(topic, producer);

        Ok(self)
    }

    /// Consumes self and returns the built [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(feature = "schemas")]
            schema_validator: schemas::Validator::default(),
        }
    }
}

impl fmt::Debug for KafkaClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KafkaClientBuilder")
            .field("reused_producers", &"<CachedProducers>")
            .field("producers", &self.producers)
            .finish()
    }
}

impl Producer {
    /// Sends the payload to this producer's topic.
    fn send(
        &self,
        key: Option<[u8; 16]>,
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        let now = Instant::now();
        let topic_name = self.topic_name.as_str();

        metric!(
            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
            variant = variant,
            topic = topic_name,
        );

        let mut headers = headers
            .unwrap_or(&BTreeMap::new())
            .iter()
            .map(|(key, value)| Header {
                key,
                value: Some(value),
            })
            .collect::<KafkaHeaders>();

        // If a rate limiter is configured and the partition key exceeded its quota,
        // drop the key so the message is routed to an arbitrary partition instead,
        // and mark the message with a header.
        let key = match (key, self.rate_limiter.as_ref()) {
            (Some(key), Some(limiter)) => {
                let is_limited = limiter.try_increment(now, key, 1) < 1;

                if is_limited {
                    metric!(
                        counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
                        variant = variant,
                        topic = topic_name,
                    );

                    headers.insert(Header {
                        key: "sentry-reshuffled",
                        value: Some("1"),
                    });

                    None
                } else {
                    Some(key)
                }
            }
            (key, _) => key,
        };

        let mut record = BaseRecord::to(topic_name).payload(payload);
        if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
        }
        if let Some(key) = key.as_ref() {
            record = record.key(key);
        }

        // Report the in-flight count at most once per `REPORT_FREQUENCY_SECS`.
        self.metrics.debounce(now, || {
            metric!(
                gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
                variant = variant,
                topic = topic_name
            );
        });

        self.producer.send(record).map_err(|(error, _message)| {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                tags.variant = variant,
                tags.topic = topic_name,
                "error sending kafka message",
            );
            metric!(
                counter(KafkaCounters::ProducerEnqueueError) += 1,
                variant = variant,
                topic = topic_name
            );
            ClientError::SendFailed(error)
        })?;

        Ok(topic_name)
    }
}