Building a Pipeline¶
Pipelines are defined through a Python DSL (more options will be provided) by chaining dataflow primitives.
Chaining primitives means sending a message from one operator to the next one.
Pipelines start with a StreamSource, which represents a Kafka consumer. They can fork and broadcast messages to multiple branches. Each branch terminates with a Sink.
As of now, only Python operations can be used; Rust support is planned.
Distribution is not visible at this level, as the pipeline only defines the topology of the application, which is essentially its business logic. Distribution is defined via the deployment descriptor, so the operators can be distributed differently in different environments.
The DSL operators are in the pipeline.py module.
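Below is a minimal sketch that chains a few of the primitives documented on this page. The topic name and the lambda are placeholders, the Message wrapper is assumed to expose a payload attribute, and the step that attaches the terminating sink is omitted here (see StreamSink below).

```python
from sentry_streams.pipeline.pipeline import (
    Map,
    Parser,
    Pipeline,
    Serializer,
    StreamSource,
)

pipeline = (
    # Every pipeline starts from a source: here, a Kafka consumer.
    Pipeline(StreamSource(name="source", stream_name="ingest-events"))
    # Decode the raw bytes and validate them against the message schema.
    .apply(Parser(name="parser"))
    # A 1:1 transform; the function receives the wrapped Message.
    .apply(Map(name="normalize", function=lambda msg: msg.payload))
    # Encode back into bytes so a Kafka sink can write them out.
    .apply(Serializer(name="serializer"))
)
```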
- class sentry_streams.pipeline.pipeline.Aggregate(name: str, window: Window[MeasurementUnit], aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]], aggregate_backend: AggregationBackend[OutputType] | None = None, group_by_key: GroupBy | None = None, step_type: StepType = StepType.REDUCE)¶
Bases: Reduce[MeasurementUnit, InputType, OutputType]
A Reduce step which performs windowed aggregations. Can be keyed or non-keyed on the input stream. Supports an Accumulator-style aggregation which can have a configurable storage backend, for flushing intermediate aggregates. A usage sketch follows the attribute list below.
- aggregate_backend: AggregationBackend[OutputType] | None = None¶
- property aggregate_fn: Callable[[], Accumulator[Message[InputType], OutputType]]¶
- aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]]¶
- property group_by: GroupBy | None¶
- group_by_key: GroupBy | None = None¶
- window: Window[MeasurementUnit]¶
- property windowing: Window[MeasurementUnit]¶
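A hedged sketch of a windowed count follows. The Accumulator interface shown (add, get_value, merge) and the TumblingWindow import path are assumptions, not part of this reference.

```python
from sentry_streams.pipeline.pipeline import Aggregate
from sentry_streams.pipeline.window import TumblingWindow  # assumed import path

class CountAccumulator:
    """Assumed Accumulator shape: counts the messages seen in a window."""

    def __init__(self) -> None:
        self.count = 0

    def add(self, value) -> "CountAccumulator":
        self.count += 1
        return self

    def get_value(self) -> int:
        return self.count

    def merge(self, other: "CountAccumulator") -> "CountAccumulator":
        self.count += other.count
        return self

counter = Aggregate(
    name="count_per_window",
    window=TumblingWindow(window_size=10),  # assumed constructor
    aggregate_func=CountAccumulator,  # a zero-arg factory, per the signature above
)
```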
- class sentry_streams.pipeline.pipeline.Batch(name: str, batch_size: int | None = None, batch_timedelta: timedelta | None = None, step_type: StepType = StepType.REDUCE)¶
Bases: Reduce[MeasurementUnit, InputType, MutableSequence[Tuple[InputType, str | None]]], Generic[MeasurementUnit, InputType]
A step to batch up the results of the prior step. A usage sketch follows the attribute list below.
Batch can be configured via a batch size (a count of events) or a batch timedelta (an event-time duration).
- property aggregate_fn: Callable[[], Accumulator[Message[InputType], MutableSequence[Tuple[InputType, str | None]]]]¶
- batch_size: int | None = None¶
- batch_timedelta: timedelta | None = None¶
- property group_by: GroupBy | None¶
- override_config(loaded_config: Mapping[str, Any]) → None¶
Steps can implement custom overriding logic
- property windowing: Window[MeasurementUnit]¶
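For instance, a batch can flush by count or by elapsed event time; both forms below use only the documented parameters.

```python
from datetime import timedelta

from sentry_streams.pipeline.pipeline import Batch

# Flush once 100 events have accumulated.
by_count = Batch(name="batch_by_count", batch_size=100)

# Flush every 5 seconds of event time instead.
by_time = Batch(name="batch_by_time", batch_timedelta=timedelta(seconds=5))
```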
- class sentry_streams.pipeline.pipeline.BatchParser(name: str)¶
Bases: ComplexStep[Sequence[bytes], Sequence[TransformFuncReturnType]], Generic[TransformFuncReturnType]
- class sentry_streams.pipeline.pipeline.Branch(name: str, step_type: StepType = StepType.BRANCH)¶
Bases: WithInput[TIn], Generic[TIn]
A Branch represents one branch in a pipeline, which is routed to by a Router. Note: Branch preserves the input type as the output type.
- class sentry_streams.pipeline.pipeline.Broadcast(name: str, routes: Sequence[Pipeline[TIn]], step_type: StepType = StepType.BROADCAST)¶
Bases: WithInput[TIn], Generic[TIn]
A Broadcast step will forward messages to all downstream branches in a pipeline.
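A sketch of fanning out via the broadcast() method documented under Pipeline below, reusing the hypothetical pipeline from the top of this page. Branch names are placeholders, and each branch would be fully defined (ending in a sink) in real use.

```python
from sentry_streams.pipeline.pipeline import branch

fanout = pipeline.broadcast(
    name="fanout",
    routes=[
        branch("primary"),  # every message is delivered to this branch...
        branch("backup"),   # ...and to this one as well
    ],
)
```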
- class sentry_streams.pipeline.pipeline.ComplexStep(name: str)¶
Bases: WithInput[TIn], Generic[TIn, TOut]
A wrapper around a simple step that allows for syntactic sugar/more complex steps. The convert() function must return a simple step. ComplexStep[TIn, TOut] represents a step that transforms TIn to TOut.
- class sentry_streams.pipeline.pipeline.Filter(name: str, function: Callable[[Message[TIn]], bool] | str, step_type: StepType = StepType.FILTER)¶
Bases: Transform[TIn, TIn], Generic[TIn]
A simple Filter, taking a single input and either returning it or None as output. Note: Filter preserves the input type as the output type. A usage sketch follows the attribute list below.
- function: Callable[[Message[TIn]], bool] | str¶
- post_rust_function_validation(func: InternalRustFunction[TIn, TOut]) → None¶
- property resolved_function: Callable[[Message[TIn]], bool]¶
Returns a callable of the filter function defined in, or referenced by, this class
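A small sketch of a Filter; the predicate and the payload shape are hypothetical.

```python
from sentry_streams.pipeline.pipeline import Filter

errors_only = Filter(
    name="errors_only",
    # The function receives the wrapped Message and returns a bool.
    function=lambda msg: msg.payload.get("level") == "error",
)
```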
- class sentry_streams.pipeline.pipeline.FlatMap(name: str, function: Callable[[Message[TIn]], Iterable[TOut]] | str, step_type: StepType = StepType.FLAT_MAP)¶
Bases: Transform[TIn, TOut], Generic[TIn, TOut]
A generic step for mapping and flattening (and therefore altering the shape of) inputs to get outputs. Takes a single input to 0…N outputs. The function should return an Iterable[TOut], but the step itself outputs TOut. A usage sketch follows the attribute list below.
- function: Callable[[Message[TIn]], Iterable[TOut]] | str¶
- property resolved_function: Callable[[Message[TIn]], Iterable[TOut]]¶
Returns a callable of the flatmap function defined in, or referenced by, this class
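A small sketch of a FlatMap; splitting a text payload into lines is a hypothetical transform.

```python
from sentry_streams.pipeline.pipeline import FlatMap

split_lines = FlatMap(
    name="split_lines",
    # One input message yields zero or more output messages.
    function=lambda msg: msg.payload.splitlines(),
)
```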
- class sentry_streams.pipeline.pipeline.FunctionTransform(name: str, function: Callable[[Message[TIn]], TOut] | str, step_type: StepType)¶
Bases: Transform[TIn, TOut], Generic[TIn, TOut]
A transform step that applies a function to transform TIn to TOut. function: supports a reference to a function using dot notation, or a Callable. A usage sketch follows the attribute list below.
- function: Callable[[Message[TIn]], TOut] | str¶
- post_rust_function_validation(func: InternalRustFunction[TIn, TOut]) → None¶
- property resolved_function: Callable[[Message[TIn]], TOut]¶
Returns a callable of the transform function defined in, or referenced by, this class
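Both accepted forms of function are shown below using Map (a FunctionTransform subclass); the dotted path refers to a hypothetical module.

```python
from sentry_streams.pipeline.pipeline import Map

# Pass a Callable directly...
as_callable = Map(name="upper", function=lambda msg: msg.payload.upper())

# ...or reference a function by its dotted path (hypothetical module).
as_reference = Map(name="upper", function="myapp.transforms.to_upper")
```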
- class sentry_streams.pipeline.pipeline.GCSSink(name: str, bucket: str, object_generator: Callable[[], str], step_type: StepType = StepType.SINK)¶
Bases: Sink[TIn]
A Sink which writes to GCS.
- bucket: str¶
- object_generator: Callable[[], str]¶
- class sentry_streams.pipeline.pipeline.Map(name: str, function: Callable[[Message[TIn]], TOut] | str, step_type: StepType = StepType.MAP)¶
Bases: FunctionTransform[TIn, TOut], Generic[TIn, TOut]
A simple 1:1 Map, taking a single input to a single output.
- class sentry_streams.pipeline.pipeline.ParquetSerializer(name: str, schema_fields: Mapping[str, DataType], compression: ParquetCompression | None = 'snappy')¶
Bases: ComplexStep[MutableSequence[TIn], bytes], Generic[TIn]
- compression: Literal['lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'] | None = 'snappy'¶
- schema_fields: Mapping[str, DataType]¶
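Judging by its bases (MutableSequence[TIn] in, bytes out), this step serializes a batch of rows into Parquet bytes. A hedged sketch follows, assuming DataType is a pyarrow type.

```python
import pyarrow as pa  # assumed: DataType values come from pyarrow

from sentry_streams.pipeline.pipeline import ParquetSerializer

to_parquet = ParquetSerializer(
    name="to_parquet",
    schema_fields={"timestamp": pa.int64(), "message": pa.string()},
    compression="zstd",  # one of the documented Literal values
)
```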
- class sentry_streams.pipeline.pipeline.Parser(name: str)¶
Bases: ComplexStep[bytes, TransformFuncReturnType], Generic[TransformFuncReturnType]
A step to decode bytes, deserialize the resulting message and validate it against the schema which corresponds to the message type provided. The message type should be one which is supported by sentry-kafka-schemas. See examples/ for usage; this step can be plugged flexibly into a pipeline. Keep in mind that data up until this step is simply bytes.
Supports both JSON and protobuf.
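A hedged sketch of a Parser; whether the message type is supplied through the generic parameter, as below, is an assumption of this sketch.

```python
from sentry_streams.pipeline.pipeline import Parser

# Everything before this step is raw bytes; afterwards, validated messages.
parser = Parser[dict](name="parser")
```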
- class sentry_streams.pipeline.pipeline.Pipeline(source: Source[TOut] | Branch[TOut])¶
Bases: Generic[TOut]
A graph representing the connections between logical Steps.
- apply(step: Transform[TOut, TNewOut] | ComplexStep[TOut, TNewOut]) → Pipeline[TNewOut]¶
- broadcast(name: str, routes: Sequence[Pipeline[TOut]]) → Pipeline[TOut]¶
Broadcast a message to multiple branches. Adding a broadcast step will close the pipeline, since more steps can’t be added after it. Thus it expects that all the branches are fully defined.
- route(name: str, routing_function: Callable[[Message[TOut]], RoutingFuncReturnType], routing_table: Mapping[RoutingFuncReturnType, Pipeline[TOut]]) → Pipeline[TOut]¶
Route a message to a specific branch based on a routing function. Adding a router step will close the pipeline, since more steps can’t be added after it. Thus it expects that all the branches are fully defined.
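A sketch of route(), reusing the hypothetical pipeline from the top of this page; the routing key and branch names are placeholders.

```python
from sentry_streams.pipeline.pipeline import branch

routed = pipeline.route(
    name="route_by_type",
    # The routing function must map each message to exactly one table key.
    routing_function=lambda msg: msg.payload["event_type"],
    routing_table={
        "error": branch("errors"),
        "transaction": branch("transactions"),
    },
)
```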
- class sentry_streams.pipeline.pipeline.Reduce(name: str)¶
Bases: Transform[InputType, OutputType], ABC, Generic[MeasurementUnit, InputType, OutputType]
A generic Step for a Reduce (or Accumulator-based) operation.
- abstract property aggregate_fn: Callable[[], Accumulator[Message[InputType], OutputType]]¶
- abstract property group_by: GroupBy | None¶
- abstract property windowing: Window[MeasurementUnit]¶
- class sentry_streams.pipeline.pipeline.Reducer(name: str, window: Window[MeasurementUnit], aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]], aggregate_backend: AggregationBackend[OutputType] | None = None, group_by_key: GroupBy | None = None)¶
Bases: ComplexStep[InputType, OutputType], Generic[MeasurementUnit, InputType, OutputType]
- aggregate_backend: AggregationBackend[OutputType] | None = None¶
- aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]]¶
- group_by_key: GroupBy | None = None¶
- window: Window[MeasurementUnit]¶
- class sentry_streams.pipeline.pipeline.Router(name: str, routing_function: Callable[[Message[TIn]], RoutingFuncReturnType], routing_table: Mapping[RoutingFuncReturnType, Pipeline[TIn]], step_type: StepType = StepType.ROUTER)¶
Bases: WithInput[TIn], Generic[RoutingFuncReturnType, TIn]
A step which takes a routing table of Branches and sends messages to those branches based on a routing function. Routing functions must return a single output branch; routing to multiple branches simultaneously is not currently supported.
- routing_function: Callable[[Message[TIn]], RoutingFuncReturnType]¶
- class sentry_streams.pipeline.pipeline.Serializer(name: str, dt_format: str | None = None)¶
Bases: ComplexStep[TIn, bytes], Generic[TIn]
A step to serialize and encode messages into bytes. These bytes can then be written out by a sink, for example to a Kafka topic. This step must precede a sink step which writes to Kafka. A usage sketch follows the attribute list below.
- dt_format: str | None = None¶
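A sketch of the tail of a branch; the topic name is a placeholder. The Serializer can be chained with apply() (it is a ComplexStep); how the sink itself attaches is not shown in this reference.

```python
from sentry_streams.pipeline.pipeline import Serializer, StreamSink

serializer = Serializer(name="serializer")  # TIn in, bytes out
sink = StreamSink(name="producer", stream_name="processed-events")
```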
- class sentry_streams.pipeline.pipeline.Sink(name: str)¶
Bases: WithInput[TIn]
A generic Sink that consumes input of type TIn.
- class sentry_streams.pipeline.pipeline.Source(name: str)¶
Bases: Step, Generic[TOut]
A generic Source that produces output of type TOut.
- class sentry_streams.pipeline.pipeline.Step(name: str)¶
Bases: object
A generic Step, whose incoming and outgoing edges are registered against a Pipeline.
- name: str¶
- override_config(loaded_config: Mapping[str, Any]) → None¶
Steps can implement custom overriding logic
- class sentry_streams.pipeline.pipeline.StepType(value)¶
Bases: Enum
- BRANCH = 'branch'¶
- BROADCAST = 'broadcast'¶
- FILTER = 'filter'¶
- FLAT_MAP = 'flat_map'¶
- MAP = 'map'¶
- REDUCE = 'reduce'¶
- ROUTER = 'router'¶
- SINK = 'sink'¶
- SOURCE = 'source'¶
- class sentry_streams.pipeline.pipeline.StreamSink(name: str, stream_name: str, step_type: StepType = StepType.SINK)¶
Bases: Sink[TIn]
A Sink which specifically writes to Kafka.
- stream_name: str¶
- class sentry_streams.pipeline.pipeline.StreamSource(name: str, stream_name: str, header_filter: Tuple[str, bytes] | None = None, step_type: StepType = StepType.SOURCE)¶
Bases: Source[bytes]
A Source which reads from Kafka. A usage sketch follows the attribute list below.
- header_filter: Tuple[str, bytes] | None = None¶
- stream_name: str¶
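A sketch of a filtered source; the topic and header are placeholders, and the (key, value) reading of header_filter is inferred from its type.

```python
from sentry_streams.pipeline.pipeline import StreamSource

source = StreamSource(
    name="source",
    stream_name="ingest-events",
    # Only consume messages carrying this header key/value pair (inferred).
    header_filter=("should_process", b"true"),
)
```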
- class sentry_streams.pipeline.pipeline.Transform(name: str)¶
Bases: WithInput[TIn], Generic[TIn, TOut]
A step that transforms input of type TIn to output of type TOut.
- sentry_streams.pipeline.pipeline.TransformStep¶
alias of FunctionTransform
- class sentry_streams.pipeline.pipeline.WithInput(name: str)¶
Bases: Step, Generic[TIn]
A generic Step representing a logical step which has inputs of type TIn.
- sentry_streams.pipeline.pipeline.branch(name: str) → Pipeline[Any]¶
Used to create a new pipeline with a branch as the root step. This pipeline can then be added as part of a router or broadcast step.
- sentry_streams.pipeline.pipeline.make_edge_sets(edge_map: Mapping[str, Sequence[Any]]) → Mapping[str, Set[Any]]¶