Batch and Unbatch

Accumulate messages into a batch and pass it to the next step. The batch and unbatch strategies are built on reduce and unfold; use reduce or unfold directly if you need custom accumulator or generator functions.
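Because batching and unbatching are specializations of reduce and unfold, one way to picture them is as a particular accumulator and generator. The following is a plain-Python sketch, not arroyo code: batch_accumulator, batch_initial_value, unbatch_generator and reduce_in_chunks are hypothetical names, and the chunking loop only stands in for the size limit the real strategies enforce (time limits are not modelled).

```python
from typing import Any, Callable, Iterable, Iterator, List

# Hypothetical accumulator: batching is "reduce with list append".
def batch_accumulator(acc: List[Any], value: Any) -> List[Any]:
    acc.append(value)
    return acc

# Hypothetical initial-value factory: every batch starts empty.
def batch_initial_value() -> List[Any]:
    return []

# Hypothetical generator: unbatching is "unfold by iterating the batch".
def unbatch_generator(batch: List[Any]) -> Iterator[Any]:
    yield from batch

def reduce_in_chunks(
    values: Iterable[Any],
    max_size: int,
    accumulator: Callable[[List[Any], Any], List[Any]],
    initial_value: Callable[[], List[Any]],
) -> List[List[Any]]:
    """Apply `accumulator` over `values`, starting a fresh result every `max_size` items."""
    results: List[List[Any]] = []
    acc = initial_value()
    count = 0
    for value in values:
        acc = accumulator(acc, value)
        count += 1
        if count == max_size:
            results.append(acc)
            acc = initial_value()
            count = 0
    if count:
        results.append(acc)  # flush the final partial result
    return results

batches = reduce_in_chunks(range(5), 2, batch_accumulator, batch_initial_value)
print(batches)  # [[0, 1], [2, 3], [4]]
flat = [v for b in batches for v in unbatch_generator(b)]
print(flat)  # [0, 1, 2, 3, 4]
```

Swapping batch_accumulator for, say, a running sum is the kind of customization for which reduce itself is the better fit.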

class arroyo.processing.strategies.batching.BatchStep(max_batch_size: int, max_batch_time: float, next_step: ProcessingStrategy[MutableSequence[BaseValue[TStrategyPayload]]])

Accumulates messages into a batch. When the batch is full, this strategy submits it to the next step.

A batch is represented as a ValuesBatch object, which is a sequence of BaseValue carrying both the messages and the high offset watermark.

A batch is closed and submitted when max_batch_size messages have been received, or when max_batch_time seconds have passed since the first message in the batch was received.
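The close condition can be modelled with a small hypothetical class. SketchBatcher, its injectable clock, and the assumption that both limits are checked on every submit and poll call are illustrative only; the sketch does not use arroyo and does not reproduce the flush-before-add ordering described for submit.

```python
import time
from typing import Any, Callable, List, Optional

class SketchBatcher:
    """Hypothetical model of the close condition: size limit OR time since first message."""

    def __init__(self, max_batch_size: int, max_batch_time: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_batch_size = max_batch_size
        self.max_batch_time = max_batch_time
        self.clock = clock
        self.batch: List[Any] = []
        self.opened_at: Optional[float] = None  # set when the first message arrives
        self.closed: List[List[Any]] = []       # batches "submitted" downstream

    def _is_ready(self) -> bool:
        if not self.batch:
            return False
        if len(self.batch) >= self.max_batch_size:
            return True
        assert self.opened_at is not None
        return self.clock() - self.opened_at >= self.max_batch_time

    def poll(self) -> None:
        # Close the batch if either limit has been reached.
        if self._is_ready():
            self.closed.append(self.batch)
            self.batch = []
            self.opened_at = None

    def submit(self, payload: Any) -> None:
        if not self.batch:
            self.opened_at = self.clock()  # the timer starts at the first message
        self.batch.append(payload)
        self.poll()

now = [0.0]  # fake clock so the example is deterministic
b = SketchBatcher(max_batch_size=3, max_batch_time=1.0, clock=lambda: now[0])
for p in "abc":
    b.submit(p)   # the third message hits the size limit
b.submit("d")     # opens a new batch at t=0.0
now[0] = 1.5
b.poll()          # the time limit has now passed
print(b.closed)  # [['a', 'b', 'c'], ['d']]
```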

This step does not require in-order processing. If messages arrive out of order, however, the highest offset observed per partition is still treated as committable, whether or not this step has observed every message with a lower offset.
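Conceptually, the committable watermark behaves like a running maximum per partition. In this hypothetical sketch, committable_offsets takes (partition, offset) pairs in arrival order; it reports the raw highest offset seen and does not model how arroyo turns that value into an actual commit.

```python
from typing import Dict, Iterable, Tuple

def committable_offsets(observed: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Return the highest offset seen per partition, regardless of arrival order.

    `observed` is a hypothetical stream of (partition, offset) pairs.
    """
    high: Dict[int, int] = {}
    for partition, offset in observed:
        if offset > high.get(partition, -1):
            high[partition] = offset
    return high

# On partition 0, offset 7 arrives before 6; partition 1 is in order.
arrivals = [(0, 5), (1, 10), (0, 7), (0, 6), (1, 11)]
print(committable_offsets(arrivals))  # {0: 7, 1: 11}
```

Offset 7 becomes committable on partition 0 even though offset 6 has not yet been seen at that point, which is exactly the caveat stated above.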

This strategy propagates MessageRejected exceptions raised by downstream steps.

Parameters:
  • max_batch_size – Maximum number of messages to accumulate into one batch.

  • max_batch_time – Maximum time (in seconds) to spend accumulating messages before flushing the batch.

  • next_step – The strategy that receives each completed batch.

join(timeout: float | None = None) → None

Terminates the strategy by joining the next step. This method tries to flush the current batch regardless of whether the batch is ready.
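A minimal sketch of the flush-then-join order, assuming a size-only batcher. JoiningBatcher and RecordingNextStep are hypothetical stand-ins for BatchStep and its next step, and the timeout is simply passed through rather than budgeted across the flush.

```python
from typing import Any, List, Optional

class RecordingNextStep:
    """Hypothetical downstream step that records what it receives."""

    def __init__(self) -> None:
        self.received: List[List[Any]] = []
        self.joined = False

    def submit(self, batch: List[Any]) -> None:
        self.received.append(batch)

    def join(self, timeout: Optional[float] = None) -> None:
        self.joined = True

class JoiningBatcher:
    """Hypothetical batcher illustrating only join's flush-then-join behaviour."""

    def __init__(self, max_batch_size: int, next_step: RecordingNextStep) -> None:
        self.max_batch_size = max_batch_size
        self.next_step = next_step
        self.batch: List[Any] = []

    def submit(self, payload: Any) -> None:
        self.batch.append(payload)
        if len(self.batch) >= self.max_batch_size:
            self.next_step.submit(self.batch)
            self.batch = []

    def join(self, timeout: Optional[float] = None) -> None:
        # Flush whatever is pending, even though the batch is not "ready".
        if self.batch:
            self.next_step.submit(self.batch)
            self.batch = []
        self.next_step.join(timeout)

downstream = RecordingNextStep()
step = JoiningBatcher(max_batch_size=10, next_step=downstream)
step.submit("a")
step.submit("b")
step.join()
print(downstream.received, downstream.joined)  # [['a', 'b']] True
```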

submit(message: Message[FilteredPayload | TStrategyPayload]) → None

Accumulates messages in the current batch. A new batch is created when its first message is received.

This method tries to flush before adding the message to the current batch. That way, if the next step raises MessageRejected, the exception can be propagated without processing the new message, which allows the previous step to retry without introducing duplicates.
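The benefit of flushing before appending shows up when the next step rejects a batch. In this sketch, MessageRejected is a local stand-in for arroyo's exception, FlakyNextStep and FlushFirstBatcher are hypothetical, and "ready" is reduced to the size limit only.

```python
from typing import Any, List

class MessageRejected(Exception):
    """Local stand-in for arroyo's MessageRejected so the sketch runs on its own."""

class FlakyNextStep:
    """Hypothetical downstream step that rejects its first `rejections` submissions."""

    def __init__(self, rejections: int) -> None:
        self.rejections = rejections
        self.received: List[List[Any]] = []

    def submit(self, batch: List[Any]) -> None:
        if self.rejections > 0:
            self.rejections -= 1
            raise MessageRejected()
        self.received.append(list(batch))

class FlushFirstBatcher:
    """Hypothetical batcher that flushes a ready batch *before* appending."""

    def __init__(self, max_batch_size: int, next_step: FlakyNextStep) -> None:
        self.max_batch_size = max_batch_size
        self.next_step = next_step
        self.batch: List[Any] = []

    def submit(self, payload: Any) -> None:
        if len(self.batch) >= self.max_batch_size:
            # May raise MessageRejected; `payload` has not been added yet,
            # so the caller can safely retry the same message later.
            self.next_step.submit(self.batch)
            self.batch = []
        self.batch.append(payload)

downstream = FlakyNextStep(rejections=1)
step = FlushFirstBatcher(max_batch_size=2, next_step=downstream)
step.submit("a")
step.submit("b")      # batch is full but not yet flushed
try:
    step.submit("c")  # the flush is rejected and "c" is not added
except MessageRejected:
    step.submit("c")  # the caller retries the same message
print(downstream.received, step.batch)  # [['a', 'b']] ['c']
```

Because "c" was never appended when the flush failed, retrying it does not produce a duplicate.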

class arroyo.processing.strategies.batching.UnbatchStep(next_step: ProcessingStrategy[FilteredPayload | TStrategyPayload])

This processing step receives batches and explodes them, submitting their contents to the next step one message at a time.

A batch is represented as a ValuesBatch object.

If the next step raises MessageRejected, this step keeps the remaining messages and attempts to submit them on the following call to poll.
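The unbatching behaviour, including the retry on poll, can be sketched as follows. SketchUnbatcher and LimitedNextStep are hypothetical, MessageRejected is a local stand-in, and rejecting a new batch while leftovers are still pending is an assumption of this sketch rather than documented behaviour.

```python
from collections import deque
from typing import Any, Deque, List

class MessageRejected(Exception):
    """Local stand-in for arroyo's MessageRejected so the sketch runs on its own."""

class LimitedNextStep:
    """Hypothetical downstream step that accepts at most `capacity` messages per cycle."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.accepted_this_cycle = 0
        self.received: List[Any] = []

    def new_cycle(self) -> None:
        self.accepted_this_cycle = 0

    def submit(self, message: Any) -> None:
        if self.accepted_this_cycle >= self.capacity:
            raise MessageRejected()
        self.accepted_this_cycle += 1
        self.received.append(message)

class SketchUnbatcher:
    """Hypothetical unbatch step: explodes a batch and keeps leftovers on rejection."""

    def __init__(self, next_step: LimitedNextStep) -> None:
        self.next_step = next_step
        self.pending: Deque[Any] = deque()

    def _drain(self) -> None:
        while self.pending:
            self.next_step.submit(self.pending[0])  # may raise MessageRejected
            self.pending.popleft()  # drop a message only once it was accepted

    def submit(self, batch: List[Any]) -> None:
        # Assumption: a new batch is refused until the previous one is drained.
        if self.pending:
            raise MessageRejected()
        self.pending.extend(batch)
        try:
            self._drain()
        except MessageRejected:
            pass  # keep the remaining messages for the next poll

    def poll(self) -> None:
        try:
            self._drain()
        except MessageRejected:
            pass

downstream = LimitedNextStep(capacity=2)
step = SketchUnbatcher(downstream)
step.submit(["m1", "m2", "m3"])  # m3 is rejected and kept
print(downstream.received, list(step.pending))  # ['m1', 'm2'] ['m3']
downstream.new_cycle()
step.poll()                      # retries m3
print(downstream.received, list(step.pending))  # ['m1', 'm2', 'm3'] []
```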