Building a Pipeline
Pipelines are defined through a Python DSL (more options will be provided) by chaining dataflow primitives.
Chaining primitives means that each message produced by one operator is sent to the next operator in the chain.
Pipelines start with a StreamingSource, which represents a Kafka consumer. They can fork and broadcast messages to multiple branches, and each branch terminates with a Sink.
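To make the chaining idea concrete, here is a toy, in-memory model of a linear chain. It is not the sentry_streams implementation: ToyChain, the list used as a source, and the list used as a sink are hypothetical stand-ins that only show how each message flows from one operator to the next and finally reaches a sink.

```python
from typing import Any, Callable, Iterable, List


class ToyChain:
    """Hypothetical in-memory chain: a source, a list of operators, a sink."""

    def __init__(self, source: Iterable[Any]) -> None:
        self.source = source
        self.operators: List[Callable[[Any], Any]] = []

    def apply(self, operator: Callable[[Any], Any]) -> "ToyChain":
        # Chaining: each operator receives the output of the previous one.
        self.operators.append(operator)
        return self

    def sink(self, output: List[Any]) -> List[Any]:
        # Drive every message from the source through all operators in order,
        # then hand it to the sink (here, a plain list).
        for message in self.source:
            for operator in self.operators:
                message = operator(message)
            output.append(message)
        return output


results = ToyChain([b"a", b"bb"]).apply(bytes.decode).apply(str.upper).sink([])
print(results)  # ['A', 'BB']
```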
At present, only Python operations can be used; support for Rust operations is planned.
Distribution is not visible at this level, because the DSL only defines the topology of the application, which is essentially its business logic. Distribution is defined in the deployment descriptor, so the same operators can be distributed differently in different environments.
The DSL operators are defined in the chain.py module (sentry_streams.pipeline.chain).
- class sentry_streams.pipeline.chain.Applier
Bases: ABC, Generic[TIn, TOut]
Defines a primitive that can be applied on a stream. Instances of this class represent a step in the pipeline and contain the metadata the adapter needs to add the step to the pipeline.
This class is primarily syntactic sugar: it avoids adding a separate method to the Chain class for every primitive while still allowing each primitive to be customized.
- abstractmethod build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
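The idea behind Applier is that the chain exposes one generic apply() method and delegates step construction to the applier object, which carries the metadata for that step. The sketch below mimics that shape with hypothetical ToyStep, ToyPipeline, ToyApplier, ToyMap, and ToyChain classes; the real build_step wiring in sentry_streams is not shown on this page and may differ.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyStep:
    name: str
    upstream: Optional["ToyStep"]


@dataclass
class ToyPipeline:
    steps: List[ToyStep] = field(default_factory=list)


class ToyApplier(ABC):
    @abstractmethod
    def build_step(self, name: str, ctx: ToyPipeline, previous: ToyStep) -> ToyStep:
        """Create a step and wire it after `previous` in `ctx`."""


@dataclass
class ToyMap(ToyApplier):
    # The applier only carries metadata (here, `function`); it does not run it.
    function: Callable[[object], object]

    def build_step(self, name: str, ctx: ToyPipeline, previous: ToyStep) -> ToyStep:
        step = ToyStep(name=name, upstream=previous)
        ctx.steps.append(step)
        return step


class ToyChain:
    def __init__(self, source_name: str) -> None:
        self.ctx = ToyPipeline()
        self.last = ToyStep(source_name, None)
        self.ctx.steps.append(self.last)

    def apply(self, name: str, applier: ToyApplier) -> "ToyChain":
        # One generic method instead of map(), filter(), flat_map(), ...
        self.last = applier.build_step(name, self.ctx, self.last)
        return self


chain = ToyChain("source").apply("upper", ToyMap(str.upper))
print([s.name for s in chain.ctx.steps])  # ['source', 'upper']
```

Adding a new primitive in this design means writing a new applier class, not a new method on the chain.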
- class sentry_streams.pipeline.chain.Batch(batch_size: 'MeasurementUnit')
Bases: Applier[Message[InputType], Message[MutableSequence[Tuple[InputType, str | None]]]], Generic[MeasurementUnit, InputType]
- batch_size: MeasurementUnit
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
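Judging from its type, Batch turns individual messages into messages whose payload is a sequence of (payload, optional string) tuples. The sketch below is a hypothetical count-based version (batch_size is a plain int here, although the real MeasurementUnit may also be something else, such as a duration). The meaning of the optional string is not described on this page, so the sketch always leaves it as None.

```python
from typing import Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def toy_batch(
    payloads: Iterable[T], batch_size: int
) -> Iterator[List[Tuple[T, Optional[str]]]]:
    """Group payloads into lists of at most `batch_size` items.

    Hypothetical sketch: the optional string in each tuple mirrors the
    documented output type; its meaning is not specified, so it stays None.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    batch: List[Tuple[T, Optional[str]]] = []
    for payload in payloads:
        batch.append((payload, None))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, partially filled batch.
        yield batch


print(list(toy_batch(["a", "b", "c"], 2)))
# [[('a', None), ('b', None)], [('c', None)]]
```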
- class sentry_streams.pipeline.chain.Chain(name: str)
Bases: Pipeline
A pipeline that terminates with a branch or a sink, meaning that no further steps can be appended to it.
This type exists so that the type checker can prevent the pipeline from reaching an invalid state.
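The typing idea behind Chain and ExtensibleChain can be sketched with two hypothetical classes: only the extensible type has apply(), and sink() returns the terminal type, so a type checker such as mypy would report an error for a call like `.sink(...).apply(...)`. ToyChain and ToyExtensibleChain are illustrations, not the sentry_streams classes.

```python
from typing import List


class ToyChain:
    """Terminal pipeline: it has no apply(), so nothing can follow it."""

    def __init__(self, steps: List[str]) -> None:
        self.steps = steps


class ToyExtensibleChain(ToyChain):
    """Open pipeline: more steps can still be appended."""

    def apply(self, name: str) -> "ToyExtensibleChain":
        return ToyExtensibleChain(self.steps + [name])

    def sink(self, name: str) -> ToyChain:
        # Returning the terminal type is what lets a type checker
        # reject `.sink(...).apply(...)`.
        return ToyChain(self.steps + [name])


done = ToyExtensibleChain(["source"]).apply("transform").sink("output")
print(done.steps)  # ['source', 'transform', 'output']
print(hasattr(done, "apply"))  # False
```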
- class sentry_streams.pipeline.chain.ExtensibleChain(name: str)
Bases: Chain, Generic[TIn]
Defines a streaming pipeline, or a segment of a pipeline, by chaining operators that define steps via function calls.
A Chain is a pipeline that starts with a source and is followed by a number of steps. Some steps are operators that process the stream; other steps manage the pipeline topology: sink, broadcast, route.
Example:
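    from enum import Enum

    from sentry_streams.pipeline.chain import Map, StreamSink, segment, streaming_source


    class Routes(Enum):  # Keys identifying the branches of the route step
        ROUTE1 = "route1"
        ROUTE2 = "route2"


    def routing_func(msg) -> Routes:
        # Hypothetical routing function: returns the key of the branch to use.
        return Routes.ROUTE1


    pipeline = (
        streaming_source("myinput", "events")  # Starts the pipeline
        .apply("transform1", Map(lambda msg: msg))  # Performs an operation
        .route(  # Branches the pipeline
            "route_to_one",
            routing_function=routing_func,
            routes={
                Routes.ROUTE1: segment(name="route1", msg_type=bytes)  # Creates a branch
                .apply("transform2", Map(lambda msg: msg))
                .sink("myoutput1", StreamSink("transformed-events-2")),
                Routes.ROUTE2: segment(name="route2", msg_type=bytes)
                .apply("transform3", Map(lambda msg: msg))
                .sink("myoutput2", StreamSink("transformed-events3")),
            },
        )
    )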
- apply(name: str, applier: Applier[TIn, TOut]) → ExtensibleChain[TOut]
Applies a transformation to the stream. The transformation is defined via an Applier.
Operations can change the cardinality of the messages in the stream. Examples:
  - map performs a 1:1 transformation
  - filter performs a 1:0..1 transformation
  - flatMap performs a 1:n transformation
  - reduce performs an n:1 transformation
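The four cardinalities listed above can be illustrated with plain Python over a list of integers. These are ordinary list operations, not the sentry_streams Map, Filter, FlatMap, or Reducer classes.

```python
from functools import reduce

messages = [1, 2, 3, 4]

# map: 1:1, every input yields exactly one output.
mapped = [m * 10 for m in messages]

# filter: 1:0..1, every input yields at most one output.
filtered = [m for m in messages if m % 2 == 0]

# flatMap: 1:n, every input yields any number of outputs (here, m copies of m).
flat_mapped = [part for m in messages for part in [m] * m]

# reduce: n:1, many inputs collapse into one output.
reduced = reduce(lambda acc, m: acc + m, messages, 0)

print(mapped)            # [10, 20, 30, 40]
print(filtered)          # [2, 4]
print(len(flat_mapped))  # 10
print(reduced)           # 10
```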
- broadcast(name: str, routes: Sequence[Chain]) → Chain
Forks the pipeline, sending every message to all routes.
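Broadcast semantics amount to a fan-out in which each message is delivered to every route rather than to a selected one. The sketch below models routes as plain callables; toy_broadcast and the two list-backed sinks are hypothetical.

```python
from typing import Any, Callable, List, Sequence


def toy_broadcast(
    messages: Sequence[Any], routes: Sequence[Callable[[Any], None]]
) -> None:
    """Hypothetical fan-out: deliver every message to every route."""
    for message in messages:
        for route in routes:
            route(message)


sink_a: List[Any] = []
sink_b: List[Any] = []
toy_broadcast(["e1", "e2"], [sink_a.append, lambda m: sink_b.append(m.upper())])
print(sink_a)  # ['e1', 'e2']
print(sink_b)  # ['E1', 'E2']
```

Because every branch sees every message, each branch is free to apply its own transformations and sink independently.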
- class sentry_streams.pipeline.chain.Filter(function: 'Union[Callable[[Message[TIn]], bool], str]')
Bases: Applier[Message[TIn], Message[TIn]], Generic[TIn]
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- function: Callable[[Message[TIn]], bool] | str
- class sentry_streams.pipeline.chain.FlatMap(function: 'Union[Callable[[Message[MutableSequence[TIn]]], TOut], str]')
Bases: Applier[Message[MutableSequence[TIn]], Message[TOut]], Generic[TIn, TOut]
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- function: Callable[[Message[MutableSequence[TIn]]], TOut] | str
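FlatMap's input type is a message whose payload is a sequence, such as the output of a batching step. The sketch below assumes the usual flat-map reading: the function is applied to each sequence and every element it returns is emitted as a separate output. toy_flat_map is hypothetical; how the real FlatMap unpacks the function's TOut result is not described on this page.

```python
from typing import Callable, Iterable, Iterator, List, Sequence, TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


def toy_flat_map(
    batches: Iterable[Sequence[TIn]],
    function: Callable[[Sequence[TIn]], Iterable[TOut]],
) -> Iterator[TOut]:
    """Apply `function` to each batch and emit every element it returns."""
    for batch in batches:
        # One input message (a whole batch) can produce many outputs.
        yield from function(batch)


batches: List[List[int]] = [[1, 2], [3]]
print(list(toy_flat_map(batches, lambda batch: [x * 2 for x in batch])))
# [2, 4, 6]
```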
- class sentry_streams.pipeline.chain.GCSSink(bucket: 'str', object_generator: 'Callable[[], str]')
Bases: Sink
- bucket: str
- build_sink(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- object_generator: Callable[[], str]
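object_generator is a zero-argument callable that returns a string, presumably the name of the object to write in the bucket. A minimal sketch of such a generator, assuming time-partitioned names with a random suffix so that successive calls do not collide; the "events/" prefix and the ".json" suffix are hypothetical choices.

```python
import uuid
from datetime import datetime, timezone


def object_generator() -> str:
    """Hypothetical generator: a unique, time-partitioned object name per call."""
    now = datetime.now(timezone.utc)
    # Produces names shaped like "events/YYYY/MM/DD/HHMMSS-<32 hex chars>.json".
    return f"events/{now:%Y/%m/%d/%H%M%S}-{uuid.uuid4().hex}.json"


name = object_generator()
print(name.startswith("events/"), name.endswith(".json"))  # True True
```

It could then be passed as GCSSink(bucket="my-bucket", object_generator=object_generator), where "my-bucket" is a placeholder. This page does not say how often the sink calls the generator, so the naming scheme should not rely on any particular call frequency.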
- class sentry_streams.pipeline.chain.Map(function: 'Union[Callable[[Message[TIn]], TOut], str]')
Bases: Applier[Message[TIn], Message[TOut]], Generic[TIn, TOut]
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- function: Callable[[Message[TIn]], TOut] | str
- class sentry_streams.pipeline.chain.Parser(msg_type: Type[TOut])
Bases: Applier[Message[bytes], Message[TOut]], Generic[TOut]
A step that decodes bytes, deserializes the resulting message, and validates it against the schema corresponding to the provided message type. The message type should be one supported by sentry-kafka-schemas. See examples/ for usage; this step can be plugged flexibly into a pipeline. Keep in mind that, up to this step, the data is simply bytes.
Supports both JSON and protobuf.
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- msg_type: Type[TOut]
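The three stages Parser performs (decode, deserialize, validate) can be sketched for the JSON case with the standard library alone. This does not use sentry-kafka-schemas: ToyEvent and its two fields are a hypothetical message type, and the hand-written checks stand in for real schema validation.

```python
import json
from dataclasses import dataclass


@dataclass
class ToyEvent:
    """Hypothetical message type standing in for a sentry-kafka-schemas type."""

    event_id: str
    project_id: int


def toy_parse(raw: bytes) -> ToyEvent:
    text = raw.decode("utf-8")  # 1. decode the raw bytes
    data = json.loads(text)  # 2. deserialize the JSON document
    # 3. validate the shape, the required fields, and their types
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    event_id = data.get("event_id")
    project_id = data.get("project_id")
    if not isinstance(event_id, str):
        raise ValueError("event_id must be a string")
    if not isinstance(project_id, int) or isinstance(project_id, bool):
        raise ValueError("project_id must be an integer")
    return ToyEvent(event_id=event_id, project_id=project_id)


print(toy_parse(b'{"event_id": "abc", "project_id": 1}'))
# ToyEvent(event_id='abc', project_id=1)
```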
- class sentry_streams.pipeline.chain.Reducer(window: 'Window[MeasurementUnit]', aggregate_func: 'Callable[[], Accumulator[Message[InputType], OutputType]]', aggregate_backend: 'AggregationBackend[OutputType] | None' = None, group_by_key: 'GroupBy | None' = None)
Bases: Applier[Message[InputType], Message[OutputType]], Generic[MeasurementUnit, InputType, OutputType]
- aggregate_backend: AggregationBackend[OutputType] | None = None
- aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]]
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- group_by_key: GroupBy | None = None
- window: Window[MeasurementUnit]
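Reducer takes aggregate_func as a factory (a zero-argument callable returning an Accumulator) rather than as an accumulator instance. A plausible reason, illustrated below, is that each window, and each group when a grouping key is set, needs its own fresh accumulator. The sketch is a toy tumbling window measured in message counts; CountAccumulator, its add/get_value methods, and toy_reduce are hypothetical and do not reproduce the real Accumulator, Window, GroupBy, or AggregationBackend interfaces, which this page does not describe.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, Hashable, Iterable, List, Optional


class CountAccumulator:
    """Hypothetical accumulator that counts the messages added to it."""

    def __init__(self) -> None:
        self.count = 0

    def add(self, message: object) -> "CountAccumulator":
        self.count += 1
        return self

    def get_value(self) -> int:
        return self.count


def toy_reduce(
    messages: Iterable[object],
    window_size: int,
    aggregate_func: Callable[[], CountAccumulator],
    group_by: Optional[Callable[[object], Hashable]] = None,
) -> List[Dict[Hashable, int]]:
    """Tumbling count window: emit one value per key every `window_size` messages.

    An incomplete final window is not emitted in this sketch.
    """
    results: List[Dict[Hashable, int]] = []
    # The factory runs once per key per window, so every window starts fresh.
    accumulators: DefaultDict[Hashable, CountAccumulator] = defaultdict(aggregate_func)
    seen = 0
    for message in messages:
        key = group_by(message) if group_by is not None else None
        accumulators[key].add(message)
        seen += 1
        if seen == window_size:
            results.append({k: acc.get_value() for k, acc in accumulators.items()})
            accumulators = defaultdict(aggregate_func)
            seen = 0
    return results


windows = toy_reduce(
    ["a", "b", "a", "c"],
    window_size=2,
    aggregate_func=CountAccumulator,
    group_by=lambda m: m,
)
print(windows)  # [{'a': 1, 'b': 1}, {'a': 1, 'c': 1}]
```

Passing a single shared accumulator instead of a factory would make counts leak from one window into the next.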
- class sentry_streams.pipeline.chain.Serializer(dt_format: str | None = None)
Bases: Applier[Message[TIn], bytes], Generic[TIn]
A step that serializes and encodes messages into bytes. These bytes can then be written by a sink, for example to a Kafka topic. This step must precede a sink step that writes to Kafka.
- build_step(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- dt_format: str | None = None
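A minimal sketch of what a serialization step does before a Kafka sink, assuming JSON output and assuming that dt_format is a strftime pattern applied to datetime values (this page only lists dt_format as an optional string). toy_serialize is hypothetical and is not the sentry_streams Serializer.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


def toy_serialize(payload: Any, dt_format: Optional[str] = None) -> bytes:
    """Hypothetical serializer: JSON-encode a payload and return UTF-8 bytes."""

    def encode_extra(value: Any) -> str:
        # json cannot encode datetimes natively; format them explicitly.
        if isinstance(value, datetime):
            return value.strftime(dt_format) if dt_format else value.isoformat()
        raise TypeError(f"cannot serialize {type(value).__name__}")

    return json.dumps(payload, default=encode_extra).encode("utf-8")


ts = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(toy_serialize({"ts": ts}))
# b'{"ts": "2024-01-02T03:04:05+00:00"}'
print(toy_serialize({"ts": ts}, dt_format="%Y-%m-%d"))
# b'{"ts": "2024-01-02"}'
```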
- class sentry_streams.pipeline.chain.Sink
Bases: ABC
Defines a generic Sink, which can be extended by specific types of Sinks. See examples/ for how different kinds of Sinks are plugged into a pipeline.
- abstractmethod build_sink(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- class sentry_streams.pipeline.chain.StreamSink(stream_name: 'str')
Bases: Sink
- build_sink(name: str, ctx: Pipeline, previous: Step) → Step
Builds a pipeline step and wires it to the Pipeline.
This method will be removed once the old syntax is retired.
- stream_name: str
- sentry_streams.pipeline.chain.multi_chain(chains: Sequence[Chain]) → Pipeline
Creates a pipeline that contains multiple chains, where each chain is a portion of the pipeline that starts with a source and ends with multiple sinks.
- sentry_streams.pipeline.chain.segment(name: str, msg_type: Type[TIn]) → ExtensibleChain[Message[TIn]]
Creates a segment of a pipeline that can be referenced from existing pipelines in route and broadcast steps.
- sentry_streams.pipeline.chain.streaming_source(name: str, stream_name: str, header_filter: Tuple[str, bytes] | None = None) → ExtensibleChain[Message[bytes]]
Creates a pipeline that starts with a StreamingSource.
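This page does not describe how header_filter is applied. One plausible reading, sketched below purely as an assumption, is that only messages carrying a Kafka header with exactly that key and value are passed into the pipeline. Headers are modeled as a list of (str, bytes) pairs; toy_header_match is hypothetical.

```python
from typing import List, Optional, Sequence, Tuple

Header = Tuple[str, bytes]


def toy_header_match(headers: Sequence[Header], header_filter: Optional[Header]) -> bool:
    """Hypothetical check: keep a message when it carries the exact (key, value) header."""
    if header_filter is None:
        return True  # no filter configured: every message passes
    return header_filter in headers


messages: List[Sequence[Header]] = [
    [("type", b"error")],
    [("type", b"transaction")],
    [],
]
kept = [h for h in messages if toy_header_match(h, ("type", b"error"))]
print(len(kept))  # 1
```

Under this reading, streaming_source("myinput", "events", header_filter=("type", b"error")) would consume only messages tagged with that header; the stream name and header values here are placeholders.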