Dead letter queues

Warning

Dead letter queues should be used with caution because they break some of the ordering guarantees otherwise offered by Arroyo and Kafka consumer code. In particular, it must be safe for the consumer to drop a message. If DLQ’ed messages are later replayed or re-processed, it is critical that ordering is not a requirement in the relevant downstream code.

Arroyo provides support for routing invalid messages to dead letter queues in consumers. Dead letter queues are critical in some applications because messages are ordered within a Kafka partition, so a single invalid message can crash the consumer and block every subsequent message from being processed.

The dead letter queue configuration is passed to the StreamProcessor and, if provided, any InvalidMessage raised by a strategy will be produced to the dead letter queue.
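
A minimal sketch of the wiring, assuming StreamProcessor accepts a dlq_policy argument as described above. The consumer, factory and dlq_policy objects are placeholders built elsewhere (see the DlqPolicy example below), and the topic name is illustrative:

    from arroyo.commit import ONCE_PER_SECOND
    from arroyo.processing import StreamProcessor
    from arroyo.types import Topic

    # consumer and factory are assumed to exist: an arroyo Kafka consumer and
    # the ProcessingStrategyFactory for your application.
    processor = StreamProcessor(
        consumer=consumer,
        topic=Topic("my-topic"),
        processor_factory=factory,
        commit_policy=ONCE_PER_SECOND,
        dlq_policy=dlq_policy,  # a DlqPolicy, documented below
    )
    processor.run()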

class arroyo.dlq.DlqLimit(max_invalid_ratio: float | None = None, max_consecutive_count: int | None = None)

Defines any limits that should be placed on the number of messages that are forwarded to the DLQ. This exists to prevent 100% of messages from going into the DLQ if something is misconfigured or bad code is deployed. In this scenario, it may be preferable to stop processing messages altogether and deploy a fix rather than rerouting every message to the DLQ.

Both max_invalid_ratio and max_consecutive_count are tracked on a per-partition basis.
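
For example (the threshold values below are illustrative, not recommendations):

    from arroyo.dlq import DlqLimit

    # Per partition: stop forwarding to the DLQ once more than 1% of messages
    # are invalid, or once 50 invalid messages have been seen in a row.
    limit = DlqLimit(max_invalid_ratio=0.01, max_consecutive_count=50)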

class arroyo.dlq.DlqPolicy(producer: DlqProducer[TStrategyPayload], limit: DlqLimit | None = None, max_buffered_messages_per_partition: int | None = None)

DLQ policy defines the DLQ configuration, and is passed to the stream processor upon creation of the consumer. It consists of the DLQ producer implementation and any limits that should be applied.
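
A sketch of building a policy, assuming dlq_producer is an existing arroyo Kafka producer and the topic name is a placeholder; the reading of max_buffered_messages_per_partition as a cap on messages buffered for potential DLQ-ing is an assumption based on the parameter name:

    from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
    from arroyo.types import Topic

    dlq_policy = DlqPolicy(
        producer=KafkaDlqProducer(dlq_producer, Topic("my-dlq-topic")),
        limit=DlqLimit(max_invalid_ratio=0.01),
        # Assumption: bounds how many original messages are kept in memory per
        # partition so they can be produced to the DLQ if later found invalid.
        max_buffered_messages_per_partition=1000,
    )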

exception arroyo.dlq.InvalidMessage(partition: Partition, offset: int, needs_commit: bool = True)

InvalidMessage should be raised if a message is not valid for processing and should not be retried. It will be placed in the DLQ if one is configured.

It can be raised from the submit, poll or join methods of any processing strategy.

Once a filtered message is forwarded to the next step, needs_commit should be set to False, in order to prevent multiple filtered messages from being forwarded for a single invalid message.
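
A minimal sketch of a strategy that raises InvalidMessage from submit. DecodeJson and next_step are hypothetical, and it is assumed the message came straight from the broker, so its value carries a partition and offset:

    import json
    from typing import Optional

    from arroyo.backends.kafka import KafkaPayload
    from arroyo.dlq import InvalidMessage
    from arroyo.processing.strategies import ProcessingStrategy
    from arroyo.types import Message


    class DecodeJson(ProcessingStrategy[KafkaPayload]):
        def __init__(self, next_step: ProcessingStrategy[KafkaPayload]) -> None:
            self.__next_step = next_step

        def submit(self, message: Message[KafkaPayload]) -> None:
            try:
                json.loads(message.payload.value)
            except json.JSONDecodeError:
                # Route this message to the DLQ (if configured) instead of
                # crashing. Assumes message.value is a broker value exposing
                # partition and offset.
                raise InvalidMessage(message.value.partition, message.value.offset)
            self.__next_step.submit(message)

        def poll(self) -> None:
            self.__next_step.poll()

        def join(self, timeout: Optional[float] = None) -> None:
            self.__next_step.join(timeout)

        def close(self) -> None:
            self.__next_step.close()

        def terminate(self) -> None:
            self.__next_step.terminate()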

class arroyo.dlq.KafkaDlqProducer(producer: Producer[KafkaPayload], topic: Topic)

KafkaDlqProducer forwards invalid messages to a Kafka topic.

Two additional fields are added to the headers of the Kafka message:

“original_partition”: the partition of the original message
“original_offset”: the offset of the original message
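
These headers make it possible to trace a DLQ’ed message back to its source when re-processing. A hypothetical replay helper, assuming the header values are the integers encoded as UTF-8 strings:

    from arroyo.backends.kafka import KafkaPayload

    def original_position(payload: KafkaPayload) -> tuple[int, int]:
        # Returns (original partition index, original offset) of a DLQ'ed message.
        headers = dict(payload.headers or [])
        return (
            int(headers["original_partition"].decode("utf-8")),
            int(headers["original_offset"].decode("utf-8")),
        )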

class arroyo.dlq.NoopDlqProducer(*args, **kwds)

Drops all invalid messages
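
This can be useful, for example, in tests or local development where invalid messages should simply be discarded:

    from arroyo.dlq import DlqPolicy, NoopDlqProducer

    # Invalid messages are accepted and dropped rather than produced anywhere.
    policy = DlqPolicy(producer=NoopDlqProducer())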