use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{BaseRecord, Producer as _};
use rdkafka::ClientConfig;
use relay_statsd::metric;
use thiserror::Error;
use crate::config::{KafkaParams, KafkaTopic};
use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};
mod utils;
use utils::{Context, ThreadedProducer};
#[cfg(feature = "schemas")]
mod schemas;
/// Minimum number of seconds between two reports of the debounced producer metrics
/// (see `Debounced` and `Producer::send`).
const REPORT_FREQUENCY_SECS: u64 = 1;
/// Upper bound on how long to wait for topic metadata when validating a topic.
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Error, Debug)]
pub enum ClientError {
#[error("failed to send kafka message")]
SendFailed(#[source] rdkafka::error::KafkaError),
#[error("failed to find producer for the requested kafka topic")]
InvalidTopicName,
#[error("failed to create kafka producer: invalid kafka config")]
InvalidConfig(#[source] rdkafka::error::KafkaError),
#[error("failed to serialize kafka message")]
InvalidMsgPack(#[source] rmp_serde::encode::Error),
#[error("failed to serialize json message")]
InvalidJson(#[source] serde_json::Error),
#[cfg(feature = "schemas")]
#[error("failed to run schema validation on message")]
SchemaValidationFailed(#[source] schemas::SchemaError),
#[error("invalid kafka shard")]
InvalidShard,
#[error("failed to fetch the metadata of Kafka")]
MetadataFetchError(rdkafka::error::KafkaError),
#[error("failed to validate the topic with name {0}: {1:?}")]
TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
}
/// A message that can be produced to Kafka by [`KafkaClient::send_message`].
pub trait Message {
    /// Returns the 16-byte key used as the Kafka record key (drives partitioning).
    fn key(&self) -> [u8; 16];

    /// Returns a static name for this message type, used as the `variant` tag on
    /// metrics and logs.
    fn variant(&self) -> &'static str;

    /// Returns the headers to attach to the Kafka record, if any.
    fn headers(&self) -> Option<&BTreeMap<String, String>>;

    /// Serializes the message into its binary payload.
    ///
    /// Returns a `Cow` so implementations can either borrow an existing buffer or
    /// allocate a new one.
    fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError>;
}
/// A producer bound to a single Kafka topic.
struct Producer {
    /// Name of the topic this producer publishes to.
    topic_name: String,
    /// Underlying rdkafka producer; shared (`Arc`) because producers with the same
    /// config are reused across topics (see `KafkaClientBuilder::reused_producers`).
    producer: Arc<ThreadedProducer>,
    /// Debouncer limiting how often the in-flight gauge is reported.
    metrics: Debounced,
}
impl Producer {
    /// Creates a producer for `topic_name` on top of a shared rdkafka producer.
    fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
        Self {
            topic_name,
            producer,
            metrics: Debounced::new(REPORT_FREQUENCY_SECS),
        }
    }

    /// Verifies with the broker that this producer's topic is usable.
    ///
    /// Fetches metadata for the topic (bounded by `KAFKA_FETCH_METADATA_TIMEOUT`)
    /// and fails with [`ClientError::TopicError`] for the first topic that
    /// reports an error.
    fn validate_topic(&self) -> Result<(), ClientError> {
        let metadata = self
            .producer
            .client()
            .fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
            .map_err(ClientError::MetadataFetchError)?;

        let failure = metadata
            .topics()
            .iter()
            .find_map(|topic| topic.error().map(|err| (topic.name().to_string(), err)));

        match failure {
            Some((name, err)) => Err(ClientError::TopicError(name, err)),
            None => Ok(()),
        }
    }
}
impl fmt::Debug for Producer {
    // Manual impl: the producer field is rendered as a placeholder string
    // (presumably `ThreadedProducer` has no `Debug` impl — NOTE(review): confirm),
    // and the remaining field (`metrics`) is elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("topic_name", &self.topic_name)
            .field("producer", &"<ThreadedProducer>")
            .finish_non_exhaustive()
    }
}
/// Kafka client that dispatches messages to the producers configured per topic.
#[derive(Debug)]
pub struct KafkaClient {
    /// One producer per configured topic.
    producers: HashMap<KafkaTopic, Producer>,
    /// Validates serialized payloads against the topic's schema before sending.
    #[cfg(feature = "schemas")]
    schema_validator: schemas::Validator,
}
impl KafkaClient {
    /// Returns a builder for assembling a [`KafkaClient`].
    pub fn builder() -> KafkaClientBuilder {
        KafkaClientBuilder::default()
    }

    /// Serializes `message` and sends it to the given topic.
    ///
    /// With the `schemas` feature enabled, the serialized payload is validated
    /// against the topic schema first. On success, returns the name of the topic
    /// the message was produced to.
    pub fn send_message(
        &self,
        topic: KafkaTopic,
        message: &impl Message,
    ) -> Result<&str, ClientError> {
        let payload = message.serialize()?;

        #[cfg(feature = "schemas")]
        self.schema_validator
            .validate_message_schema(topic, &payload)
            .map_err(ClientError::SchemaValidationFailed)?;

        self.send(
            topic,
            &message.key(),
            message.headers(),
            message.variant(),
            &payload,
        )
    }

    /// Sends an already serialized payload to the given topic.
    ///
    /// Fails with [`ClientError::InvalidTopicName`] if no producer was configured
    /// for `topic`. On success, returns the topic name the payload was sent to.
    pub fn send(
        &self,
        topic: KafkaTopic,
        key: &[u8; 16],
        headers: Option<&BTreeMap<String, String>>,
        variant: &str,
        payload: &[u8],
    ) -> Result<&str, ClientError> {
        match self.producers.get(&topic) {
            Some(producer) => producer.send(key, headers, variant, payload),
            None => {
                relay_log::error!(
                    "attempted to send message to {topic:?} using an unconfigured kafka producer",
                );
                Err(ClientError::InvalidTopicName)
            }
        }
    }
}
/// Builder for [`KafkaClient`].
#[derive(Default)]
pub struct KafkaClientBuilder {
    /// Producers already created, keyed by config name, so that topics sharing a
    /// config reuse the same underlying rdkafka producer.
    reused_producers: BTreeMap<Option<String>, Arc<ThreadedProducer>>,
    /// Finished per-topic producers.
    producers: HashMap<KafkaTopic, Producer>,
}
impl KafkaClientBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a producer for `topic` using the given parameters.
    ///
    /// Producers are cached by config name: if a producer for the same config was
    /// already created, it is reused instead of building a new one. When
    /// `validate_topic` is set, the topic is checked against the broker before
    /// being registered.
    ///
    /// Fix: previously a `ClientConfig` was constructed before the reuse check and
    /// discarded on the reuse path, and the validate/insert tail was duplicated in
    /// both paths; the config is now built only on a cache miss and the tail is
    /// shared.
    pub fn add_kafka_topic_config(
        mut self,
        topic: KafkaTopic,
        params: &KafkaParams,
        validate_topic: bool,
    ) -> Result<Self, ClientError> {
        let KafkaParams {
            topic_name,
            config_name,
            params,
        } = params;
        let config_name = config_name.map(str::to_string);

        // Reuse a cached producer for this config, or create (and cache) one.
        let threaded_producer = match self.reused_producers.get(&config_name) {
            Some(producer) => Arc::clone(producer),
            None => {
                let mut client_config = ClientConfig::new();
                for config_p in *params {
                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
                }
                let producer = Arc::new(
                    client_config
                        .create_with_context(Context)
                        .map_err(ClientError::InvalidConfig)?,
                );
                self.reused_producers
                    .insert(config_name, Arc::clone(&producer));
                producer
            }
        };

        let producer = Producer::new((*topic_name).to_string(), threaded_producer);
        if validate_topic {
            producer.validate_topic()?;
        }
        self.producers.insert(topic, producer);
        Ok(self)
    }

    /// Consumes the builder and returns the finished [`KafkaClient`].
    pub fn build(self) -> KafkaClient {
        KafkaClient {
            producers: self.producers,
            #[cfg(feature = "schemas")]
            schema_validator: schemas::Validator::default(),
        }
    }
}
impl fmt::Debug for KafkaClientBuilder {
    // Manual impl: the cached producers are rendered as a placeholder string
    // (presumably `ThreadedProducer` has no `Debug` impl — NOTE(review): confirm);
    // `producers` uses its own `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KafkaClientBuilder")
            .field("reused_producers", &"<CachedProducers>")
            .field("producers", &self.producers)
            .finish()
    }
}
impl Producer {
fn send(
&self,
key: &[u8; 16],
headers: Option<&BTreeMap<String, String>>,
variant: &str,
payload: &[u8],
) -> Result<&str, ClientError> {
metric!(
histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
variant = variant
);
let topic_name = self.topic_name.as_str();
let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
if let Some(headers) = headers {
let mut kafka_headers = OwnedHeaders::new();
for (key, value) in headers {
kafka_headers = kafka_headers.insert(Header {
key,
value: Some(value),
});
}
record = record.headers(kafka_headers);
}
self.metrics.debounce(|| {
metric!(
gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
variant = variant,
topic = topic_name
);
});
self.producer
.send(record)
.map(|_| topic_name)
.map_err(|(error, _message)| {
relay_log::error!(
error = &error as &dyn std::error::Error,
tags.variant = variant,
tags.topic = topic_name,
"error sending kafka message",
);
metric!(
counter(KafkaCounters::ProducerEnqueueError) += 1,
variant = variant,
topic = topic_name
);
ClientError::SendFailed(error)
})
}
}
/// Rate limiter that lets a closure run at most once per `interval` seconds.
///
/// Safe to call from multiple threads: concurrent callers race on an atomic
/// timestamp and only the winner executes its closure.
struct Debounced {
    /// Seconds-resolution timestamp (offset by `interval`, see `debounce`) of the
    /// last successful activation; `0` means never activated.
    last_activation: AtomicU64,
    /// Minimum number of seconds between two activations.
    interval: u64,
    /// Reference point for measuring elapsed time.
    instant: Instant,
}

impl Debounced {
    /// Creates a debouncer allowing one activation per `interval` seconds.
    pub fn new(interval: u64) -> Self {
        Self {
            last_activation: AtomicU64::new(0),
            interval,
            instant: Instant::now(),
        }
    }

    /// Runs `f` if at least `interval` seconds have passed since the last
    /// activation, returning whether it ran.
    fn debounce(&self, f: impl FnOnce()) -> bool {
        // Offset the clock by `interval` so the very first call (with
        // `last_activation == 0`) always passes the threshold check.
        let now = self.instant.elapsed().as_secs() + self.interval;
        let last = self.last_activation.load(Ordering::Relaxed);

        if now.saturating_sub(last) < self.interval {
            return false;
        }

        // Several threads may reach this point; only the CAS winner runs `f`.
        let won = self
            .last_activation
            .compare_exchange(last, now, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();
        if won {
            f();
        }
        won
    }
}
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    #[test]
    fn test_debounce() {
        // Ten calls in rapid succession must all be suppressed.
        let assert_suppressed = |debounced: &Debounced| {
            for _ in 0..10 {
                assert!(!debounced.debounce(|| {}));
            }
        };

        let debounced = Debounced::new(1);

        // The very first call always fires.
        assert!(debounced.debounce(|| {}));
        assert_suppressed(&debounced);

        // Once the interval has elapsed, the next call fires again.
        thread::sleep(Duration::from_secs(1));
        assert!(debounced.debounce(|| {}));
        assert_suppressed(&debounced);
    }
}