Run Task with Multiprocessing

class arroyo.processing.strategies.run_task_with_multiprocessing.RunTaskWithMultiprocessing(function: Callable[[Message[TStrategyPayload]], TResult], next_step: ProcessingStrategy[FilteredPayload | TResult], max_batch_size: int, max_batch_time: float, pool: MultiprocessingPool, input_block_size: int | None = None, output_block_size: int | None = None, max_input_block_size: int | None = None, max_output_block_size: int | None = None)

Run a function in parallel across messages using subprocesses.

RunTaskWithMultiprocessing uses the multiprocessing stdlib module to transform messages in parallel.

Parameters:
  • function – The function to use for transforming.

  • next_step – The processing strategy to forward transformed messages to.

  • max_batch_size – Wait at most for this many messages before “closing” a batch.

  • max_batch_time – Wait at most for this many seconds before closing a batch.

  • pool – The multiprocessing pool to use for parallel processing. The same pool instance can be re-used each time RunTaskWithMultiprocessing is created on rebalance.

  • input_block_size

    For each subprocess, a shared memory buffer of input_block_size is allocated. This value should be at least message_size * max_batch_size large, where message_size is the expected average message size.

    If the value is too small, the batch is implicitly broken up. In that case, the arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow metric is emitted.

    If the value is set to None, the input_block_size is automatically adjusted to adapt to traffic. Keep in mind that this is a rather experimental feature and less productionized than explicitly setting a value.

  • output_block_size

    Size of the shared memory buffer used to store results. Like with input data, the batch is implicitly broken up on overflow, and arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow metric is incremented.

    Like with input_block_size, the value can be set to None to enable automatic resizing.

  • max_input_block_size – If automatic resizing is enabled, this sets an upper limit on how large those blocks can get.

  • max_output_block_size – Same as max_input_block_size but for output blocks.

Number of processes

The metric arroyo.strategies.run_task_with_multiprocessing.batches_in_progress shows you how many processes arroyo is able to effectively use at any given point.

The metric arroyo.strategies.run_task_with_multiprocessing.processes shows how many processes arroyo was configured with.

If those two metrics don’t line up, your consumer is not bottlenecked on number of processes. That’s a good thing, you want to have some reserve capacity. But it means that increasing num_processes will not make your consumer faster.

Batching

Arroyo sends messages in batches to subprocesses. max_batch_size and max_batch_time should be tweaked for optimal performance. You can observe the effect in the following metrics:

  • arroyo.strategies.run_task_with_multiprocessing.batch.size.msg: The number of messages per batch.

  • arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes: The number of bytes used per batch.

The cost of batches (locking, synchronization) generally amortizes with increased batch sizes. Too small batches, and this strategy will spend a lot of time synchronizing between processes. Too large batches, however, can cause your consumer to not use all processes effectively, as a lot of time may be spent waiting for batches to fill up.

If batch.size.msg is flat (as in, it’s a perfectly straight line at a constant), you are hitting max_batch_size. If batch.size.bytes is flat, you are hitting input buffer overflow (see next section). If neither are flat, you are hitting max_batch_time.

Input and output buffers

You want to keep an eye on these metrics:

  1. arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow

  2. arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow

  3. arroyo.strategies.run_task_with_multiprocessing.batch.backpressure

If batch.input.overflow is emitted at all, arroyo ran out of memory for batching and started breaking up your batches into smaller ones. You want to increase input_block_size in response. Note that when you do this, you may have to re-tweak max_batch_size and max_batch_time, as you were never hitting those configured limits before. Input overflow is not really all that expensive in Arroyo, but since it affects how batching works it can still make performance tuning of your consumer more confusing. Best to avoid it anyway.

If batch.output.overflow is emitted at all, arroyo ran out of memory when fetching the data from subprocesses, and so the response from subprocesses to the main processes is chunked. Output overflow is very expensive, and you want to avoid it. Increase output_block_size in response.

If batch.backpressure is continuously emitted, you are not bottlenecked on multiprocessing at all, but instead the next strategy can’t keep up and is applying backpressure. You can likely reduce num_processes and won’t notice a performance regression.

How to tune your consumer

Note that it doesn’t make sense to fix output overflow without fixing input overflow first. If you increase output block size to get rid of output overflows, then increase input block size, your effective batch size may increase to a point where you encounter output overflow again. If you encounter a lot of issues at once, best to fix them in this order:

  1. First, tune input_block_size to fix input overflow. This will increase average/effective batch size. Alternatively, set it to None (default) to let arroyo auto-tune it.

  2. Then, tune max_batch_size and max_batch_time so that you get the highest throughput. Test this by running your consumer on a backlog of messages and look at consumer offset rate (vs broker/total offset rate), or time it takes to get consumer lag back to normal. For as long as you have enough RAM, increment it in large steps (like 2x) and fine-tune afterwards.

  3. Then, tune output_block_size to fix output overflow. If in your previous tests there was a lot of output overflow, this will remove a lot of CPU load from your consumer and potentially also increase throughput.

  4. Now take a look at the batch.backpressure metric. If it is emitted, you need to optimize the next strategy (next_step) because that’s what you’re bottlenecked on. If it is not emitted, you may need to increase num_processes or increase batch size, so that RunTaskWithMultiprocessing itself is not the bottleneck.