Reduce (Fold)

Accumulates messages into a single value using a custom accumulator function.

class arroyo.processing.strategies.reduce.Reduce(max_batch_size: int, max_batch_time: float, accumulator: Callable[[TResult, BaseValue[TPayload]], TResult], initial_value: Callable[[], TResult], next_step: ProcessingStrategy[TResult], compute_batch_size: Callable[[BaseValue[TPayload]], int] | None = None)

Accumulates messages until either the maximum batch size (max_batch_size) or the maximum batch time (max_batch_time) is reached. The accumulator function is called on each message in the order in which it is received.

Once the “batch” is full, that is, once either limit has been reached, the accumulated value is submitted to the next step (next_step).
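The fold behaviour described above can be illustrated with a small standalone sketch. It does not use arroyo and is not the library's implementation: `fold_in_batches` and `append` are hypothetical names, the accumulator receives the raw payload rather than a `BaseValue[TPayload]`, only the size condition is modelled, and the trailing partial batch is flushed at the end of input purely so the example loses nothing.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

TPayload = TypeVar("TPayload")
TResult = TypeVar("TResult")


def fold_in_batches(
    payloads: Iterable[TPayload],
    max_batch_size: int,
    accumulator: Callable[[TResult, TPayload], TResult],
    initial_value: Callable[[], TResult],
) -> Iterator[TResult]:
    """Yield one accumulated value per batch (size condition only)."""
    result = initial_value()
    count = 0
    for payload in payloads:
        # Messages are folded in arrival order.
        result = accumulator(result, payload)
        count += 1
        if count >= max_batch_size:
            yield result
            # Start the next batch from a fresh initial value.
            result = initial_value()
            count = 0
    if count:
        # Simplification: flush the trailing partial batch at end of input.
        yield result


def append(acc: List[int], item: int) -> List[int]:
    """Hypothetical accumulator that collects payloads into a list."""
    acc.append(item)
    return acc


batches = list(fold_in_batches(range(5), 2, append, list))
sums = list(fold_in_batches([1, 2, 3, 4], 2, lambda acc, x: acc + x, int))
```

Passing `initial_value` as a callable (here `list` and `int`) rather than a plain value means each batch starts from its own fresh object, which matters when the accumulator mutates its first argument as `append` does.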

This strategy propagates MessageRejected exceptions raised by downstream steps.

Parameters:
  • max_batch_size – The maximum number of messages to reduce into a single accumulated value.

  • max_batch_time – How much time (in seconds) should be spent reducing messages together before flushing the batch.
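How the two limits interact can be sketched with a toy model, again independent of arroyo. Everything here is an assumption made for illustration: the class name `BatchSketch`, the `flush` callback standing in for the next step, the injectable `clock`, and the choice to start the timer when the first message of a batch arrives. The real strategy may differ in these details.

```python
from typing import Callable, Generic, List, TypeVar

TPayload = TypeVar("TPayload")
TResult = TypeVar("TResult")


class BatchSketch(Generic[TPayload, TResult]):
    """Toy model: flush when either the size or the time limit is reached."""

    def __init__(
        self,
        max_batch_size: int,
        max_batch_time: float,
        accumulator: Callable[[TResult, TPayload], TResult],
        initial_value: Callable[[], TResult],
        flush: Callable[[TResult], None],  # stands in for the next step
        clock: Callable[[], float],  # injectable so the example is deterministic
    ) -> None:
        self._max_size = max_batch_size
        self._max_time = max_batch_time
        self._accumulator = accumulator
        self._initial_value = initial_value
        self._flush = flush
        self._clock = clock
        self._result = initial_value()
        self._count = 0
        self._started = 0.0

    def submit(self, payload: TPayload) -> None:
        now = self._clock()
        if self._count == 0:
            # Assumption: the timer starts with the first message of a batch.
            self._started = now
        self._result = self._accumulator(self._result, payload)
        self._count += 1
        self._maybe_flush(now)

    def poll(self) -> None:
        # Lets the time limit fire even when no new message arrives.
        self._maybe_flush(self._clock())

    def _maybe_flush(self, now: float) -> None:
        if self._count == 0:
            return
        too_big = self._count >= self._max_size
        too_old = now - self._started >= self._max_time
        if too_big or too_old:
            self._flush(self._result)
            self._result = self._initial_value()
            self._count = 0


now = [0.0]  # fake clock, in seconds
out: List[int] = []
batch = BatchSketch(3, 10.0, lambda acc, x: acc + x, int, out.append, lambda: now[0])

batch.submit(1)
batch.submit(2)
batch.poll()  # neither limit reached yet
now[0] = 10.0
batch.poll()  # time limit reached: flushes 1 + 2
for value in (4, 5, 6):
    batch.submit(value)  # third submit reaches the size limit: flushes 4 + 5 + 6
```

After this runs, `out` holds `[3, 15]`: the first batch is flushed by the time limit and the second by the size limit.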