Building a Pipeline

Pipelines are defined through a Python DSL (other options are planned) by chaining dataflow primitives.

Chaining primitives means sending a message from one operator to the following one.

Pipelines start with a StreamingSource, which represents a Kafka consumer. A pipeline can fork and broadcast messages to multiple branches, and each branch terminates with a Sink.
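To make the topology concrete, here is a minimal pure-Python model of the ideas above: each operator passes its output to the next one, a fork copies every message to several branches, and each branch ends in a sink. It is a sketch, not the sentry_streams implementation. The `Step`, `Operator`, `Broadcast`, `ListSink` and `run_source` names are hypothetical, and a Python list stands in for the Kafka consumer.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


class Step:
    """A node in the toy topology; receives one message at a time."""

    def receive(self, msg: Any) -> None:
        raise NotImplementedError


@dataclass
class ListSink(Step):
    # Terminal step: collects messages instead of writing to Kafka.
    received: List[Any] = field(default_factory=list)

    def receive(self, msg: Any) -> None:
        self.received.append(msg)


@dataclass
class Operator(Step):
    # Applies a function and forwards the result to the next step.
    function: Callable[[Any], Any]
    next_step: Step

    def receive(self, msg: Any) -> None:
        self.next_step.receive(self.function(msg))


@dataclass
class Broadcast(Step):
    # Fork: every message is sent to every branch.
    branches: List[Step]

    def receive(self, msg: Any) -> None:
        for branch in self.branches:
            branch.receive(msg)


def run_source(messages: List[Any], first_step: Step) -> None:
    # Stand-in for a StreamingSource: feeds each consumed message to the chain.
    for msg in messages:
        first_step.receive(msg)


sink_a, sink_b = ListSink(), ListSink()
pipeline = Operator(str.upper, Broadcast([sink_a, Operator(len, sink_b)]))
run_source(["a", "bc"], pipeline)
print(sink_a.received, sink_b.received)  # ['A', 'BC'] [1, 2]
```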

Currently, only Python operations can be used. Rust support is planned.

Distribution is not visible at this level: the DSL defines only the topology of the application, which is essentially its business logic. Distribution is defined in the deployment descriptor, so the same operators can be distributed differently in different environments.
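The separation between topology and distribution can be sketched in a few lines. This assumes nothing about the real deployment descriptor format: the `DEPLOYMENTS` mapping of step names to instance counts and the `plan` helper are hypothetical. The point is only that the topology is written once while each environment supplies its own placement.

```python
from typing import Dict, List

# The topology: step names in order (the business logic).
# It is written once and is identical in every environment.
TOPOLOGY: List[str] = ["myinput", "transform1", "myoutput"]

# Hypothetical per-environment descriptors: how many parallel instances
# of each step to run. These keys are illustrative, not the real format.
DEPLOYMENTS: Dict[str, Dict[str, int]] = {
    "dev": {},  # everything runs as a single instance
    "prod": {"transform1": 8},
}


def plan(topology: List[str], descriptor: Dict[str, int]) -> Dict[str, int]:
    # Steps not mentioned in the descriptor default to one instance.
    return {step: descriptor.get(step, 1) for step in topology}


for env, descriptor in DEPLOYMENTS.items():
    print(env, plan(TOPOLOGY, descriptor))
```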

The DSL operators are defined in the chain.py module (sentry_streams.pipeline.chain).

class sentry_streams.pipeline.chain.Applier

Bases: ABC, Generic[TIn, TOut]

Defines a primitive that can be applied to a stream. Instances of this class represent a step in the pipeline and contain the metadata the adapter needs to add that step to the pipeline.

This class is primarily syntactic sugar: it avoids adding a separate method to the Chain class for every primitive while still allowing the primitives to be customized.
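The design choice can be illustrated with a toy version of the pattern: a single generic `apply` method on the chain, and one small applier object per primitive that knows how to add itself. All names here (`ToyPipeline`, `ToyApplier`, `ToyMap`, `ToyFilter`, `ToyChain`) are hypothetical, and the real `build_step` also receives the previous `Step`, which this sketch omits.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class ToyPipeline:
    # Records (step name, step kind) pairs in the order they were added.
    steps: List[Tuple[str, str]] = field(default_factory=list)


class ToyApplier(ABC):
    """Carries the metadata for one step and knows how to add it."""

    @abstractmethod
    def build_step(self, name: str, ctx: ToyPipeline) -> None: ...


@dataclass
class ToyMap(ToyApplier):
    function: Callable[[object], object]

    def build_step(self, name: str, ctx: ToyPipeline) -> None:
        ctx.steps.append((name, "map"))


@dataclass
class ToyFilter(ToyApplier):
    function: Callable[[object], bool]

    def build_step(self, name: str, ctx: ToyPipeline) -> None:
        ctx.steps.append((name, "filter"))


@dataclass
class ToyChain:
    ctx: ToyPipeline = field(default_factory=ToyPipeline)

    def apply(self, name: str, applier: ToyApplier) -> "ToyChain":
        # One generic method instead of one method per primitive.
        applier.build_step(name, self.ctx)
        return self


chain = ToyChain().apply("double", ToyMap(lambda x: x * 2)).apply(
    "positive", ToyFilter(lambda x: x > 0)
)
print(chain.ctx.steps)  # [('double', 'map'), ('positive', 'filter')]
```

Adding a new primitive means adding a new applier class; the chain's interface does not grow.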

abstractmethod build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

class sentry_streams.pipeline.chain.Batch(batch_size: 'MeasurementUnit')

Bases: Applier[Message[InputType], Message[MutableSequence[Tuple[InputType, str | None]]]], Generic[MeasurementUnit, InputType]

batch_size: MeasurementUnit
build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.
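Batch groups consecutive messages into a sequence. The sketch below models only a count-based batch in plain Python, assuming `batch_size` is a number of messages (the `MeasurementUnit` type parameter may allow other units). The hypothetical `batch` helper does not model the second element of each tuple in the real output type, and in a live stream the final partial batch would be flushed by some other trigger rather than end-of-input.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch(messages: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group consecutive messages into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    current: List[T] = []
    for msg in messages:
        current.append(msg)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        # Flush the last, partially filled batch when the input ends.
        yield current


print(list(batch(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```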

class sentry_streams.pipeline.chain.Chain(name: str)

Bases: Pipeline

A pipeline that terminates with a branch or a sink, which means no further steps can be appended to it.

This type exists so the type checker can prevent us from reaching an invalid state.
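The typing trick can be shown with two toy classes: only the extensible subclass exposes methods that add steps, and terminating operations return the base type. A type checker such as mypy then rejects any attempt to append to a closed chain. `ToyChain` and `ToyExtensibleChain` are hypothetical stand-ins for Chain and ExtensibleChain.

```python
from __future__ import annotations

from typing import List


class ToyChain:
    """Terminal chain: exposes no method to append further steps."""

    def __init__(self, steps: List[str]) -> None:
        self.steps = steps


class ToyExtensibleChain(ToyChain):
    """A chain that can still grow; only this type offers apply() and sink()."""

    def apply(self, name: str) -> ToyExtensibleChain:
        return ToyExtensibleChain(self.steps + [name])

    def sink(self, name: str) -> ToyChain:
        # Returning the base type tells the type checker the chain is closed.
        return ToyChain(self.steps + [name])


closed = ToyExtensibleChain(["source"]).apply("map").sink("out")
print(closed.steps)  # ['source', 'map', 'out']
# closed.apply("more")  # rejected by a type checker; AttributeError at runtime
```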

class sentry_streams.pipeline.chain.ExtensibleChain(name: str)

Bases: Chain, Generic[TIn]

Defines a streaming pipeline or a segment of a pipeline by chaining operators that define steps via function calls.

A Chain is a pipeline that starts with a source followed by a number of steps. Some steps are operators that perform processing on the stream; others manage the pipeline topology: sink, broadcast, and route.

Example:

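from enum import Enum
from sentry_streams.pipeline.chain import Map, StreamSink, segment, streaming_source

class Routes(Enum):
    ROUTE1 = "route1"
    ROUTE2 = "route2"

def routing_func(msg: object) -> Routes:
    return Routes.ROUTE1  # Placeholder: a real router would inspect msg

pipeline = (
    streaming_source("myinput", "events")  # Starts the pipeline
    .apply("transform1", Map(lambda msg: msg))  # Performs an operation
    .route(  # Branches the pipeline
        "route_to_one",
        routing_function=routing_func,
        routes={
            Routes.ROUTE1: segment(name="route1", msg_type=bytes)  # Creates a branch
            .apply("transform2", Map(lambda msg: msg))
            .sink("myoutput1", StreamSink("transformed-events-2")),
            Routes.ROUTE2: segment(name="route2", msg_type=bytes)
            .apply("transform3", Map(lambda msg: msg))
            .sink("myoutput2", StreamSink("transformed-events-3")),
        },
    )
)

apply(name: str, applier: Applier[TIn, TOut]) -> ExtensibleChain[TOut]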

Apply a transformation to the stream. The transformation is defined via an Applier.

Operations can change the cardinality of the messages in the stream. Examples:

- map performs a 1:1 transformation
- filter performs a 1:0..1 transformation
- flatMap performs a 1:n transformation
- reduce performs an n:1 transformation
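The four cardinalities can be seen on a plain Python list. These are ordinary Python equivalents for illustration, not the sentry_streams operators; in a stream, reduce works over a window of messages (see Reducer) rather than over a finite list.

```python
from functools import reduce
from typing import List

messages: List[int] = [1, 2, 3, 4]

# map, 1:1: every input produces exactly one output.
mapped = [m * 10 for m in messages]

# filter, 1:0..1: every input produces zero or one output.
filtered = [m for m in messages if m % 2 == 0]

# flatMap, 1:n: every input can produce any number of outputs.
flat_mapped = [part for m in messages for part in [m] * m]

# reduce, n:1: many inputs are folded into a single output.
reduced = reduce(lambda acc, m: acc + m, messages, 0)

print(mapped)       # [10, 20, 30, 40]
print(filtered)     # [2, 4]
print(flat_mapped)  # [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
print(reduced)      # 10
```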

broadcast(name: str, routes: Sequence[Chain]) -> Chain

Forks the pipeline, sending every message to all routes.

route(name: str, routing_function: Callable[..., TRoute], routes: Mapping[TRoute, Chain]) -> Chain

Forks the pipeline, sending each message to exactly one of the branches. The routing_function parameter is a function that takes a message and returns the key in routes of the branch the message should be sent to.
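The difference between broadcast and route can be modeled with lists acting as branches. The `broadcast` and `route` helpers below are hypothetical pure-Python functions mirroring the descriptions above, not the Chain methods themselves: broadcast delivers every message to every branch, while route asks the routing function for a key and delivers the message to that single branch.

```python
from enum import Enum
from typing import Callable, Dict, List, Mapping


class Routes(Enum):
    EVEN = "even"
    ODD = "odd"


def broadcast(msg: int, branches: List[List[int]]) -> None:
    # Every branch receives a copy of every message.
    for branch in branches:
        branch.append(msg)


def route(
    msg: int,
    routing_function: Callable[[int], Routes],
    routes: Mapping[Routes, List[int]],
) -> None:
    # Exactly one branch, chosen by the routing function, receives the message.
    routes[routing_function(msg)].append(msg)


left: List[int] = []
right: List[int] = []
evens: List[int] = []
odds: List[int] = []
branches_by_key: Dict[Routes, List[int]] = {Routes.EVEN: evens, Routes.ODD: odds}

for m in [1, 2, 3]:
    broadcast(m, [left, right])
    route(m, lambda x: Routes.EVEN if x % 2 == 0 else Routes.ODD, branches_by_key)

print(left, right)  # [1, 2, 3] [1, 2, 3]
print(evens, odds)  # [2] [1, 3]
```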

sink(name: str, sink: Sink) -> Chain

Terminates the pipeline.

TODO: support anything other than StreamSink.

class sentry_streams.pipeline.chain.Filter(function: 'Union[Callable[[Message[TIn]], bool], str]')

Bases: Applier[Message[TIn], Message[TIn]], Generic[TIn]

build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

function: Callable[[Message[TIn]], bool] | str

class sentry_streams.pipeline.chain.FlatMap(function: 'Union[Callable[[Message[MutableSequence[TIn]]], TOut], str]')

Bases: Applier[Message[MutableSequence[TIn]], Message[TOut]], Generic[TIn, TOut]

build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

function: Callable[[Message[MutableSequence[TIn]]], TOut] | str

class sentry_streams.pipeline.chain.GCSSink(bucket: 'str', object_generator: 'Callable[[], str]')

Bases: Sink

bucket: str
build_sink(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

object_generator: Callable[[], str]
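object_generator is a zero-argument callable returning a string, presumably the name of the object to write in the bucket; this section does not say when the sink calls it. A minimal sketch of such a generator, assuming date-partitioned names with a random suffix are acceptable (the `events/` prefix, `.json` suffix and bucket name are hypothetical):

```python
import uuid
from datetime import datetime, timezone


def object_generator() -> str:
    # Zero-argument callable, matching the Callable[[], str] annotation.
    # Date-partitioned prefix plus a random suffix so names do not collide.
    now = datetime.now(timezone.utc)
    return f"events/{now:%Y/%m/%d}/{now:%H%M%S}-{uuid.uuid4().hex}.json"


name = object_generator()
print(name)

# Assumed usage with the documented constructor (bucket name is hypothetical):
# GCSSink(bucket="my-events-bucket", object_generator=object_generator)
```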
class sentry_streams.pipeline.chain.Map(function: 'Union[Callable[[Message[TIn]], TOut], str]')

Bases: Applier[Message[TIn], Message[TOut]], Generic[TIn, TOut]

build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

function: Callable[[Message[TIn]], TOut] | str

class sentry_streams.pipeline.chain.Parser(msg_type: Type[TOut])

Bases: Applier[Message[bytes], Message[TOut]], Generic[TOut]

A step that decodes bytes, deserializes the resulting message, and validates it against the schema corresponding to the provided message type. The message type should be one supported by sentry-kafka-schemas. See examples/ for usage; this step can be plugged flexibly into a pipeline. Keep in mind that data remains raw bytes up to this step.

Supports both JSON and protobuf.
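The three stages Parser performs (decode, deserialize, validate) can be sketched for the JSON case with the standard library. The hand-written checks below stand in for schema validation through sentry-kafka-schemas, which this sketch does not use; the `Event` type and the `parse` helper are hypothetical, and protobuf is not covered.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Event:
    # Hypothetical message type standing in for a sentry-kafka-schemas type.
    event_id: str
    timestamp: float


def parse(raw: bytes) -> Event:
    # 1. Decode bytes to text.
    text = raw.decode("utf-8")
    # 2. Deserialize the text into a Python object.
    data: Dict[str, Any] = json.loads(text)
    # 3. Validate it against the expected shape before using it.
    if not isinstance(data.get("event_id"), str):
        raise ValueError("event_id must be a string")
    if not isinstance(data.get("timestamp"), (int, float)):
        raise ValueError("timestamp must be a number")
    return Event(event_id=data["event_id"], timestamp=float(data["timestamp"]))


print(parse(b'{"event_id": "abc", "timestamp": 1700000000}'))
```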

build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

msg_type: Type[TOut]

class sentry_streams.pipeline.chain.Reducer(window: 'Window[MeasurementUnit]', aggregate_func: 'Callable[[], Accumulator[Message[InputType], OutputType]]', aggregate_backend: 'AggregationBackend[OutputType] | None' = None, group_by_key: 'GroupBy | None' = None)

Bases: Applier[Message[InputType], Message[OutputType]], Generic[MeasurementUnit, InputType, OutputType]

aggregate_backend: AggregationBackend[OutputType] | None = None
aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]]
build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

group_by_key: GroupBy | None = None
window: Window[MeasurementUnit]
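Reducer combines a window, an accumulator factory (aggregate_func) and an optional grouping key. The sketch below models that combination with a count-based tumbling window in plain Python. The `SumAccumulator` interface (`add`, `get_value`) and the `tumbling_reduce` helper are hypothetical; the real Accumulator and Window types are not shown in this section, and this sketch drops a trailing partial window instead of emitting it.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


class SumAccumulator:
    # Hypothetical accumulator: the real Accumulator interface may differ.
    def __init__(self) -> None:
        self.total = 0

    def add(self, value: int) -> None:
        self.total += value

    def get_value(self) -> int:
        return self.total


def tumbling_reduce(
    messages: Iterable[Tuple[str, int]],
    window_size: int,
    aggregate_func: Callable[[], SumAccumulator],
) -> List[Dict[str, int]]:
    """Aggregate per key over consecutive, non-overlapping count windows."""
    results: List[Dict[str, int]] = []
    accumulators: Dict[str, SumAccumulator] = defaultdict(aggregate_func)
    seen = 0
    for key, value in messages:  # key plays the role of group_by_key
        accumulators[key].add(value)
        seen += 1
        if seen == window_size:
            # Window closes: emit one value per key and start fresh.
            results.append({k: acc.get_value() for k, acc in accumulators.items()})
            accumulators = defaultdict(aggregate_func)
            seen = 0
    return results


stream = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("a", 6)]
print(tumbling_reduce(stream, 3, SumAccumulator))
```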
class sentry_streams.pipeline.chain.Serializer(dt_format: str | None = None)

Bases: Applier[Message[TIn], bytes], Generic[TIn]

A step that serializes and encodes messages into bytes. These bytes can then be written to a sink, for example a Kafka topic. This step must precede any sink step that writes to Kafka.
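A minimal JSON sketch of serialize-then-encode, assuming `dt_format` is a strftime pattern applied to datetime values and that ISO 8601 is used when it is omitted. Both are assumptions about the parameter's meaning, and the `serialize` helper is hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


def serialize(payload: Any, dt_format: Optional[str] = None) -> bytes:
    # Assumption: dt_format is a strftime pattern applied to datetime values;
    # without it, datetimes fall back to ISO 8601.
    def default(obj: Any) -> str:
        if isinstance(obj, datetime):
            return obj.strftime(dt_format) if dt_format else obj.isoformat()
        raise TypeError(f"cannot serialize {type(obj).__name__}")

    return json.dumps(payload, default=default).encode("utf-8")


ts = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(serialize({"ts": ts}))              # b'{"ts": "2024-01-02T03:04:05+00:00"}'
print(serialize({"ts": ts}, "%Y-%m-%d"))  # b'{"ts": "2024-01-02"}'
```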

build_step(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

dt_format: str | None = None

class sentry_streams.pipeline.chain.Sink

Bases: ABC

Defines a generic Sink, which can be extended by specific types of Sinks. See examples/ for how different kinds of Sinks are plugged into a pipeline.

abstractmethod build_sink(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

class sentry_streams.pipeline.chain.StreamSink(stream_name: 'str')

Bases: Sink

build_sink(name: str, ctx: Pipeline, previous: Step) -> Step

Builds a pipeline step and wires it to the Pipeline.

This method will go away once the old syntax is retired.

stream_name: str

sentry_streams.pipeline.chain.multi_chain(chains: Sequence[Chain]) -> Pipeline

Creates a pipeline that contains multiple chains, where each chain is a portion of the pipeline that starts with a source and ends with one or more sinks.

sentry_streams.pipeline.chain.segment(name: str, msg_type: Type[TIn]) -> ExtensibleChain[Message[TIn]]

Creates a pipeline segment that can be referenced as a branch by the route and broadcast steps of an existing pipeline.

sentry_streams.pipeline.chain.streaming_source(name: str, stream_name: str, header_filter: Tuple[str, bytes] | None = None) -> ExtensibleChain[Message[bytes]]

Creates a pipeline that starts with a StreamingSource.
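header_filter takes a (header name, header value) pair. A plausible reading is that the source keeps only messages carrying that exact header; the sketch below models that assumed behavior on in-memory messages. `KafkaMessage` and `apply_header_filter` are hypothetical and are not part of sentry_streams.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Sequence, Tuple


@dataclass
class KafkaMessage:
    # Simplified stand-in for a consumed Kafka message.
    value: bytes
    headers: List[Tuple[str, bytes]] = field(default_factory=list)


def apply_header_filter(
    messages: Sequence[KafkaMessage],
    header_filter: Optional[Tuple[str, bytes]] = None,
) -> List[KafkaMessage]:
    # Assumed semantics: keep only messages carrying this exact header pair.
    if header_filter is None:
        return list(messages)
    return [m for m in messages if header_filter in m.headers]


msgs = [
    KafkaMessage(b"1", [("type", b"error")]),
    KafkaMessage(b"2", [("type", b"transaction")]),
    KafkaMessage(b"3"),
]
print([m.value for m in apply_header_filter(msgs, ("type", b"error"))])  # [b'1']
```

With the documented signature, the corresponding call would presumably be `streaming_source("myinput", "events", header_filter=("type", b"error"))`.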