relay_kafka/producer/
mod.rs

1//! This module contains the kafka producer related code.
2
3use std::borrow::Cow;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use rdkafka::ClientConfig;
11use rdkafka::message::{Header, OwnedHeaders};
12use rdkafka::producer::{BaseRecord, Producer as _};
13use relay_statsd::metric;
14use thiserror::Error;
15
16use crate::config::{KafkaParams, KafkaTopic};
17use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
18
19mod utils;
20use utils::{Context, ThreadedProducer};
21
22#[cfg(feature = "schemas")]
23mod schemas;
24
/// Minimum number of seconds between two reports of the producer in-flight gauge.
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Upper bound on how long topic validation waits for Kafka to return topic metadata.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);
27
28/// Kafka producer errors.
29#[derive(Error, Debug)]
30pub enum ClientError {
31    /// Failed to send a kafka message.
32    #[error("failed to send kafka message")]
33    SendFailed(#[source] rdkafka::error::KafkaError),
34
35    /// Failed to find configured producer for the requested kafka topic.
36    #[error("failed to find producer for the requested kafka topic")]
37    InvalidTopicName,
38
39    /// Failed to create a kafka producer because of the invalid configuration.
40    #[error("failed to create kafka producer: invalid kafka config")]
41    InvalidConfig(#[source] rdkafka::error::KafkaError),
42
43    /// Failed to serialize the message.
44    #[error("failed to serialize kafka message")]
45    InvalidMsgPack(#[source] rmp_serde::encode::Error),
46
47    /// Failed to serialize the json message using serde.
48    #[error("failed to serialize json message")]
49    InvalidJson(#[source] serde_json::Error),
50
51    /// Failed to run schema validation on message.
52    #[cfg(feature = "schemas")]
53    #[error("failed to run schema validation on message")]
54    SchemaValidationFailed(#[source] schemas::SchemaError),
55
56    /// Configuration is wrong and it cannot be used to identify the number of a shard.
57    #[error("invalid kafka shard")]
58    InvalidShard,
59
60    /// Failed to fetch the metadata of Kafka.
61    #[error("failed to fetch the metadata of Kafka")]
62    MetadataFetchError(rdkafka::error::KafkaError),
63
64    /// Failed to validate the topic.
65    #[error("failed to validate the topic with name {0}: {1:?}")]
66    TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
67
68    /// Failed to encode the protobuf into the buffer
69    /// because the buffer is too small.
70    #[error("failed to encode protobuf because the buffer is too small")]
71    ProtobufEncodingFailed,
72}
73
74/// Describes the type which can be sent using kafka producer provided by this crate.
75pub trait Message {
76    /// Returns the partitioning key for this kafka message determining.
77    fn key(&self) -> [u8; 16];
78
79    /// Returns the type of the message.
80    fn variant(&self) -> &'static str;
81
82    /// Return the list of headers to be provided when payload is sent to Kafka.
83    fn headers(&self) -> Option<&BTreeMap<String, String>>;
84
85    /// Serializes the message into its binary format.
86    ///
87    /// # Errors
88    /// Returns the [`ClientError::InvalidMsgPack`] or [`ClientError::InvalidJson`] if the
89    /// serialization failed.
90    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
91}
92
93/// Single kafka producer config with assigned topic.
94struct Producer {
95    /// Kafka topic name.
96    topic_name: String,
97    /// Real kafka producer.
98    producer: Arc<ThreadedProducer>,
99    /// Debouncer for metrics.
100    metrics: Debounced,
101}
102
103impl Producer {
104    fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
105        Self {
106            topic_name,
107            producer,
108            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
109        }
110    }
111
112    /// Validates the topic by fetching the metadata of the topic directly from Kafka.
113    fn validate_topic(&self) -> Result<(), ClientError> {
114        let client = self.producer.client();
115        let metadata = client
116            .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
117            .map_err(ClientError::MetadataFetchError)?;
118
119        for topic in metadata.topics() {
120            if let Some(error) = topic.error() {
121                return Err(ClientError::TopicError(topic.name().to_string(), error));
122            }
123        }
124
125        Ok(())
126    }
127}
128
129impl fmt::Debug for Producer {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.debug_struct("Producer")
132            .field("topic_name", &self.topic_name)
133            .field("producer", &"<ThreadedProducer>")
134            .finish_non_exhaustive()
135    }
136}
137
138/// Keeps all the configured kafka producers and responsible for the routing of the messages.
139#[derive(Debug)]
140pub struct KafkaClient {
141    producers: HashMap<KafkaTopic, Producer>,
142    #[cfg(feature = "schemas")]
143    schema_validator: schemas::Validator,
144}
145
146impl KafkaClient {
147    /// Returns the [`KafkaClientBuilder`]
148    pub fn builder() -> KafkaClientBuilder {
149        KafkaClientBuilder::default()
150    }
151
152    /// Sends message to the provided kafka topic.
153    ///
154    /// Returns the name of the kafka topic to which the message was produced.
155    pub fn send_message(
156        &self,
157        topic: KafkaTopic,
158        message: &impl Message,
159    ) -> Result<&str, ClientError> {
160        let serialized = message.serialize()?;
161        #[cfg(feature = "schemas")]
162        self.schema_validator
163            .validate_message_schema(topic, &serialized)
164            .map_err(ClientError::SchemaValidationFailed)?;
165        let key = message.key();
166        self.send(
167            topic,
168            &key,
169            message.headers(),
170            message.variant(),
171            &serialized,
172        )
173    }
174
175    /// Sends the payload to the correct producer for the current topic.
176    ///
177    /// Returns the name of the kafka topic to which the message was produced.
178    pub fn send(
179        &self,
180        topic: KafkaTopic,
181        key: &[u8; 16],
182        headers: Option<&BTreeMap<String, String>>,
183        variant: &str,
184        payload: &[u8],
185    ) -> Result<&str, ClientError> {
186        let producer = self.producers.get(&topic).ok_or_else(|| {
187            relay_log::error!(
188                "attempted to send message to {topic:?} using an unconfigured kafka producer",
189            );
190            ClientError::InvalidTopicName
191        })?;
192        producer.send(key, headers, variant, payload)
193    }
194}
195
196/// Helper structure responsible for building the actual [`KafkaClient`].
197#[derive(Default)]
198pub struct KafkaClientBuilder {
199    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
200    producers: HashMap<KafkaTopic, Producer>,
201}
202
203impl KafkaClientBuilder {
204    /// Creates an empty KafkaClientBuilder.
205    pub fn new() -> Self {
206        Self::default()
207    }
208
209    /// Adds topic configuration to the current [`KafkaClientBuilder`], which in return assigns
210    /// dedicates producer to the topic which can will be used to send the messages.
211    ///
212    /// # Errors
213    /// Returns [`ClientError::InvalidConfig`] error if the provided configuration is wrong and
214    /// the producer could not be created.
215    pub fn add_kafka_topic_config(
216        mut self,
217        topic: KafkaTopic,
218        params: &KafkaParams,
219        validate_topic: bool,
220    ) -> Result<Self, ClientError> {
221        let mut client_config = ClientConfig::new();
222
223        let KafkaParams {
224            topic_name,
225            config_name,
226            params,
227        } = params;
228
229        let config_name = config_name.map(str::to_string);
230
231        if let Some(producer) = self.reused_producers.get(&config_name) {
232            let producer = Producer::new((*topic_name).to_string(), Arc::clone(producer));
233            if validate_topic {
234                producer.validate_topic()?;
235            }
236            self.producers.insert(topic, producer);
237            return Ok(self);
238        }
239
240        for config_p in *params {
241            client_config.set(config_p.name.as_str(), config_p.value.as_str());
242        }
243
244        let producer = Arc::new(
245            client_config
246                .create_with_context(Context)
247                .map_err(ClientError::InvalidConfig)?,
248        );
249
250        self.reused_producers
251            .insert(config_name, Arc::clone(&producer));
252
253        let producer = Producer::new((*topic_name).to_string(), producer);
254        if validate_topic {
255            producer.validate_topic()?;
256        }
257        self.producers.insert(topic, producer);
258
259        Ok(self)
260    }
261
262    /// Consumes self and returns the built [`KafkaClient`].
263    pub fn build(self) -> KafkaClient {
264        KafkaClient {
265            producers: self.producers,
266            #[cfg(feature = "schemas")]
267            schema_validator: schemas::Validator::default(),
268        }
269    }
270}
271
272impl fmt::Debug for KafkaClientBuilder {
273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274        f.debug_struct("KafkaClientBuilder")
275            .field("reused_producers", &"<CachedProducers>")
276            .field("producers", &self.producers)
277            .finish()
278    }
279}
280
281impl Producer {
282    /// Sends the payload to the correct producer for the current topic.
283    fn send(
284        &self,
285        key: &[u8; 16],
286        headers: Option<&BTreeMap<String, String>>,
287        variant: &str,
288        payload: &[u8],
289    ) -> Result<&str, ClientError> {
290        metric!(
291            histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
292            variant = variant
293        );
294
295        let topic_name = self.topic_name.as_str();
296        let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
297
298        // Make sure to set the headers if provided.
299        if let Some(headers) = headers {
300            let mut kafka_headers = OwnedHeaders::new();
301            for (key, value) in headers {
302                kafka_headers = kafka_headers.insert(Header {
303                    key,
304                    value: Some(value),
305                });
306            }
307            record = record.headers(kafka_headers);
308        }
309
310        self.metrics.debounce(|| {
311            metric!(
312                gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
313                variant = variant,
314                topic = topic_name
315            );
316        });
317
318        self.producer
319            .send(record)
320            .map(|_| topic_name)
321            .map_err(|(error, _message)| {
322                relay_log::error!(
323                    error = &error as &dyn std::error::Error,
324                    tags.variant = variant,
325                    tags.topic = topic_name,
326                    "error sending kafka message",
327                );
328                metric!(
329                    counter(KafkaCounters::ProducerEnqueueError) += 1,
330                    variant = variant,
331                    topic = topic_name
332                );
333                ClientError::SendFailed(error)
334            })
335    }
336}
337
/// Rate limiter that lets a callback run at most once per `interval` seconds.
struct Debounced {
    /// Elapsed-seconds timestamp, offset by `interval`, of the last successful activation.
    last_activation: AtomicU64,
    /// Minimum number of seconds between two activations.
    interval: u64,
    /// Reference point from which elapsed time is measured.
    instant: Instant,
}

impl Debounced {
    /// Creates a debouncer with the given interval in seconds; its first call always fires.
    pub fn new(interval: u64) -> Self {
        let instant = Instant::now();
        Self {
            last_activation: AtomicU64::new(0),
            interval,
            instant,
        }
    }

    /// Runs `f` if at least `interval` seconds have passed since the last activation.
    ///
    /// Returns `true` if `f` was run. When several callers race, only the one that wins the
    /// compare-exchange on `last_activation` runs `f`.
    fn debounce(&self, f: impl FnOnce()) -> bool {
        // Shift the clock forward by one interval so that the very first call, where
        // `last_activation` is still 0, is already considered due.
        let current = self.instant.elapsed().as_secs() + self.interval;
        let previous = self.last_activation.load(Ordering::Relaxed);

        let due = current.saturating_sub(previous) >= self.interval;
        let claimed = due
            && self
                .last_activation
                .compare_exchange(previous, current, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

        if claimed {
            f();
        }
        claimed
    }
}
377
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Asserts that `debouncer` fires exactly once and then stays quiet for the next ten
    /// calls made within the same interval.
    fn assert_fires_once(debouncer: &Debounced) {
        assert!(debouncer.debounce(|| {}));
        for _ in 0..10 {
            assert!(!debouncer.debounce(|| {}));
        }
    }

    #[test]
    fn test_debounce() {
        let debouncer = Debounced::new(1);
        assert_fires_once(&debouncer);

        // Wait one full interval so the next call is due again.
        thread::sleep(Duration::from_secs(1));

        assert_fires_once(&debouncer);
    }
}