Processing Strategies

The processing strategies are the components to be wired together to build a consumer.

Strategy interface

We normally don’t recommend writing your own strategy, and encourage you to use built-in ones such as “reduce” or “run task” to plug in your application logic. Nevertheless, all arroyo strategies are written against the following interface:

exception arroyo.processing.strategies.abstract.MessageRejected

Bases: Exception

MessageRejected should be raised in a processing strategy’s submit method if it is unable to keep up with the rate of incoming messages. It tells the consumer to slow down and retry the message later.
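Here is a minimal sketch of this backpressure contract, built around a hypothetical `BoundedBuffer` step with a fixed capacity. `MessageRejected` is redefined locally so the snippet runs without arroyo installed. In real code you would import it from `arroyo.processing.strategies.abstract`. The `deliver` helper is also hypothetical. It only approximates how a caller might keep a rejected message and retry it after giving the strategy a chance to make progress.

```python
from collections import deque
from typing import Deque, Generic, TypeVar

T = TypeVar("T")


class MessageRejected(Exception):
    """Local stand-in for arroyo.processing.strategies.abstract.MessageRejected."""


class BoundedBuffer(Generic[T]):
    """Hypothetical step that holds at most `capacity` unprocessed messages."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.pending: Deque[T] = deque()

    def submit(self, message: T) -> None:
        if len(self.pending) >= self.capacity:
            # Over capacity: ask the caller to slow down and retry later.
            raise MessageRejected()
        self.pending.append(message)

    def poll(self) -> None:
        # Pretend to finish one pending message per call, freeing capacity.
        if self.pending:
            self.pending.popleft()


def deliver(step: BoundedBuffer[T], message: T, max_attempts: int = 10) -> int:
    """Hypothetical caller loop: retry a rejected message, polling in between.

    Returns the number of attempts the message needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            step.submit(message)
            return attempt
        except MessageRejected:
            step.poll()  # let the step make progress before retrying
    raise RuntimeError("message still rejected after max_attempts")


step: BoundedBuffer[str] = BoundedBuffer(capacity=2)
print([deliver(step, m) for m in ["a", "b", "c"]])  # [1, 1, 2]
```

The third message is rejected once, because the buffer is full. It is accepted on the second attempt, after `poll` has freed a slot.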

class arroyo.processing.strategies.abstract.ProcessingStrategy(*args, **kwds)

Bases: ABC, Generic[TStrategyPayload]

A processing strategy defines how a stream processor processes messages during the course of a single assignment. The processor is instantiated when the assignment is received, and closed when the assignment is revoked.

This interface is intentionally not prescriptive, and affords a significant degree of flexibility for the various implementations.

abstract close() → None

Close this instance. No more messages should be accepted by the instance after this method has been called.

This method should not block. Once this strategy instance has finished processing (or discarded) all messages that were submitted prior to this method being called, the strategy should commit its partition offsets and release any resources that will no longer be used (threads, processes, sockets, files, etc.).

abstract join(timeout: float | None = None) → None

Block until the processing strategy has completed all previously submitted work, or the provided timeout has been reached. This method should be called after close to provide a graceful shutdown.

This method is called synchronously by the stream processor during assignment revocation. It blocks the assignment from being released until it exits, so any work in progress can be completed and committed before the rebalancing process continues.

abstract poll() → None

Poll the processor to check on the status of asynchronous tasks or perform other scheduled work.

This method is called on every iteration of the consumer loop, so it should not perform work that may block for a significant amount of time. Blocking here stalls the consumer and may exceed the consumer's poll interval timeout.

This method may raise exceptions that were thrown by asynchronous tasks since the previous call to poll.
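The sketch below illustrates the non-blocking `poll` contract. It uses a hypothetical `AsyncStep` that runs a function on a `concurrent.futures.ThreadPoolExecutor`. This is a self-contained illustration, not arroyo's implementation. `poll` only inspects futures that have already finished, and it does so in submission order. `Future.result()` re-raises any exception raised inside the task, which is how a failure in a background task surfaces on the next call to `poll`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


class AsyncStep:
    """Hypothetical strategy that runs `func` on a small thread pool."""

    def __init__(self, func: Callable[[int], int]) -> None:
        self.func = func
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.futures: List[Future] = []
        self.results: List[int] = []

    def submit(self, payload: int) -> None:
        # Returning here only means the work was scheduled, not completed.
        self.futures.append(self.executor.submit(self.func, payload))

    def poll(self) -> None:
        # Never wait: stop at the first future that is still running.
        while self.futures and self.futures[0].done():
            future = self.futures.pop(0)
            # result() re-raises any exception raised inside the task.
            self.results.append(future.result())


step = AsyncStep(lambda x: 10 // x)
step.submit(5)
step.submit(0)  # fails with ZeroDivisionError in a worker thread
step.executor.shutdown(wait=True)  # wait for both tasks to finish
```

After both tasks finish, the next `step.poll()` appends `2` to `step.results` and then raises the `ZeroDivisionError` from the second task.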

abstract submit(message: Message[TStrategyPayload]) → None

Submit a message for processing.

Messages may be processed synchronously or asynchronously, depending on the implementation of the processing strategy. Callers should not assume that a successful return from this method means the message has been processed.

If the processing strategy is unable to accept a message (due to it being at or over capacity, for example), this method will raise a MessageRejected exception.

abstract terminate() → None

Close the processing strategy immediately, abandoning any work in progress. No more messages should be accepted by the instance after this method has been called.
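The following minimal sketch makes the lifecycle concrete with a custom strategy. `ProcessingStrategy` is redefined locally with the same five abstract methods so the snippet runs on its own. Messages are simplified to hypothetical `(partition, offset)` tuples instead of arroyo `Message` objects. `CommitOnJoin` is an invented example: `close` only stops intake, `join` commits the progress made so far, and `terminate` abandons that progress without committing.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, Generic, Mapping, Optional, Tuple, TypeVar

TPayload = TypeVar("TPayload")


class ProcessingStrategy(ABC, Generic[TPayload]):
    """Local stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def submit(self, message: TPayload) -> None: ...

    @abstractmethod
    def poll(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    @abstractmethod
    def terminate(self) -> None: ...

    @abstractmethod
    def join(self, timeout: Optional[float] = None) -> None: ...


class CommitOnJoin(ProcessingStrategy[Tuple[str, int]]):
    """Hypothetical strategy that commits the offsets it has seen on join()."""

    def __init__(self, commit: Callable[[Mapping[str, int]], None]) -> None:
        self.commit = commit
        self.uncommitted: Dict[str, int] = {}
        self.closed = False

    def submit(self, message: Tuple[str, int]) -> None:
        if self.closed:
            raise RuntimeError("strategy no longer accepts messages")
        partition, offset = message
        # Record the next offset to consume for this partition.
        self.uncommitted[partition] = offset + 1

    def poll(self) -> None:
        pass  # all work is synchronous here, so there is nothing to check

    def close(self) -> None:
        self.closed = True  # stop intake without blocking

    def join(self, timeout: Optional[float] = None) -> None:
        # Nothing is in flight, so finishing just means committing progress.
        if self.uncommitted:
            self.commit(dict(self.uncommitted))
            self.uncommitted.clear()

    def terminate(self) -> None:
        self.closed = True
        self.uncommitted.clear()  # abandon work without committing it


committed = []
graceful = CommitOnJoin(committed.append)
for msg in [("p0", 4), ("p0", 5), ("p1", 0)]:
    graceful.submit(msg)
graceful.close()
graceful.join(timeout=5.0)
print(committed)  # [{'p0': 6, 'p1': 1}]
```

In this sketch, calling `terminate` before `join` discards the recorded offsets, so nothing is committed for that instance.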

class arroyo.processing.strategies.abstract.ProcessingStrategyFactory(*args, **kwds)

Bases: ABC, Generic[TStrategyPayload]

A ProcessingStrategyFactory wraps a series of ProcessingStrategy steps. The stream processor calls its create_with_partitions method to instantiate the ProcessingStrategy on partition assignment, or on partition revocation if the strategy needs to be recreated.

abstract create_with_partitions(commit: Commit, partitions: Mapping[Partition, int]) → ProcessingStrategy[TStrategyPayload]

Instantiate and return a ProcessingStrategy instance.

Parameters:
  • commit – A function that accepts a mapping of Partition instances to offset values that should be committed.

  • partitions – A mapping of each Partition to its most recent offset.

shutdown() → None

Custom code to execute when the StreamProcessor shuts down entirely.

Note that this code is also executed if the strategy crashes.
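The following self-contained sketch shows the factory's role under several simplifying assumptions. `Transform`, `CommitStep`, and `UppercaseFactory` are hypothetical. `Commit` is approximated as a plain callable that takes a mapping of partition names to offsets. Offsets travel alongside each payload rather than inside a `Message`. Each call to `create_with_partitions` wires a fresh chain of steps for the new assignment, and `shutdown` is where resources shared across assignments would be released.

```python
from typing import Any, Callable, List, Mapping

# Stand-in for arroyo.types.Commit: a callable that commits partition offsets.
Commit = Callable[[Mapping[str, int]], None]


class CommitStep:
    """Hypothetical terminal step: records payloads and commits their offsets."""

    def __init__(self, commit: Commit) -> None:
        self.commit = commit
        self.seen: List[Any] = []

    def submit(self, payload: Any, offsets: Mapping[str, int]) -> None:
        self.seen.append(payload)
        self.commit(offsets)


class Transform:
    """Hypothetical step: applies `func` and forwards the result downstream."""

    def __init__(self, func: Callable[[Any], Any], next_step: CommitStep) -> None:
        self.func = func
        self.next_step = next_step

    def submit(self, payload: Any, offsets: Mapping[str, int]) -> None:
        self.next_step.submit(self.func(payload), offsets)


class UppercaseFactory:
    """Hypothetical factory wiring Transform -> CommitStep per assignment."""

    def __init__(self) -> None:
        self.shut_down = False

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[str, int]
    ) -> Transform:
        # `partitions` could seed per-partition state; this sketch ignores it.
        return Transform(str.upper, next_step=CommitStep(commit))

    def shutdown(self) -> None:
        # Release anything shared across assignments, e.g. thread pools or clients.
        self.shut_down = True


committed: List[Mapping[str, int]] = []
factory = UppercaseFactory()
strategy = factory.create_with_partitions(committed.append, {"p0": 0})
strategy.submit("hello", {"p0": 1})
```

Building a new chain on every assignment keeps per-assignment state out of the factory itself. Only resources that outlive an assignment belong to the factory and its `shutdown` hook.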

Messages

class arroyo.types.BaseValue(*args, **kwds)
property committable: Mapping[Partition, int]
property payload: TMessagePayload
replace(value: TReplaced) → BaseValue[TReplaced]
property timestamp: datetime | None
class arroyo.types.BrokerValue(payload: TMessagePayload, partition: Partition, offset: int, timestamp: datetime)

A payload received from the consumer, or returned by the producer once it has finished producing. Partition, offset, and timestamp values are present.

property committable: Mapping[Partition, int]
property next_offset: int
offset: int
partition: Partition
property payload: TMessagePayload
replace(value: TReplaced) → BaseValue[TReplaced]
timestamp: datetime
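The relationship between `offset`, `next_offset`, and `committable` can be sketched with local dataclasses named after the documented types. The sketch assumes that `next_offset` is `offset + 1`, following Kafka's convention that a committed offset names the next message to read. It also assumes that `committable` maps the message's single partition to that value. Both are assumptions of this sketch rather than statements from the reference above.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Generic, Mapping, TypeVar

TPayload = TypeVar("TPayload")


@dataclass(frozen=True)
class Topic:
    name: str


@dataclass(frozen=True)
class Partition:
    topic: Topic
    index: int


@dataclass(frozen=True)
class BrokerValue(Generic[TPayload]):
    """Local stand-in for arroyo.types.BrokerValue."""

    payload: TPayload
    partition: Partition
    offset: int
    timestamp: datetime

    @property
    def next_offset(self) -> int:
        # Assumed Kafka convention: commit the offset of the next message to read.
        return self.offset + 1

    @property
    def committable(self) -> Mapping[Partition, int]:
        # A single broker message commits exactly one partition.
        return {self.partition: self.next_offset}


value = BrokerValue(
    b"payload",
    Partition(Topic("events"), 0),
    41,
    datetime(2024, 1, 1, tzinfo=timezone.utc),
)
print(value.next_offset, value.committable)
```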
class arroyo.types.Commit(*args, **kwargs)
class arroyo.types.FilteredPayload
class arroyo.types.Message(value: BaseValue[TMessagePayload])

Contains a payload and the partition offsets to be committed after processing. A Message can represent either a single message from a Kafka broker (BrokerValue) or something else, such as a number of messages grouped together for a batch processing step (Value).

property committable: Mapping[Partition, int]
property payload: TMessagePayload
property payload_unfiltered: TMessagePayload
replace(payload: TReplaced) → Message[TReplaced]
property timestamp: datetime | None
value: BaseValue[TMessagePayload]
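A common pattern is a step that transforms the payload but must keep the offsets that will eventually be committed. The sketch below uses local stand-ins for `Value` and `Message` that model only `payload`, `committable`, and `replace`, and it represents partitions as plain strings for brevity. Because `replace` builds a new message around the same `committable` mapping, the parsed message still commits the same offsets as the raw one.

```python
from dataclasses import dataclass
from typing import Generic, Mapping, TypeVar

TPayload = TypeVar("TPayload")
TReplaced = TypeVar("TReplaced")


@dataclass(frozen=True)
class Value(Generic[TPayload]):
    """Local stand-in for arroyo.types.Value (timestamp omitted)."""

    payload: TPayload
    committable: Mapping[str, int]

    def replace(self, value: TReplaced) -> "Value[TReplaced]":
        # New payload, same offsets to commit.
        return Value(value, self.committable)


@dataclass(frozen=True)
class Message(Generic[TPayload]):
    """Local stand-in for arroyo.types.Message."""

    value: Value[TPayload]

    @property
    def payload(self) -> TPayload:
        return self.value.payload

    @property
    def committable(self) -> Mapping[str, int]:
        return self.value.committable

    def replace(self, payload: TReplaced) -> "Message[TReplaced]":
        return Message(self.value.replace(payload))


raw = Message(Value("42", {"p0": 8}))
parsed = raw.replace(int(raw.payload))  # parse the payload, keep the offsets
```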
class arroyo.types.Partition(topic: 'Topic', index: 'int')
index: int
topic: Topic
class arroyo.types.Topic(name: 'str')
name: str
class arroyo.types.Value(payload: TMessagePayload, committable: Mapping[Partition, int], timestamp: datetime | None = None)

Any other payload that may not map 1:1 to a single message from a consumer. May represent a batch spanning many partitions.

property committable: Mapping[Partition, int]
property payload: TMessagePayload
replace(value: TReplaced) → BaseValue[TReplaced]
property timestamp: datetime | None
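Because a Value may span many partitions, a batching step has to combine the offsets of everything it groups. One plausible approach keeps the highest committable offset per partition, so that committing the batch covers every message in it. The sketch below uses a hypothetical `batch` helper and represents partitions as plain strings. It illustrates the idea and is not arroyo's own batching code.

```python
from typing import Dict, List, Mapping, Sequence, Tuple, TypeVar

TPayload = TypeVar("TPayload")


def batch(
    messages: Sequence[Tuple[TPayload, Mapping[str, int]]]
) -> Tuple[List[TPayload], Dict[str, int]]:
    """Hypothetical helper: group (payload, committable) pairs into one batch.

    Returns the payloads in order plus a merged committable mapping that keeps
    the highest offset seen for each partition.
    """
    payloads: List[TPayload] = []
    committable: Dict[str, int] = {}
    for payload, offsets in messages:
        payloads.append(payload)
        for partition, offset in offsets.items():
            committable[partition] = max(offset, committable.get(partition, offset))
    return payloads, committable


payloads, committable = batch([("a", {"p0": 5}), ("b", {"p1": 3}), ("c", {"p0": 7})])
```

The resulting pair fits the documented constructor `Value(payload, committable, timestamp=None)`, with the list of payloads as the batch payload.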