Processing Strategies
The processing strategies are the components to be wired together to build a consumer.
Strategy interface
We normally don’t recommend writing your own strategy, and encourage you to use built-in ones such as “reduce” or “run task” to plug in your application logic. Nevertheless, all arroyo strategies are written against the following interface:
- exception arroyo.processing.strategies.abstract.MessageRejected
Bases: Exception
MessageRejected should be raised in a processing strategy’s submit method if it is unable to keep up with the rate of incoming messages. It tells the consumer to slow down and retry the message later.
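The sketch below illustrates this contract using a hypothetical step (`BoundedBuffer`, not part of arroyo) that holds at most `max_pending` unprocessed messages. Once the step is full, `submit` raises `MessageRejected` rather than blocking, and the same message is retried after some work has completed. The exception class is a local stand-in so that the snippet runs without arroyo installed. In application code you would import `MessageRejected` from `arroyo.processing.strategies.abstract`.

```python
from collections import deque
from typing import Any, Deque


class MessageRejected(Exception):
    """Local stand-in for arroyo.processing.strategies.abstract.MessageRejected."""


class BoundedBuffer:
    """Hypothetical step that holds at most `max_pending` unprocessed messages."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending
        self.pending: Deque[Any] = deque()

    def submit(self, message: Any) -> None:
        if len(self.pending) >= self.max_pending:
            # At capacity: signal backpressure instead of blocking the consumer.
            raise MessageRejected()
        self.pending.append(message)

    def poll(self) -> None:
        # Stand-in for downstream progress: finish one pending message per poll.
        if self.pending:
            self.pending.popleft()


buffer = BoundedBuffer(max_pending=2)
buffer.submit("a")
buffer.submit("b")
try:
    buffer.submit("c")
except MessageRejected:
    buffer.poll()       # work completes and frees a slot
    buffer.submit("c")  # so retrying the same message now succeeds
```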
- class arroyo.processing.strategies.abstract.ProcessingStrategy(*args, **kwds)
Bases: ABC, Generic[TStrategyPayload]
A processing strategy defines how a stream processor processes messages during the course of a single assignment. The strategy is instantiated when the assignment is received, and closed when the assignment is revoked.
This interface is intentionally not prescriptive, and affords a significant degree of flexibility for the various implementations.
- abstract close() → None
Close this instance. No more messages should be accepted by the instance after this method has been called.
This method should not block. Once this strategy instance has finished processing (or discarded) all messages that were submitted prior to this method being called, the strategy should commit its partition offsets and release any resources that will no longer be used (threads, processes, sockets, files, etc.)
- abstract join(timeout: float | None = None) → None
Block until the processing strategy has completed all previously submitted work, or the provided timeout has been reached. This method should be called after close to provide a graceful shutdown.
This method is called synchronously by the stream processor during assignment revocation. It prevents the assignment from being released until it returns, so that any work in progress can be completed and committed before the rebalancing process continues.
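The close-then-join sequence can be sketched with a hypothetical step that runs each message through `func` on a thread pool. `ThreadedStep` and its `func` and `workers` parameters are illustrative and are not part of arroyo. `close` only stops accepting messages and returns immediately. `join` blocks on the outstanding work, and `timeout` bounds how long it waits.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, List, Optional


class ThreadedStep:
    """Hypothetical step that runs `func` on a thread pool for each message."""

    def __init__(self, func: Callable[[Any], Any], workers: int = 4) -> None:
        self.func = func
        self.executor = ThreadPoolExecutor(max_workers=workers)
        self.futures: List[Future] = []
        self.closed = False

    def submit(self, message: Any) -> None:
        if self.closed:
            raise RuntimeError("submit() called after close()")
        self.futures.append(self.executor.submit(self.func, message))

    def close(self) -> None:
        # Must not block: just stop accepting new messages.
        self.closed = True

    def join(self, timeout: Optional[float] = None) -> None:
        # Block until all submitted work is done or `timeout` seconds pass.
        wait(self.futures, timeout=timeout)
        # A real strategy would commit offsets for the completed work here,
        # then release its resources.
        self.executor.shutdown(wait=False)


results: List[int] = []
step = ThreadedStep(lambda n: results.append(n * 2))
for n in range(5):
    step.submit(n)
step.close()            # returns immediately
step.join(timeout=5.0)  # waits for the five in-flight tasks
```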
- abstract poll() → None
Poll the processor to check on the status of asynchronous tasks or perform other scheduled work.
This method is called on each consumer loop iteration, so this method should not be used to perform work that may block for a significant amount of time and block the progress of the consumer or exceed the consumer poll interval timeout.
This method may raise exceptions that were thrown by asynchronous tasks since the previous call to poll.
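The following sketch shows one way `poll` can surface errors from asynchronous work without blocking. It uses a hypothetical `AsyncStep` (not an arroyo class) that inspects only those futures that have already finished. `Future.result()` on a finished future returns immediately and re-raises the task's exception, if there was one. The explicit `wait` call exists only to make the demo deterministic.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, List, Optional


class AsyncStep:
    """Hypothetical step that re-raises background task errors from poll()."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.in_flight: List[Future] = []

    def submit(self, message: Any) -> None:
        self.in_flight.append(self.executor.submit(self.func, message))

    def poll(self) -> None:
        # Never wait here: only look at futures that have already finished.
        finished: List[Future] = []
        still_running: List[Future] = []
        for future in self.in_flight:
            if future.done():
                finished.append(future)
            else:
                still_running.append(future)
        self.in_flight = still_running
        for future in finished:
            # result() returns immediately for a done future and re-raises
            # any exception the task raised.
            future.result()


step = AsyncStep(int)
step.submit("42")
step.submit("not a number")
wait(step.in_flight)  # demo only: make sure both tasks have finished

caught: Optional[Exception] = None
try:
    step.poll()
except ValueError as exc:
    caught = exc
```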
- abstract submit(message: Message[TStrategyPayload]) → None
Submit a message for processing.
Messages may be processed synchronously or asynchronously, depending on the implementation of the processing strategy. Callers of this method should not assume that this method returning successfully implies that the message was successfully processed.
If the processing strategy is unable to accept a message (due to it being at or over capacity, for example), this method will raise a MessageRejected exception.
- abstract terminate() → None
Close the processing strategy immediately, abandoning any work in progress. No more messages should be accepted by the instance after this method has been called.
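The sketch below implements the full interface. To keep it runnable without arroyo, it uses a local ABC that mirrors the five abstract methods documented above. `Map` and `Collect` are hypothetical steps. In this sketch each step holds a `next_step` reference and forwards messages and lifecycle calls to it, which is similar to how arroyo's built-in strategies are chained. For brevity the messages are bare payloads rather than `Message` objects.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, List, Optional, TypeVar

TPayload = TypeVar("TPayload")


class ProcessingStrategy(ABC, Generic[TPayload]):
    """Local stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    def submit(self, message: TPayload) -> None: ...

    @abstractmethod
    def poll(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    @abstractmethod
    def terminate(self) -> None: ...

    @abstractmethod
    def join(self, timeout: Optional[float] = None) -> None: ...


class Collect(ProcessingStrategy[Any]):
    """Hypothetical terminal step that records what it receives."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    def submit(self, message: Any) -> None:
        self.received.append(message)

    def poll(self) -> None: pass
    def close(self) -> None: pass
    def terminate(self) -> None: pass
    def join(self, timeout: Optional[float] = None) -> None: pass


class Map(ProcessingStrategy[Any]):
    """Hypothetical synchronous step: transform each message, then forward it."""

    def __init__(self, func: Callable[[Any], Any], next_step: ProcessingStrategy[Any]) -> None:
        self.func = func
        self.next_step = next_step
        self.closed = False

    def submit(self, message: Any) -> None:
        if self.closed:
            raise RuntimeError("submit() called after close()")
        self.next_step.submit(self.func(message))

    def poll(self) -> None:
        self.next_step.poll()  # let downstream steps make progress

    def close(self) -> None:
        self.closed = True
        self.next_step.close()  # propagate shutdown down the chain

    def terminate(self) -> None:
        self.closed = True
        self.next_step.terminate()  # abandon work everywhere downstream

    def join(self, timeout: Optional[float] = None) -> None:
        self.next_step.join(timeout)  # nothing buffered here; wait downstream


sink = Collect()
pipeline = Map(str.upper, next_step=sink)
for word in ["a", "b"]:
    pipeline.submit(word)
pipeline.poll()
pipeline.close()
pipeline.join()
```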
- class arroyo.processing.strategies.abstract.ProcessingStrategyFactory(*args, **kwds)
Bases: ABC, Generic[TStrategyPayload]
A ProcessingStrategyFactory wraps a series of ProcessingStrategy steps. The stream processor calls its create_with_partitions method to instantiate the ProcessingStrategy on partition assignment, or on partition revocation if the strategy needs to be recreated.
- abstract create_with_partitions(commit: Commit, partitions: Mapping[Partition, int]) → ProcessingStrategy[TStrategyPayload]
Instantiate and return a ProcessingStrategy instance.
- Parameters:
  - commit – A function that accepts a mapping of Partition instances to offset values that should be committed.
  - partitions – A mapping of each Partition to its most recent offset.
- shutdown() → None
Custom code to execute when the StreamProcessor shuts down entirely.
Note that this code will also be executed on crashes of the strategy.
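This sketch shows how the work is divided between the two factory methods. `create_with_partitions` builds a new strategy for each assignment. `shutdown` releases resources that outlive any single assignment, such as a shared thread pool. `Factory`, `CommitEachMessage`, and the `(partition, offset, payload)` message tuple are hypothetical. `Partition` and `Commit` are simplified local stand-ins, and the strategy omits `poll`/`close`/`join` for brevity. Committing `offset + 1` follows the Kafka convention that a committed offset names the next message to consume.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Mapping, NamedTuple, Tuple


class Partition(NamedTuple):
    """Simplified stand-in for arroyo.types.Partition (topic is a plain str)."""
    topic: str
    index: int


# Stand-in for arroyo.types.Commit: accepts a partition -> offset mapping.
Commit = Callable[[Mapping[Partition, int]], None]


class CommitEachMessage:
    """Hypothetical per-assignment strategy (poll/close/join omitted)."""

    def __init__(self, commit: Commit, pool: ThreadPoolExecutor) -> None:
        self.commit = commit
        self.pool = pool

    def submit(self, message: Tuple[Partition, int, str]) -> None:
        partition, offset, payload = message
        self.pool.submit(str.upper, payload).result()  # placeholder work
        # Commit one past this message's offset: the next message to consume.
        self.commit({partition: offset + 1})


class Factory:
    """Hypothetical factory: a new strategy per assignment, one shared pool."""

    def __init__(self) -> None:
        # Resources that outlive a single assignment belong to the factory.
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.created = 0

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> CommitEachMessage:
        # Called on each (re)assignment; `partitions` maps every assigned
        # partition to its most recent offset.
        self.created += 1
        return CommitEachMessage(commit, self.pool)

    def shutdown(self) -> None:
        # Runs once when the whole processor stops, including after a crash.
        self.pool.shutdown(wait=True)


committed: Dict[Partition, int] = {}
factory = Factory()
p0 = Partition("events", 0)
strategy = factory.create_with_partitions(committed.update, {p0: 10})
strategy.submit((p0, 10, "hello"))
strategy.submit((p0, 11, "world"))
factory.shutdown()
```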
Messages
- class arroyo.types.BaseValue(*args, **kwds)
- property payload: TMessagePayload
- property timestamp: datetime | None
- class arroyo.types.BrokerValue(payload: TMessagePayload, partition: Partition, offset: int, timestamp: datetime)
A payload received from the consumer, or returned by the producer once a message has been produced. The partition, offset, and timestamp values are always present.
- property next_offset: int
- offset: int
- property payload: TMessagePayload
- timestamp: datetime
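The sketch below shows why `next_offset` matters when committing. It uses a local stand-in for `BrokerValue` with the constructor arguments listed above, plus a simplified `Partition`. The sketch assumes that `next_offset` is `offset + 1`, which matches the Kafka convention that a committed offset names the next message to read. Committing `next_offset` therefore marks the current message as processed.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Generic, NamedTuple, TypeVar

T = TypeVar("T")


class Partition(NamedTuple):
    """Simplified stand-in for arroyo.types.Partition."""
    topic: str
    index: int


@dataclass(frozen=True)
class BrokerValue(Generic[T]):
    """Stand-in with the constructor arguments documented above."""
    payload: T
    partition: Partition
    offset: int
    timestamp: datetime

    @property
    def next_offset(self) -> int:
        # Assumed: the offset immediately after this message.
        return self.offset + 1


value = BrokerValue(b"{}", Partition("events", 3), 41, datetime.now(timezone.utc))
# Committing next_offset (42) marks the message at offset 41 as processed.
to_commit = {value.partition: value.next_offset}
```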
- class arroyo.types.Commit(*args, **kwargs)
- class arroyo.types.FilteredPayload
- class arroyo.types.Message(value: BaseValue[TMessagePayload])
Contains a payload and the partition offsets to be committed after processing. A message can represent either a single message from a Kafka broker (BrokerValue) or something else, such as a number of messages grouped together for a batch processing step (Value).
- property payload: TMessagePayload
- property payload_unfiltered: TMessagePayload
- property timestamp: datetime | None
- class arroyo.types.Value(payload: TMessagePayload, committable: Mapping[Partition, int], timestamp: datetime | None = None)
Any other payload that may not map 1:1 to a single message from a consumer. May represent a batch spanning many partitions.
- property payload: TMessagePayload
- property timestamp: datetime | None
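The sketch below shows one reasonable way to build a `Value` for a batch that spans several partitions. For each partition, the committable offset is one past the highest offset in the batch, so committing it acknowledges every record. The `Value` dataclass mirrors the constructor signature listed above but is a local stand-in. `Partition` is simplified, and `batch_value` and the `(partition, offset, payload)` record tuples are hypothetical. The timestamp is left as `None` in this sketch.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Generic, List, Mapping, NamedTuple, Optional, Tuple, TypeVar

T = TypeVar("T")


class Partition(NamedTuple):
    """Simplified stand-in for arroyo.types.Partition."""
    topic: str
    index: int


@dataclass(frozen=True)
class Value(Generic[T]):
    """Stand-in matching Value(payload, committable, timestamp=None)."""
    payload: T
    committable: Mapping[Partition, int]
    timestamp: Optional[datetime] = None


def batch_value(records: List[Tuple[Partition, int, str]]) -> Value[List[str]]:
    """Fold (partition, offset, payload) records into a single batch Value.

    For each partition the committable offset is one past the highest offset
    in the batch, so committing it acknowledges every record in the batch.
    """
    committable: Dict[Partition, int] = {}
    for partition, offset, _ in records:
        committable[partition] = max(committable.get(partition, 0), offset + 1)
    return Value([payload for _, _, payload in records], committable)


p0, p1 = Partition("events", 0), Partition("events", 1)
batch = batch_value([(p0, 5, "a"), (p1, 9, "b"), (p0, 6, "c")])
```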