relay_kafka/
statsd.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric};

/// Counter metrics for the Kafka producer.
pub enum KafkaCounters {
    /// Number of producer errors that occurred after an envelope was already enqueued for
    /// sending to Kafka.
    ///
    /// These errors include, for example, _"MessageTooLarge"_ errors when the broker does not
    /// accept requests over a certain size, which is usually due to invalid or inconsistent
    /// broker/producer configurations.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    ProcessingProduceError,

    /// Number of messages that failed to be enqueued in the Kafka producer's memory buffer.
    ///
    /// These errors include, for example, _"UnknownTopic"_ errors when attempting to send a
    /// message to a topic that does not exist.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    ProducerEnqueueError,
}

impl CounterMetric for KafkaCounters {
    /// Returns the metric name string under which this counter is reported.
    fn name(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name to be chosen here.
        match *self {
            Self::ProducerEnqueueError => "producer.enqueue.error",
            Self::ProcessingProduceError => "processing.produce.error",
        }
    }
}

/// Histogram metrics for the Kafka producer.
pub enum KafkaHistograms {
    /// Size of an emitted Kafka message in bytes, tagged by message type.
    KafkaMessageSize,
}

impl HistogramMetric for KafkaHistograms {
    /// Returns the metric name string under which this histogram is reported.
    fn name(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name to be chosen here.
        match *self {
            Self::KafkaMessageSize => "kafka.message_size",
        }
    }
}
/// Gauge metrics for the Kafka producer.
///
/// Most of these metrics are taken from the [`rdkafka::statistics`] module.
pub enum KafkaGauges {
    /// The number of messages waiting to be sent to, or acknowledged by, the broker.
    ///
    /// See <https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ad4b3b7659cf9a79d3353810d6b625bb7>.
    ///
    /// This metric is tagged with:
    /// - `topic`: The Kafka topic being produced to.
    /// - `variant`: The Kafka message variant.
    InFlightCount,

    /// The current number of messages in producer queues.
    MessageCount,

    /// The maximum number of messages allowed in the producer queues.
    MessageCountMax,

    /// The current total size of messages in producer queues.
    MessageSize,

    /// The maximum total size of messages allowed in the producer queues.
    MessageSizeMax,

    /// The number of requests awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    OutboundBufferRequests,

    /// The number of messages awaiting transmission to the broker.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    OutboundBufferMessages,

    /// The number of connection attempts, including successful and failed attempts, and name
    /// resolution failures.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    Connects,

    /// The number of disconnections, whether triggered by the broker, the network, the load
    /// balancer, or something else.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    Disconnects,

    /// Maximum internal producer queue latency, in microseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    ProducerQueueLatency,

    /// Maximum internal request queue latency, in microseconds.
    ///
    /// This metric is tagged with:
    /// - `broker_name`: The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
    RequestQueueLatency,
}

impl GaugeMetric for KafkaGauges {
    /// Returns the metric name string under which this gauge is reported.
    fn name(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name to be chosen here.
        match *self {
            // Per-producer gauges.
            Self::InFlightCount => "kafka.in_flight_count",
            Self::MessageCount => "kafka.stats.message_count",
            Self::MessageCountMax => "kafka.stats.message_count_max",
            Self::MessageSize => "kafka.stats.message_size",
            Self::MessageSizeMax => "kafka.stats.message_size_max",
            // Per-broker gauges (documented on the enum as tagged with `broker_name`).
            Self::OutboundBufferRequests => "kafka.stats.broker.outbuf.requests",
            Self::OutboundBufferMessages => "kafka.stats.broker.outbuf.messages",
            Self::Connects => "kafka.stats.broker.connects",
            Self::Disconnects => "kafka.stats.broker.disconnects",
            Self::ProducerQueueLatency => "kafka.stats.broker.int_latency",
            Self::RequestQueueLatency => "kafka.stats.broker.outbuf_latency",
        }
    }
}