Metrics

Arroyo consumers and strategies automatically instrument a set of metrics that are useful for understanding the behavior and performance of your consumers. These metrics are typically sampled or buffered as appropriate and flushed periodically (often once per second).

To use these metrics, configure a metrics backend that conforms to the Metrics protocol before creating your consumer.

This can be done like so:

from typing import Mapping, Optional, Union

from arroyo.utils.metrics import Metrics, MetricName, configure_metrics

# Same shape as the `tags` parameter documented in the API section.
Tags = Mapping[str, str]


class MyMetrics(Metrics):
    # record_incr, record_gauge and record_timing are placeholders for the
    # calls into your own metrics client.

    def increment(
        self, name: MetricName, value: Union[int, float] = 1, tags: Optional[Tags] = None
    ) -> None:
        # Increment a counter by the given value.
        record_incr(name, value, tags)

    def gauge(
        self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
    ) -> None:
        # Set a gauge metric to the given value.
        record_gauge(name, value, tags)

    def timing(
        self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
    ) -> None:
        # Emit a timing metric with the given value.
        record_timing(name, value, tags)


metrics_backend = MyMetrics()

# Register the backend before creating the consumer.
configure_metrics(metrics_backend)
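For tests or local debugging, it can help to have a backend that simply records what it receives. The following is a minimal sketch and not part of Arroyo: `InMemoryMetrics`, `_key` and the attribute names are hypothetical, and the class is written without importing Arroyo so that it runs on its own. In a real project you would presumably subclass `Metrics` as shown above and pass the instance to `configure_metrics`.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Mapping, Optional, Tuple, Union

Tags = Mapping[str, str]
Number = Union[int, float]
# A metric series is identified by its name plus its sorted tag pairs.
Key = Tuple[str, Tuple[Tuple[str, str], ...]]


def _key(name: str, tags: Optional[Tags]) -> Key:
    # Hypothetical helper: normalize tags so that ordering does not matter.
    return (name, tuple(sorted((tags or {}).items())))


class InMemoryMetrics:
    """Hypothetical backend that keeps every metric in memory."""

    def __init__(self) -> None:
        self.counters: DefaultDict[Key, Number] = defaultdict(int)
        self.gauges: Dict[Key, Number] = {}
        self.timings: DefaultDict[Key, List[Number]] = defaultdict(list)

    def increment(
        self, name: str, value: Number = 1, tags: Optional[Tags] = None
    ) -> None:
        # Counters accumulate across calls.
        self.counters[_key(name, tags)] += value

    def gauge(self, name: str, value: Number, tags: Optional[Tags] = None) -> None:
        # Gauges keep only the most recent value.
        self.gauges[_key(name, tags)] = value

    def timing(self, name: str, value: Number, tags: Optional[Tags] = None) -> None:
        # Timings keep every data point.
        self.timings[_key(name, tags)].append(value)


backend = InMemoryMetrics()
backend.increment("arroyo.consumer.run.count")
backend.increment("arroyo.consumer.run.count", 2)
backend.gauge("arroyo.strategies.run_task_with_multiprocessing.processes", 4)
backend.timing(
    "arroyo.consumer.run.callback",
    0.25,
    tags={"callback_name": "on_partitions_assigned"},
)
```

Keying each series by name and tags keeps differently tagged data points apart, which matters for metrics such as arroyo.consumer.run.callback that are tagged by callback name.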

Available Metrics

from typing import Literal

MetricName = Literal[
    # Time: Number of messages in a multiprocessing batch
    "arroyo.strategies.run_task_with_multiprocessing.batch.size.msg",
    # Time: Number of bytes in a multiprocessing batch
    "arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes",
    # Time: Number of messages in a multiprocessing batch after the message transformation
    "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg",
    # Time: Number of bytes in a multiprocessing batch after the message transformation
    "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes",
    # Counter: Number of times the consumer is spinning
    "arroyo.consumer.run.count",
    # Counter: Number of times the consumer encountered an invalid message.
    "arroyo.consumer.invalid_message.count",
    # Time: How long it took the Reduce step to fill up a batch
    "arroyo.strategies.reduce.batch_time",
    # Counter: Incremented when a strategy after multiprocessing applies
    # backpressure to multiprocessing. May be a reason why CPU cannot be
    # saturated.
    "arroyo.strategies.run_task_with_multiprocessing.batch.backpressure",
    # Counter: Incremented when multiprocessing cannot fill the input batch
    # because not enough memory was allocated. This results in batches smaller
    # than configured. Increase `input_block_size` to fix.
    "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow",
    # Counter: Incremented when multiprocessing cannot pull results in batches
    # equal to the input batch size, because not enough memory was allocated.
    # This can be devastating for throughput. Increase `output_block_size` to
    # fix.
    "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
    # Counter: Arroyo has decided to re-allocate a block in order to combat input
    # buffer overflow. This behavior can be disabled by explicitly setting
    # `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
    "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize",
    # Counter: Arroyo has decided to re-allocate a block in order to combat output
    # buffer overflow. This behavior can be disabled by explicitly setting
    # `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
    "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize",
    # Gauge: How many batches are being processed in parallel by multiprocessing.
    "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress",
    # Counter: A subprocess managed by multiprocessing died unexpectedly.
    "sigchld.detected",
    # Gauge: Shows how many processes the multiprocessing strategy is
    # configured with.
    "arroyo.strategies.run_task_with_multiprocessing.processes",
    # Counter: Incremented when the multiprocessing pool is created (or re-created).
    "arroyo.strategies.run_task_with_multiprocessing.pool.create",
    # Time: (unitless) spent polling librdkafka for new messages.
    "arroyo.consumer.poll.time",
    # Time: (unitless) spent in strategies (blocking in strategy.submit or
    # strategy.poll)
    "arroyo.consumer.processing.time",
    # Time: (unitless) spent pausing the consumer due to backpressure (MessageRejected)
    "arroyo.consumer.backpressure.time",
    # Time: (unitless) spent in handling `InvalidMessage` exceptions and sending
    # messages to the DLQ.
    "arroyo.consumer.dlq.time",
    # Time: (unitless) spent in waiting for the strategy to exit, such as during
    # shutdown or rebalancing.
    "arroyo.consumer.join.time",
    # Time: (unitless) spent in librdkafka callbacks. This metric's timings
    # overlap other timings, and might spike at the same time.
    "arroyo.consumer.callback.time",
    # Time: (unitless) spent in shutting down the consumer. This metric's
    # timings overlap other timings, and might spike at the same time.
    "arroyo.consumer.shutdown.time",
    # Time: A regular duration metric where each datapoint is measuring the time it
    # took to execute a single callback. This metric is distinct from the
    # arroyo.consumer.*.time metrics as it does not attempt to accumulate time
    # spent per second in an attempt to keep monitoring overhead low.
    #
    # The metric is tagged by the name of the internal callback function being
    # executed, as 'callback_name'. Possible values are on_partitions_assigned
    # and on_partitions_revoked.
    "arroyo.consumer.run.callback",
    # Time: Duration metric measuring the time it took to flush in-flight messages
    # and shut down the strategies.
    "arroyo.consumer.run.close_strategy",
    # Time: Duration metric measuring the time it took to create the processing strategy.
    "arroyo.consumer.run.create_strategy",
    # Counter: How many partitions have been revoked just now.
    "arroyo.consumer.partitions_revoked.count",
    # Counter: How many partitions have been assigned just now.
    "arroyo.consumer.partitions_assigned.count",
    # Time: Consumer latency in seconds. Recorded by the commit offsets strategy.
    "arroyo.consumer.latency",
    # Counter: Metric for when the underlying rdkafka consumer is being paused.
    #
    # This flushes internal prefetch buffers.
    "arroyo.consumer.pause",
    # Counter: Metric for when the underlying rdkafka consumer is being resumed.
    #
    # This might cause increased network usage as messages are being re-fetched.
    "arroyo.consumer.resume",
    # Gauge: Queue size of background queue that librdkafka uses to prefetch messages.
    "arroyo.consumer.librdkafka.total_queue_size",
    # Counter: Counter metric to measure how often the healthcheck file has been touched.
    "arroyo.processing.strategies.healthcheck.touch",
    # Counter: Number of messages dropped in the FilterStep strategy
    "arroyo.strategies.filter.dropped_messages",
]

For convenience, Arroyo includes a machine-readable version of these definitions, which can be loaded like so:

import importlib.resources
import json

with importlib.resources.files("arroyo.utils").joinpath("metricDefs.json").open() as f:
    metric_defs = json.load(f)
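When only the metric names are needed, a `Literal` type such as `MetricName` can also be inspected at runtime with `typing.get_args`. The sketch below uses a shortened stand-in for `MetricName` so that it runs without Arroyo installed; `KNOWN_METRICS` and `is_known_metric` are hypothetical helper names.

```python
from typing import Literal, get_args

# Shortened stand-in for Arroyo's MetricName, for illustration only.
MetricName = Literal[
    "arroyo.consumer.run.count",
    "arroyo.consumer.poll.time",
    "arroyo.consumer.latency",
]

# get_args returns the literal values as a tuple of strings.
KNOWN_METRICS = frozenset(get_args(MetricName))


def is_known_metric(name: str) -> bool:
    """Return True if the name is one of the declared metric names."""
    return name in KNOWN_METRICS
```

A check like this could be used in a backend to flag names that are not part of the declared set, for example after a typo in a dashboard query.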

API

class arroyo.utils.metrics.Metrics(*args, **kwargs)

An abstract class that defines the interface for metrics backends.

abstract gauge(name: MetricName, value: int | float, tags: Mapping[str, str] | None = None) -> None

Sets a gauge metric to the given value.

abstract increment(name: MetricName, value: int | float = 1, tags: Mapping[str, str] | None = None) -> None

Increments a counter metric by a given value.

abstract timing(name: MetricName, value: int | float, tags: Mapping[str, str] | None = None) -> None

Records a timing metric.

arroyo.utils.metrics.configure_metrics(metrics: Metrics, force: bool = False) -> None

Metrics can generally be configured only once, unless force=True is passed on subsequent calls.
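The configure-once behavior described above can be illustrated with a small stand-in. This is a sketch of the semantics only, not Arroyo's implementation: `configure_once` and `current_backend` are hypothetical names, and raising `RuntimeError` on a second call is an assumption about how such a guard might report the problem.

```python
from typing import Optional

# Module-level slot holding the active backend (hypothetical stand-in).
_backend: Optional[object] = None


def configure_once(backend: object, force: bool = False) -> None:
    """Set the global backend, refusing to overwrite it unless force is True."""
    global _backend
    if _backend is not None and not force:
        # Assumption: the real library may signal this differently.
        raise RuntimeError("metrics backend is already configured")
    _backend = backend


def current_backend() -> Optional[object]:
    return _backend


first, second = object(), object()
configure_once(first)

try:
    configure_once(second)  # second call without force is rejected
    rejected = False
except RuntimeError:
    rejected = True

configure_once(second, force=True)  # force replaces the existing backend
```

Passing force is mostly useful in tests, where each test case may want to install its own recording backend.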