Run Task with Multiprocessing¶
- class arroyo.processing.strategies.run_task_with_multiprocessing.RunTaskWithMultiprocessing(function: Callable[[Message[TStrategyPayload]], TResult], next_step: ProcessingStrategy[FilteredPayload | TResult], max_batch_size: int, max_batch_time: float, pool: MultiprocessingPool, input_block_size: int | None = None, output_block_size: int | None = None, max_input_block_size: int | None = None, max_output_block_size: int | None = None)¶
Run a function in parallel across messages using subprocesses.
RunTaskWithMultiprocessing uses the multiprocessing stdlib module to transform messages in parallel.
- Parameters:
function – The function to apply to each message.
next_step – The processing strategy to forward transformed messages to.
max_batch_size – Wait for at most this many messages before “closing” a batch.
max_batch_time – Wait for at most this many seconds before closing a batch.
pool – The multiprocessing pool to use for parallel processing. The same pool instance can be re-used each time RunTaskWithMultiprocessing is created on rebalance.
input_block_size – For each subprocess, a shared memory buffer of input_block_size is allocated. This value should be at least message_size * max_batch_size, where message_size is the expected average message size. If the value is too small, the batch is implicitly broken up. In that case, the arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow metric is emitted. If the value is set to None, the input block size is automatically adjusted to adapt to traffic. Keep in mind that this is a rather experimental feature and less production-tested than explicitly setting a value.
output_block_size – Size of the shared memory buffer used to store results. As with input data, the batch is implicitly broken up on overflow, and the arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow metric is incremented. As with input_block_size, the value can be set to None to enable automatic resizing.
max_input_block_size – If automatic resizing is enabled, this sets an upper limit on how large those blocks can get.
max_output_block_size – Same as max_input_block_size but for output blocks.
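For illustration, a minimal sketch of constructing this strategy (the transform function, pool size, batch settings, and block sizes are all illustrative, not recommendations; commit is the commit callable your strategy factory receives):

```python
from arroyo.processing.strategies import CommitOffsets
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    MultiprocessingPool,
    RunTaskWithMultiprocessing,
)
from arroyo.types import Message


def transform(message: Message[bytes]) -> bytes:
    # Runs in a subprocess, so it must be picklable (e.g. a module-level function).
    return message.payload.upper()


pool = MultiprocessingPool(num_processes=4)

strategy = RunTaskWithMultiprocessing(
    function=transform,
    next_step=CommitOffsets(commit),  # commit is provided by the strategy factory
    max_batch_size=1000,
    max_batch_time=1.0,  # seconds
    pool=pool,
    input_block_size=16 * 1024 * 1024,   # 16 MiB of shared memory per subprocess
    output_block_size=16 * 1024 * 1024,
)
```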
Number of processes¶
The metric arroyo.strategies.run_task_with_multiprocessing.batches_in_progress shows how many processes arroyo is able to effectively use at any given point. The metric arroyo.strategies.run_task_with_multiprocessing.processes shows how many processes arroyo was configured with. If those two metrics don’t line up, your consumer is not bottlenecked on the number of processes. That’s a good thing: you want to have some reserve capacity. But it also means that increasing num_processes will not make your consumer faster.
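The configured process count comes from the pool. As a sketch of re-using one pool instance across rebalances, as the pool parameter above allows (the factory, payload type, and settings are illustrative):

```python
from typing import Mapping

from arroyo.processing.strategies import (
    CommitOffsets,
    ProcessingStrategy,
    ProcessingStrategyFactory,
)
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    MultiprocessingPool,
    RunTaskWithMultiprocessing,
)
from arroyo.types import Commit, Partition


class MyStrategyFactory(ProcessingStrategyFactory[bytes]):
    def __init__(self) -> None:
        # One pool for the consumer's lifetime; num_processes here is what
        # the ...processes metric reports.
        self.pool = MultiprocessingPool(num_processes=8)

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> ProcessingStrategy[bytes]:
        # A new strategy is created on every rebalance, but the same pool
        # (and its subprocesses) is re-used rather than re-spawned.
        return RunTaskWithMultiprocessing(
            function=transform,  # the module-level function from the sketch above
            next_step=CommitOffsets(commit),
            max_batch_size=1000,
            max_batch_time=1.0,
            pool=self.pool,
        )
```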
Batching¶
Arroyo sends messages in batches to subprocesses. max_batch_size and max_batch_time should be tweaked for optimal performance. You can observe the effect in the following metrics:
- arroyo.strategies.run_task_with_multiprocessing.batch.size.msg: The number of messages per batch.
- arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes: The number of bytes used per batch.
The cost of batches (locking, synchronization) generally amortizes with increased batch sizes. If batches are too small, this strategy will spend a lot of time synchronizing between processes. If batches are too large, your consumer may not use all processes effectively, as a lot of time is spent waiting for batches to fill up.
If batch.size.msg is flat (as in, a perfectly straight line at a constant value), you are hitting max_batch_size. If batch.size.bytes is flat, you are hitting input buffer overflow (see the next section). If neither is flat, you are hitting max_batch_time.
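These metrics also feed into the sizing rule from the constructor docs. As a back-of-envelope sketch (all numbers purely illustrative):

```python
# Estimate the average message size from the batch metrics
# (illustrative values, e.g. read off your dashboards):
batch_size_bytes = 20_000_000   # batch.size.bytes
batch_size_msg = 10_000         # batch.size.msg
avg_message_size = batch_size_bytes / batch_size_msg  # 2000 bytes

# Per the constructor docs, input_block_size should be at least
# message_size * max_batch_size:
max_batch_size = 10_000
min_input_block_size = avg_message_size * max_batch_size  # 20 MB per subprocess
```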
Input and output buffers¶
You want to keep an eye on these metrics:
- arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow
- arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow
- arroyo.strategies.run_task_with_multiprocessing.batch.backpressure
If batch.input.overflow is emitted at all, arroyo ran out of memory for batching and started breaking up your batches into smaller ones. You want to increase input_block_size in response. Note that when you do this, you may have to re-tweak max_batch_size and max_batch_time, as you were never hitting those configured limits before. Input overflow is not all that expensive in Arroyo, but since it affects how batching works, it can still make performance tuning of your consumer more confusing. Best to avoid it anyway.
If batch.output.overflow is emitted at all, arroyo ran out of memory when fetching the data from subprocesses, and so the response from subprocesses to the main process is chunked. Output overflow is very expensive, and you want to avoid it. Increase output_block_size in response.
If batch.backpressure is continuously emitted, you are not bottlenecked on multiprocessing at all; instead, the next strategy can’t keep up and is applying backpressure. You can likely reduce num_processes and won’t notice a performance regression.
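To summarize those responses as configuration changes (a sketch; the sizes are illustrative, not recommendations):

```python
# batch.input.overflow seen -> grow the input block, then re-tune batching:
input_block_size = 32 * 1024 * 1024    # e.g. double from 16 MiB

# batch.output.overflow seen -> grow the output block:
output_block_size = 32 * 1024 * 1024

# batch.backpressure continuously seen -> multiprocessing is not the
# bottleneck; try fewer processes:
pool = MultiprocessingPool(num_processes=4)  # e.g. down from 8
```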
How to tune your consumer¶
Note that it doesn’t make sense to fix output overflow without fixing input overflow first. If you increase the output block size to get rid of output overflows and then increase the input block size, your effective batch size may increase to the point where you encounter output overflow again. If you encounter a lot of issues at once, it’s best to fix them in this order:
1. First, tune input_block_size to fix input overflow. This will increase the average/effective batch size. Alternatively, set it to None (the default) to let arroyo auto-tune it (see the sketch after this list).
2. Then, tune max_batch_size and max_batch_time so that you get the highest throughput. Test this by running your consumer on a backlog of messages and looking at the consumer offset rate (vs. the broker/total offset rate), or at the time it takes to get consumer lag back to normal. For as long as you have enough RAM, increase the values in large steps (like 2x) and fine-tune afterwards.
3. Then, tune output_block_size to fix output overflow. If there was a lot of output overflow in your previous tests, this will remove a lot of CPU load from your consumer and potentially also increase throughput.
4. Now take a look at the batch.backpressure metric. If it is emitted, you need to optimize the next strategy (next_step), because that’s what you’re bottlenecked on. If it is not emitted, you may need to increase num_processes or increase the batch size, so that RunTaskWithMultiprocessing itself is not the bottleneck.
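As an illustration of the auto-tuning alternative from step 1, bounded so the blocks cannot grow without limit (a sketch re-using the illustrative transform, pool, and commit names from the earlier sketches; the caps are not recommendations):

```python
strategy = RunTaskWithMultiprocessing(
    function=transform,
    next_step=CommitOffsets(commit),
    max_batch_size=10_000,
    max_batch_time=5.0,
    pool=pool,
    input_block_size=None,    # default: adapt block sizes to traffic
    output_block_size=None,
    max_input_block_size=64 * 1024 * 1024,   # cap auto-resizing at 64 MiB
    max_output_block_size=64 * 1024 * 1024,
)
```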