relay_kafka/producer/
mod.rs

1//! This module contains the kafka producer related code.
2
3use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use rdkafka::message::{Header, OwnedHeaders};
11use rdkafka::producer::{BaseRecord, Producer as _};
12use rdkafka::ClientConfig;
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::config::{KafkaParams, KafkaTopic};
17use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
18
19mod utils;
20use utils::{Context, ThreadedProducer};
21
22#[cfg(feature = "schemas")]
23mod schemas;
24
/// Debounce interval, in seconds, handed to `Debounced::new` for the metrics of each producer.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Timeout passed to `fetch_metadata` when a topic is validated.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);
27
28/// Kafka producer errors.
29#[derive(Error, Debug)]
30pub enum ClientError {
31    /// Failed to send a kafka message.
32    #[error("failed to send kafka message")]
33    SendFailed(#[source] rdkafka::error::KafkaError),
34
35    /// Failed to find configured producer for the requested kafka topic.
36    #[error("failed to find producer for the requested kafka topic")]
37    InvalidTopicName,
38
39    /// Failed to create a kafka producer because of the invalid configuration.
40    #[error("failed to create kafka producer: invalid kafka config")]
41    InvalidConfig(#[source] rdkafka::error::KafkaError),
42
43    /// Failed to serialize the message.
44    #[error("failed to serialize kafka message")]
45    InvalidMsgPack(#[source] rmp_serde::encode::Error),
46
47    /// Failed to serialize the json message using serde.
48    #[error("failed to serialize json message")]
49    InvalidJson(#[source] serde_json::Error),
50
51    /// Failed to run schema validation on message.
52    #[cfg(feature = "schemas")]
53    #[error("failed to run schema validation on message")]
54    SchemaValidationFailed(#[source] schemas::SchemaError),
55
56    /// Configuration is wrong and it cannot be used to identify the number of a shard.
57    #[error("invalid kafka shard")]
58    InvalidShard,
59
60    /// Failed to fetch the metadata of Kafka.
61    #[error("failed to fetch the metadata of Kafka")]
62    MetadataFetchError(rdkafka::error::KafkaError),
63
64    /// Failed to validate the topic.
65    #[error("failed to validate the topic with name {0}: {1:?}")]
66    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
67}
68
/// Describes the type which can be sent using kafka producer provided by this crate.
pub trait Message {
    /// Returns the partitioning key for this kafka message.
    ///
    /// The key is set as the key of the record handed to the producer.
    fn key(&self) -> [u8; 16];

    /// Returns the type of the message.
    ///
    /// The value is used as the `variant` tag on the metrics and error logs emitted while
    /// sending.
    fn variant(&self) -> &'static str;

    /// Returns the headers to be provided when the payload is sent to Kafka.
    ///
    /// When `None` is returned, no headers are set on the record.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary format.
    ///
    /// # Errors
    /// Returns the [`ClientError::InvalidMsgPack`] or [`ClientError::InvalidJson`] if the
    /// serialization failed.
    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
}
87
88/// Single kafka producer config with assigned topic.
89struct Producer {
90    /// Kafka topic name.
91    topic_name: String,
92    /// Real kafka producer.
93    producer: Arc<ThreadedProducer>,
94    /// Debouncer for metrics.
95    metrics: Debounced,
96}
97
98impl Producer {
99    fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
100        Self {
101            topic_name,
102            producer,
103            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
104        }
105    }
106
107    /// Validates the topic by fetching the metadata of the topic directly from Kafka.
108    fn validate_topic(&self) -> Result<(), ClientError> {
109        let client = self.producer.client();
110        let metadata = client
111            .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
112            .map_err(ClientError::MetadataFetchError)?;
113
114        for topic in metadata.topics() {
115            if let Some(error) = topic.error() {
116                return Err(ClientError::TopicError(topic.name().to_string(), error));
117            }
118        }
119
120        Ok(())
121    }
122}
123
124impl fmt::Debug for Producer {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.debug_struct("Producer")
127            .field("topic_name", &self.topic_name)
128            .field("producer", &"<ThreadedProducer>")
129            .finish_non_exhaustive()
130    }
131}
132
133/// Keeps all the configured kafka producers and responsible for the routing of the messages.
134#[derive(Debug)]
135pub struct KafkaClient {
136    producers: HashMap<KafkaTopic, Producer>,
137    #[cfg(feature = "schemas")]
138    schema_validator: schemas::Validator,
139}
140
141impl KafkaClient {
142    /// Returns the [`KafkaClientBuilder`]
143    pub fn builder() -> KafkaClientBuilder {
144        KafkaClientBuilder::default()
145    }
146
147    /// Sends message to the provided kafka topic.
148    ///
149    /// Returns the name of the kafka topic to which the message was produced.
150    pub fn send_message(
151        &self,
152        topic: KafkaTopic,
153        message: &impl Message,
154    ) -> Result<&str, ClientError> {
155        let serialized = message.serialize()?;
156        #[cfg(feature = "schemas")]
157        self.schema_validator
158            .validate_message_schema(topic, &serialized)
159            .map_err(ClientError::SchemaValidationFailed)?;
160        let key = message.key();
161        self.send(
162            topic,
163            &key,
164            message.headers(),
165            message.variant(),
166            &serialized,
167        )
168    }
169
170    /// Sends the payload to the correct producer for the current topic.
171    ///
172    /// Returns the name of the kafka topic to which the message was produced.
173    pub fn send(
174        &self,
175        topic: KafkaTopic,
176        key: &[u8; 16],
177        headers: Option<&BTreeMap<String, String>>,
178        variant: &str,
179        payload: &[u8],
180    ) -> Result<&str, ClientError> {
181        let producer = self.producers.get(&topic).ok_or_else(|| {
182            relay_log::error!(
183                "attempted to send message to {topic:?} using an unconfigured kafka producer",
184            );
185            ClientError::InvalidTopicName
186        })?;
187        producer.send(key, headers, variant, payload)
188    }
189}
190
191/// Helper structure responsible for building the actual [`KafkaClient`].
192#[derive(Default)]
193pub struct KafkaClientBuilder {
194    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
195    producers: HashMap<KafkaTopic, Producer>,
196}
197
198impl KafkaClientBuilder {
199    /// Creates an empty KafkaClientBuilder.
200    pub fn new() -> Self {
201        Self::default()
202    }
203
204    /// Adds topic configuration to the current [`KafkaClientBuilder`], which in return assigns
205    /// dedicates producer to the topic which can will be used to send the messages.
206    ///
207    /// # Errors
208    /// Returns [`ClientError::InvalidConfig`] error if the provided configuration is wrong and
209    /// the producer could not be created.
210    pub fn add_kafka_topic_config(
211        mut self,
212        topic: KafkaTopic,
213        params: &KafkaParams,
214        validate_topic: bool,
215    ) -> Result<Self, ClientError> {
216        let mut client_config = ClientConfig::new();
217
218        let KafkaParams {
219            topic_name,
220            config_name,
221            params,
222        } = params;
223
224        let config_name = config_name.map(str::to_string);
225
226        if let Some(producer) = self.reused_producers.get(&config_name) {
227            let producer = Producer::new((*topic_name).to_string(), Arc::clone(producer));
228            if validate_topic {
229                producer.validate_topic()?;
230            }
231            self.producers.insert(topic, producer);
232            return Ok(self);
233        }
234
235        for config_p in *params {
236            client_config.set(config_p.name.as_str(), config_p.value.as_str());
237        }
238
239        let producer = Arc::new(
240            client_config
241                .create_with_context(Context)
242                .map_err(ClientError::InvalidConfig)?,
243        );
244
245        self.reused_producers
246            .insert(config_name, Arc::clone(&producer));
247
248        let producer = Producer::new((*topic_name).to_string(), producer);
249        if validate_topic {
250            producer.validate_topic()?;
251        }
252        self.producers.insert(topic, producer);
253
254        Ok(self)
255    }
256
257    /// Consumes self and returns the built [`KafkaClient`].
258    pub fn build(self) -> KafkaClient {
259        KafkaClient {
260            producers: self.producers,
261            #[cfg(feature = "schemas")]
262            schema_validator: schemas::Validator::default(),
263        }
264    }
265}
266
267impl fmt::Debug for KafkaClientBuilder {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        f.debug_struct("KafkaClientBuilder")
270            .field("reused_producers", &"<CachedProducers>")
271            .field("producers", &self.producers)
272            .finish()
273    }
274}
275
276impl Producer {
277    /// Sends the payload to the correct producer for the current topic.
278    fn send(
279        &self,
280        key: &[u8; 16],
281        headers: Option<&BTreeMap<String, String>>,
282        variant: &str,
283        payload: &[u8],
284    ) -> Result<&str, ClientError> {
285        metric!(
286            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
287            variant = variant
288        );
289
290        let topic_name = self.topic_name.as_str();
291        let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
292
293        // Make sure to set the headers if provided.
294        if let Some(headers) = headers {
295            let mut kafka_headers = OwnedHeaders::new();
296            for (key, value) in headers {
297                kafka_headers = kafka_headers.insert(Header {
298                    key,
299                    value: Some(value),
300                });
301            }
302            record = record.headers(kafka_headers);
303        }
304
305        self.metrics.debounce(|| {
306            metric!(
307                gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
308                variant = variant,
309                topic = topic_name
310            );
311        });
312
313        self.producer
314            .send(record)
315            .map(|_| topic_name)
316            .map_err(|(error, _message)| {
317                relay_log::error!(
318                    error = &error as &dyn std::error::Error,
319                    tags.variant = variant,
320                    tags.topic = topic_name,
321                    "error sending kafka message",
322                );
323                metric!(
324                    counter(KafkaCounters::ProducerEnqueueError) += 1,
325                    variant = variant,
326                    topic = topic_name
327                );
328                ClientError::SendFailed(error)
329            })
330    }
331}
332
/// Lets an action run at most once per interval.
struct Debounced {
    /// Time of last activation in seconds.
    last_activation: AtomicU64,
    /// Debounce interval in seconds.
    interval: u64,
    /// Relative instant used for measurements.
    instant: Instant,
}

impl Debounced {
    /// Creates a debouncer with the given interval in seconds.
    pub fn new(interval: u64) -> Self {
        let instant = Instant::now();
        let last_activation = AtomicU64::new(0);

        Self {
            last_activation,
            interval,
            instant,
        }
    }

    /// Runs `f` if at least one interval has passed since the last activation.
    ///
    /// Returns `true` if `f` was called. When the stored activation time changed between
    /// reading and updating it, `f` is not called and `false` is returned.
    fn debounce(&self, f: impl FnOnce()) -> bool {
        // The timestamp is shifted by one interval so that the very first call, which
        // compares against the initial value of zero, triggers immediately.
        let current = self.instant.elapsed().as_secs() + self.interval;
        let last = self.last_activation.load(Ordering::Relaxed);

        let due = current.saturating_sub(last) >= self.interval;
        let claimed = due
            && self
                .last_activation
                .compare_exchange(last, current, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

        if claimed {
            f();
        }

        claimed
    }
}
372
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Asserts that the next ten calls are all suppressed.
    fn assert_suppressed(debounced: &Debounced) {
        for _ in 0..10 {
            assert!(!debounced.debounce(|| {}));
        }
    }

    #[test]
    fn test_debounce() {
        let debounced = Debounced::new(1);

        // The first call triggers, the calls right after it do not.
        assert!(debounced.debounce(|| {}));
        assert_suppressed(&debounced);

        thread::sleep(Duration::from_secs(1));

        // Once the interval has passed, exactly one more call triggers.
        assert!(debounced.debounce(|| {}));
        assert_suppressed(&debounced);
    }
}