Getting started with Arroyo¶
This tutorial shows how to create a Kafka consumer with Arroyo from scratch.
Setup¶
This section explains how to set up Kafka and Zookeeper and install the library.
Kafka and Zookeeper¶
In order to run an Arroyo Kafka consumer you need a working Kafka broker. If you already have one, you can skip this step. If not, the following commands install and start Kafka and Zookeeper as Docker containers (Docker must be installed).
docker network create arroyo
docker run --rm \
  -v zookeeper_volume:/var/lib/zookeeper \
  --env ZOOKEEPER_CLIENT_PORT=2181 \
  --name=sentry_zookeeper \
  --network=arroyo \
  -p 2181:2181 \
  confluentinc/cp-zookeeper:6.2.0
docker run --rm \
  -v kafka_volume:/var/lib/kafka \
  --env KAFKA_ZOOKEEPER_CONNECT=sentry_zookeeper:2181 \
  --env KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 \
  --env KAFKA_ADVERTISED_LISTENERS=INTERNAL://127.0.0.1:9093,EXTERNAL://127.0.0.1:9092 \
  --env KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
  --env KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
  --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  --env CONFLUENT_SUPPORT_METRICS_ENABLE=false \
  --env KAFKA_LOG4J_LOGGERS=kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN \
  --env KAFKA_LOG4J_ROOT_LOGLEVEL=WARN \
  --env KAFKA_TOOLS_LOG4J_LOGLEVEL=WARN \
  --name=sentry_kafka \
  --network=arroyo \
  -p 9092:9092 \
  confluentinc/cp-kafka:6.2.0
You should now see Kafka and Zookeeper running with:
docker ps
Install kcat (formerly Kafkacat)¶
This tool is useful for producing messages onto and consuming them from topics:
https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html#kcat-formerly-kafkacat-utility
Development environment¶
You will need to install the library, most likely inside a Python virtual environment. First create and activate a virtualenv, then install Arroyo with:
pip install sentry-arroyo
Create two Kafka topics¶
Our example will consume messages from one topic and produce the same messages onto another, so we need two topics:
docker exec sentry_kafka kafka-topics \
  --create \
  --topic source-topic \
  --bootstrap-server 127.0.0.1:9092

docker exec sentry_kafka kafka-topics \
  --create \
  --topic dest-topic \
  --bootstrap-server 127.0.0.1:9092
Now you should be ready to develop with Arroyo.
Create a basic consumer¶
Arroyo provides two levels of abstraction for writing a consumer: the basic consumer/producer library and the streaming library. The first is a thin wrapper around a librdkafka consumer/producer that adds some features around offset management. The second provides a higher-level streaming interface that hides details like rebalancing and the consumer lifecycle.
Creating a basic consumer¶
This script initializes a basic consumer and prints every message it consumes:
from arroyo.backends.kafka.configuration import (
    build_kafka_consumer_configuration,
)
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.types import Topic

TOPIC = Topic("source-topic")

consumer = KafkaConsumer(
    build_kafka_consumer_configuration(
        default_config={},
        bootstrap_servers=["127.0.0.1:9092"],
        auto_offset_reset="latest",
        group_id="test-group",
    )
)

consumer.subscribe([TOPIC])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        print(f"MSG: {msg.payload}")
Start this script and use kcat to produce a message:
echo "MESSAGE" | kcat -P -b 127.0.0.1:9092 -t source-topic
After a short while the message should appear on the console:
MSG: KafkaPayload(key=None, value=b'MESSAGE', headers=[])
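If you prefer to stay in Python, you can produce the test message with Arroyo's own producer instead of kcat. This is a minimal sketch against the same local broker:

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.types import Topic

producer = KafkaProducer(
    build_kafka_configuration(
        default_config={},
        bootstrap_servers=["127.0.0.1:9092"],
    )
)

# produce() is asynchronous and returns a future; calling result()
# blocks until the broker has acknowledged the message.
producer.produce(Topic("source-topic"), KafkaPayload(None, b"MESSAGE", [])).result()
producer.close()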
Create a streaming consumer¶
Add a ProcessingStrategy and a ProcessingStrategyFactory. Here we use the RunTask strategy, which runs a custom function over each message:
from typing import Mapping

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
    CommitOffsets,
    ProcessingStrategy,
    ProcessingStrategyFactory,
    RunTask,
)
from arroyo.types import Commit, Message, Partition, Topic


def handle_message(message: Message[KafkaPayload]) -> KafkaPayload:
    print(f"MSG: {message.payload}")
    return message.payload


class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """
    The factory manages the lifecycle of the `ProcessingStrategy`.
    A strategy is created every time new partitions are assigned to the
    consumer, while it is destroyed when partitions are revoked or the
    consumer is closed.
    """

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return RunTask(handle_message, CommitOffsets(commit))
The code above is orchestrated by the Arroyo runtime, called StreamProcessor:
from arroyo.processing import StreamProcessor
from arroyo.commit import ONCE_PER_SECOND

processor = StreamProcessor(
    consumer=consumer,
    topic=TOPIC,
    processor_factory=ConsumerStrategyFactory(),
    commit_policy=ONCE_PER_SECOND,
)

processor.run()
The main consumer loop is managed by the StreamProcessor, so there is no need to poll the consumer periodically. The ProcessingStrategy works by inversion of control: the StreamProcessor polls Kafka and feeds messages to the strategy.
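Note that processor.run() blocks until the processor shuts down. A common pattern, sketched below, is to request a clean shutdown from a signal handler via StreamProcessor.signal_shutdown(), registering the handlers before calling run():

import signal
from types import FrameType
from typing import Optional

def handler(signum: int, frame: Optional[FrameType]) -> None:
    # Ask the StreamProcessor to stop; run() returns once the
    # strategy has been closed.
    processor.signal_shutdown()

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

processor.run()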
Add some useful logic¶
Now we will chain the Produce strategy to produce messages onto a second topic after each message is logged:
from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.processing.strategies import Produce

BOOTSTRAP_SERVERS = ["127.0.0.1:9092"]


class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """
    The factory manages the lifecycle of the `ProcessingStrategy`.
    A strategy is created every time new partitions are assigned to the
    consumer, while it is destroyed when partitions are revoked or the
    consumer is closed.
    """

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        producer = KafkaProducer(
            build_kafka_configuration(
                default_config={},
                bootstrap_servers=BOOTSTRAP_SERVERS,
            )
        )
        return RunTask(
            handle_message,
            Produce(producer, Topic("dest-topic"), CommitOffsets(commit)),
        )
Each message is first passed to the RunTask strategy, which logs it and submits the output to the next step. The Produce strategy then produces the message onto dest-topic asynchronously. Once the message has been produced, the CommitOffsets strategy commits its offset.
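To verify the whole pipeline, you can watch dest-topic with a second basic consumer like the one from the first section (the group_id "verify-group" here is just an arbitrary new consumer group):

from arroyo.backends.kafka.configuration import (
    build_kafka_consumer_configuration,
)
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.types import Topic

# Same setup as the basic consumer, subscribed to the destination topic.
verifier = KafkaConsumer(
    build_kafka_consumer_configuration(
        default_config={},
        bootstrap_servers=["127.0.0.1:9092"],
        auto_offset_reset="latest",
        group_id="verify-group",
    )
)
verifier.subscribe([Topic("dest-topic")])

while True:
    msg = verifier.poll(timeout=1.0)
    if msg is not None:
        print(f"FORWARDED: {msg.payload}")

Produce a message onto source-topic with kcat as before; it should be logged by the streaming consumer and then show up here.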
Further examples¶
You can find complete usage examples in the Arroyo documentation.