Produce

class arroyo.processing.strategies.produce.Produce(producer: Producer[TStrategyPayload], topic: Topic, next_step: ProcessingStrategy[FilteredPayload | TStrategyPayload], max_buffer_size: int = 10000)

This strategy can be used to produce Kafka messages to a destination topic. A typical use case could be to consume messages from one topic, apply some transformations and then output to another topic.

For each message received in the submit method, it attempts to produce a single Kafka message in a thread. If the number of pending futures reaches max_buffer_size, MessageRejected is raised to notify the stream processor to slow down.

On poll, the strategy checks for completion of the produced messages. If a message has been successfully produced, it is submitted to the next step. If an error occurred, the exception is raised.
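The submit and poll behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not Arroyo's implementation: `ProduceSketch`, `produce_fn`, the `close` method, and the local `MessageRejected` class are hypothetical stand-ins, and the sketch assumes that completed messages are forwarded in submission order.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Tuple


class MessageRejected(Exception):
    """Hypothetical stand-in for the backpressure signal described above."""


class ProduceSketch:
    """Illustrative only: a bounded buffer of in-flight produce futures."""

    def __init__(
        self,
        produce_fn: Callable[[str], object],  # assumed: sends one message
        next_step: Callable[[str], None],     # assumed: receives produced messages
        max_buffer_size: int = 10000,
    ) -> None:
        self._produce_fn = produce_fn
        self._next_step = next_step
        self._max_buffer_size = max_buffer_size
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._pending: Deque[Tuple[str, Future]] = deque()

    def submit(self, message: str) -> None:
        # Refuse new work while the buffer is full so the caller slows down.
        if len(self._pending) >= self._max_buffer_size:
            raise MessageRejected()
        future = self._executor.submit(self._produce_fn, message)
        self._pending.append((message, future))

    def poll(self) -> None:
        # Forward messages whose produce call has finished, oldest first.
        while self._pending and self._pending[0][1].done():
            message, future = self._pending.popleft()
            future.result()  # re-raises the exception if producing failed
            self._next_step(message)

    def close(self) -> None:
        # Wait for all in-flight produce calls to finish.
        self._executor.shutdown(wait=True)
```

A caller that catches `MessageRejected` would typically call `poll` to drain completed futures and then retry the same message.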

Important: The destination topic is always the topic passed into the constructor, not the topic referenced in the message itself (which typically refers to the original topic from which the message was consumed).
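The routing rule above can be shown with a small self-contained sketch. All names here (`ConsumedMessage`, `RecordingProducer`, `produce_to_destination`) are hypothetical and only model the behavior; they are not part of Arroyo.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class ConsumedMessage:
    """Hypothetical message that remembers the topic it was consumed from."""
    origin_topic: str
    payload: bytes


class RecordingProducer:
    """Hypothetical producer that records (topic, payload) pairs in memory."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, bytes]] = []

    def produce(self, topic: str, payload: bytes) -> None:
        self.sent.append((topic, payload))


def produce_to_destination(
    producer: RecordingProducer,
    destination_topic: str,
    message: ConsumedMessage,
) -> None:
    # The message's own origin_topic is deliberately ignored: only the
    # destination configured up front decides where the payload goes.
    producer.produce(destination_topic, message.payload)


producer = RecordingProducer()
message = ConsumedMessage(origin_topic="raw-events", payload=b"hello")
produce_to_destination(producer, "processed-events", message)
```

After this runs, `producer.sent` holds a single entry addressed to `processed-events`, even though the message was consumed from `raw-events`.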