Backpressure¶
Arroyo’s own processing strategies internally apply backpressure by raising
MessageRejected
. Most
consumers do not require additional work to deal with backpressure correctly.
If you want to slow down the consumer based on some external signal or
condition, you can achieve that most effectively by raising the same exception
from within a callback passed to RunTask
while the
consumer is supposed to be paused
class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(self):
self.is_paused = False
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]:
if self.is_paused:
raise MessageRejected()
print(f"MSG: {message.payload}")
return message
return RunTask(handle_message, CommitOffsets(commit))
It is not recommended to apply backpressure by just sleep()
-ing in
submit
(or, in this example,
handle_message
) for more than a few milliseconds. While this definitely
pauses the consumer, it will block the main thread for too long and and prevent
things like consumer rebalancing from occuring.
A 0.01 second sleep is applied each time MessageRejected
is
raised to prevent the main thread spinning at 100% CPU. However background thread
performance may be impacted during this time.