Filter
- class arroyo.processing.strategies.filter.FilterStep(function: Callable[[Message[TStrategyPayload]], bool], next_step: ProcessingStrategy[FilteredPayload | TStrategyPayload], commit_policy: CommitPolicy | None = None)
Determines if a message should be submitted to the next processing step.
FilterStep takes a callback, function, and if that callback returns False, the message is dropped.
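The callback contract can be illustrated without arroyo itself. The sketch below is a stdlib-only stand-in: `ToyMessage`, `keep_errors`, and `apply_filter` are hypothetical names invented for this example and are not part of arroyo's API. It only shows the rule that a message survives when the callback returns True and is dropped when it returns False.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class ToyMessage:
    """Hypothetical stand-in for a message; not arroyo's Message class."""
    payload: Dict[str, str]


def keep_errors(message: ToyMessage) -> bool:
    # Plays the role of the `function` callback:
    # True keeps the message, False drops it.
    return message.payload.get("level") == "error"


def apply_filter(
    function: Callable[[ToyMessage], bool], messages: List[ToyMessage]
) -> List[ToyMessage]:
    # Simplified model of the filtering rule only: no commits, no next step.
    return [m for m in messages if function(m)]


messages = [
    ToyMessage({"level": "info"}),
    ToyMessage({"level": "error"}),
    ToyMessage({"level": "debug"}),
]
kept = apply_filter(keep_errors, messages)
```

With a real FilterStep, a predicate shaped like `keep_errors` would be passed as the `function` argument, and surviving messages would be submitted to `next_step` instead of being collected in a list.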
Sometimes that behavior is not desirable, because a steady stream of messages is what makes the consumer commit at regular intervals. If you filter out 100% of messages for a period of time, your consumer may not commit its offsets during that period.
For that scenario, you can pass your CommitPolicy to FilterStep. That will cause FilterStep to emit “sentinel messages” that contain no payload, but only carry forward partition offsets for later strategies to commit. Those messages have a payload of type FilteredPayload.
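The sentinel mechanism can be pictured with a toy model. Everything in the sketch below is hypothetical (`ToyFilteredPayload`, `ToyMessage`, `ToyFilterStep`, and the `sentinel_every` threshold); it is not arroyo's implementation, and the real CommitPolicy decides when to emit using its own criteria. Here a sentinel is simply forwarded after every N dropped messages, carrying the latest offset.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Union


class ToyFilteredPayload:
    """Hypothetical stand-in for a payload-less sentinel marker."""


@dataclass
class ToyMessage:
    payload: Union[ToyFilteredPayload, str]
    offset: int


class ToyFilterStep:
    """Simplified model: drops messages, optionally emitting sentinels."""

    def __init__(
        self,
        function: Callable[[ToyMessage], bool],
        forward: Callable[[ToyMessage], None],
        sentinel_every: Optional[int] = None,  # assumption: toy policy knob
    ) -> None:
        self._function = function
        self._forward = forward
        self._sentinel_every = sentinel_every
        self._dropped = 0

    def submit(self, message: ToyMessage) -> None:
        if self._function(message):
            # A real message carries its own offset downstream.
            self._dropped = 0
            self._forward(message)
            return
        if self._sentinel_every is None:
            return  # no policy: the message is dropped silently
        self._dropped += 1
        if self._dropped >= self._sentinel_every:
            # Forward only the offset, with no real payload.
            self._forward(ToyMessage(ToyFilteredPayload(), message.offset))
            self._dropped = 0


received: List[ToyMessage] = []
step = ToyFilterStep(lambda m: False, received.append, sentinel_every=3)
for offset in range(7):
    step.submit(ToyMessage("event", offset))

sentinel_offsets = [m.offset for m in received]

silent: List[ToyMessage] = []
no_policy_step = ToyFilterStep(lambda m: False, silent.append)
for offset in range(7):
    no_policy_step.submit(ToyMessage("event", offset))
```

In this toy run every message is filtered out, yet the downstream callable still sees offsets 2 and 5, which is enough for a later step to commit progress. Without the threshold, nothing reaches downstream at all.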
For that reason, nearly every strategy needs to handle Message[Union[FilteredPayload, T]] rather than Message[T]; that is, it needs to subtype ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]. A strategy that only handles the regular Message[T] cannot be composed with this step, nor with many of arroyo's other default strategies.
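A downstream step that accepts both kinds of payload usually branches on the payload type. The following is a minimal sketch using hypothetical stand-in classes (`ToyFilteredPayload`, `ToyMessage`, `UppercaseStep`) rather than arroyo's own types. It assumes the step should record the offset of every message it sees but run its business logic only on real payloads.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


class ToyFilteredPayload:
    """Hypothetical stand-in for a payload-less sentinel marker."""


@dataclass
class ToyMessage:
    payload: Union[ToyFilteredPayload, str]
    offset: int


class UppercaseStep:
    """Toy downstream step that tolerates sentinel messages."""

    def __init__(self) -> None:
        self.processed: List[str] = []
        self.last_offset: Optional[int] = None

    def submit(self, message: ToyMessage) -> None:
        # Offsets are tracked for every message, sentinel or not.
        self.last_offset = message.offset
        if isinstance(message.payload, ToyFilteredPayload):
            return  # nothing to process, only the offset matters
        self.processed.append(message.payload.upper())


downstream = UppercaseStep()
downstream.submit(ToyMessage("hello", 0))
downstream.submit(ToyMessage(ToyFilteredPayload(), 1))
downstream.submit(ToyMessage("world", 2))
```

The early return on the sentinel branch keeps the processing logic free of payload-type checks, while the offset bookkeeping still advances past filtered messages.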
If no CommitPolicy is passed, no “sentinel messages” are emitted, and downstream steps do not have to deal with such messages (even though the type signatures say they might receive them).