snuba_sdk

See here for the full documentation.

Examples

Snuba SDK is a tool that allows requests to Snuba to be built programmatically. A Request consists of a Query, the dataset the Query is targeting, the AppID of the Request, and any flags for the Request. A Query object is a code representation of a SnQL query, and has a number of attributes corresponding to different parts of the query.

Requests and Queries can be created directly:

request = Request(
    dataset = "discover",
    app_id = "myappid",
    tenant_ids = {"referrer": "my_referrer", "organization_id": 1234},
    query = Query(
        match=Entity("events"),
        select=[
            Column("title"),
            Function("uniq", [Column("event_id")], "uniq_events"),
        ],
        groupby=[Column("title")],
        where=[
            Condition(Column("timestamp"), Op.GT, datetime.datetime(2021, 1, 1)),
            Condition(Column("project_id"), Op.IN, Function("tuple", [1, 2, 3])),
        ],
        limit=Limit(10),
        offset=Offset(0),
        granularity=Granularity(3600),
    ),
    flags = Flags(debug=True)
)

Queries can also be built incrementally:

query = (
    Query("discover", Entity("events"))
    .set_select(
        [Column("title"), Function("uniq", [Column("event_id")], "uniq_events")]
    )
    .set_groupby([Column("title")])
    .set_where(
        [
            Condition(Column("timestamp"), Op.GT, datetime.datetime.(2021, 1, 1)),
            Condition(Column("project_id"), Op.IN, Function("tuple", [1, 2, 3])),
        ]
    )
    .set_limit(10)
    .set_offset(0)
    .set_granularity(3600)
)
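
The incrementally built query can then be wrapped in a Request in the same way as the direct example above; a brief sketch reusing the field values shown earlier:

request = Request(
    dataset = "discover",
    app_id = "myappid",
    tenant_ids = {"referrer": "my_referrer", "organization_id": 1234},
    query = query,
    flags = Flags(debug=True),
)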

MQL Examples

MQL queries can be built in a similar way to SnQL queries. However, they use a MetricsQuery object instead of a Query object. The query argument of a MetricsQuery is either a Timeseries or a Formula, which is a mathematical formula of Timeseries.

The other arguments to the MetricsQuery are metadata about how to run the query, e.g. start/end timestamps, granularity, limits, etc.

    MetricsQuery(
        query=Formula(
            ArithmeticOperator.DIVIDE.value,
            [
                Timeseries(
                    metric=Metric(
                        public_name="transaction.duration",
                    ),
                    aggregate="sum",
                ),
                1000,
            ],
        ),
        start=NOW,
        end=NOW + timedelta(days=14),
        rollup=Rollup(interval=3600, totals=None, granularity=3600),
        scope=MetricsScope(
            org_ids=[1], project_ids=[11], use_case_id="transactions"
        ),
        limit=Limit(100),
        offset=Offset(5),
    )
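
A MetricsQuery can also be built incrementally with its set_* methods (documented below). A sketch of the same query, assuming NOW is a datetime defined as in the example above:

metrics_query = (
    MetricsQuery()
    .set_query(
        Formula(
            ArithmeticOperator.DIVIDE.value,
            [
                Timeseries(
                    metric=Metric(public_name="transaction.duration"),
                    aggregate="sum",
                ),
                1000,
            ],
        )
    )
    .set_start(NOW)
    .set_end(NOW + timedelta(days=14))
    .set_rollup(Rollup(interval=3600, totals=None, granularity=3600))
    .set_scope(
        MetricsScope(org_ids=[1], project_ids=[11], use_case_id="transactions")
    )
    .set_limit(100)
    .set_offset(5)
)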

Once built, the request can be translated into a format that can be sent to Snuba.

# Outputs a formatted Snuba request
request.serialize()

It can also be printed in a more human-readable format.

# Outputs a human-readable version of the request
print(request.print())

This outputs:

{
    "dataset": "discover",
    "app_id": "myappid",
    "query": "MATCH (events) SELECT title, uniq(event_id) AS uniq_events BY title WHERE timestamp > toDateTime('2021-01-01T00:00:00.000000') AND project_id IN tuple(1, 2, 3) LIMIT 10 OFFSET 0 GRANULARITY 3600",
    "debug": true
}

If an expression in the query is invalid (e.g. Column(1)), an InvalidExpressionError exception is raised. If there is a problem with a query, an InvalidQueryError exception is raised when .validate() or .translate() is called. If there is a problem with the Request or the Flags, an InvalidRequestError or an InvalidFlagError is raised, respectively.
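
A minimal sketch of catching one of these errors; the import path for InvalidExpressionError is assumed here to be snuba_sdk.expressions, matching the module paths shown in the API listings below:

from snuba_sdk import Column
from snuba_sdk.expressions import InvalidExpressionError  # assumed import path

try:
    Column(1)  # not a string, so validation rejects it
except InvalidExpressionError as exc:
    print(f"invalid expression: {exc}")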

Contributing to the SDK

Please refer to CONTRIBUTING.rst.

License

Licensed under FSL-1.0-Apache-2.0, see LICENSE.

"""
.. include:: ../README.md
"""

from snuba_sdk.aliased_expression import AliasedExpression
from snuba_sdk.column import Column
from snuba_sdk.conditions import And, BooleanCondition, BooleanOp, Condition, Op, Or
from snuba_sdk.entity import Entity
from snuba_sdk.storage import Storage
from snuba_sdk.expressions import Extrapolate, Granularity, Limit, Offset, Totals
from snuba_sdk.formula import ArithmeticOperator, Formula
from snuba_sdk.function import CurriedFunction, Function, Identifier, Lambda
from snuba_sdk.metrics_query import MetricsQuery
from snuba_sdk.mql_context import MQLContext
from snuba_sdk.orderby import Direction, LimitBy, OrderBy
from snuba_sdk.query import Query
from snuba_sdk.relationships import Join, Relationship
from snuba_sdk.request import Flags, Request
from snuba_sdk.timeseries import Metric, MetricsScope, Rollup, Timeseries
from snuba_sdk.delete_query import DeleteQuery

__all__ = [
    "AliasedExpression",
    "And",
    "ArithmeticOperator",
    "BooleanCondition",
    "BooleanOp",
    "Column",
    "Condition",
    "CurriedFunction",
    "Direction",
    "Entity",
    "Extrapolate",
    "Flags",
    "Formula",
    "Function",
    "Granularity",
    "Identifier",
    "Join",
    "Lambda",
    "Limit",
    "LimitBy",
    "Metric",
    "MetricsQuery",
    "MetricsScope",
    "MQLContext",
    "Offset",
    "Op",
    "Or",
    "OrderBy",
    "Query",
    "Relationship",
    "Request",
    "Rollup",
    "Storage",
    "Timeseries",
    "Totals",
    "DeleteQuery",
]
@dataclass(frozen=True)
class AliasedExpression(snuba_sdk.expressions.Expression):
 9@dataclass(frozen=True)
10class AliasedExpression(Expression):
11    """
12    Used to alias the name of an expression in the results of a query. It is not used
13    anywhere in Snuba except to change the names in the results set. Right now this is
14    limited to Columns only because Functions have a separate alias. Eventually the
15    two will be combined.
16
17    :param Expression: The expression to alias.
18    :type Expression: Column
19    :raises InvalidExpressionError: If the expression or alias is invalid.
20    """
21
22    # TODO: We should eventually allow Functions here as well, once we think through
23    # how this should work with functions that already have aliases.
24    exp: Column
25    alias: Optional[str] = None
26
27    def validate(self) -> None:
28        if not isinstance(self.exp, Column):
29            raise InvalidExpressionError(
30                "aliased expressions can only contain a Column"
31            )
32
33        if self.alias is not None:
34            if not isinstance(self.alias, str) or self.alias == "":
35                raise InvalidExpressionError(
36                    f"alias '{self.alias}' of expression must be None or a non-empty string"
37                )
38            if not ALIAS_RE.match(self.alias):
39                raise InvalidExpressionError(
40                    f"alias '{self.alias}' of expression contains invalid characters"
41                )

Used to alias the name of an expression in the results of a query. It is not used anywhere in Snuba except to change the names in the results set. Right now this is limited to Columns only because Functions have a separate alias. Eventually the two will be combined.

Parameters
  • Expression: The expression to alias.
Raises
  • InvalidExpressionError: If the expression or alias is invalid.
AliasedExpression(exp: Column, alias: Optional[str] = None)
exp: Column
alias: Optional[str] = None
def validate(self) -> None:
27    def validate(self) -> None:
28        if not isinstance(self.exp, Column):
29            raise InvalidExpressionError(
30                "aliased expressions can only contain a Column"
31            )
32
33        if self.alias is not None:
34            if not isinstance(self.alias, str) or self.alias == "":
35                raise InvalidExpressionError(
36                    f"alias '{self.alias}' of expression must be None or a non-empty string"
37                )
38            if not ALIAS_RE.match(self.alias):
39                raise InvalidExpressionError(
40                    f"alias '{self.alias}' of expression contains invalid characters"
41                )
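
A short usage sketch: aliasing a column in a query's select clause (the entity and column names are illustrative):

from snuba_sdk import AliasedExpression, Column, Entity, Query

query = Query(
    match=Entity("events"),
    select=[AliasedExpression(Column("title"), "my_title")],
)
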
@dataclass(frozen=True)
class And(snuba_sdk.BooleanCondition):
135@dataclass(frozen=True)
136class And(BooleanCondition):
137    op: BooleanOp = field(init=False, default=BooleanOp.AND)
138    conditions: ConditionGroup = field(default_factory=list)
And( conditions: Sequence[Union[BooleanCondition, Condition]] = <factory>)
op: BooleanOp = <BooleanOp.AND: 'AND'>
conditions: Sequence[Union[BooleanCondition, Condition]]
Inherited Members
BooleanCondition
validate
class ArithmeticOperator(enum.Enum):
23class ArithmeticOperator(Enum):
24    PLUS = "plus"
25    MINUS = "minus"
26    MULTIPLY = "multiply"
27    DIVIDE = "divide"

PLUS = <ArithmeticOperator.PLUS: 'plus'>
MINUS = <ArithmeticOperator.MINUS: 'minus'>
MULTIPLY = <ArithmeticOperator.MULTIPLY: 'multiply'>
DIVIDE = <ArithmeticOperator.DIVIDE: 'divide'>
Inherited Members
enum.Enum
name
value
@dataclass(frozen=True)
class BooleanCondition(snuba_sdk.expressions.Expression):
108@dataclass(frozen=True)
109class BooleanCondition(Expression):
110    op: BooleanOp
111    conditions: ConditionGroup
112
113    def validate(self) -> None:
114        if not isinstance(self.op, BooleanOp):
115            raise InvalidConditionError(
116                "invalid boolean: operator of a boolean must be a BooleanOp"
117            )
118
119        if not isinstance(self.conditions, (list, tuple)):
120            raise InvalidConditionError(
121                "invalid boolean: conditions must be a list of other conditions"
122            )
123        elif len(self.conditions) < 2:
124            raise InvalidConditionError(
125                "invalid boolean: must supply at least two conditions"
126            )
127
128        for con in self.conditions:
129            if not isinstance(con, (Condition, BooleanCondition)):
130                raise InvalidConditionError(
131                    f"invalid boolean: {con} is not a valid condition"
132                )
BooleanCondition( op: BooleanOp, conditions: Sequence[Union[BooleanCondition, Condition]])
op: BooleanOp
conditions: Sequence[Union[BooleanCondition, Condition]]
def validate(self) -> None:
113    def validate(self) -> None:
114        if not isinstance(self.op, BooleanOp):
115            raise InvalidConditionError(
116                "invalid boolean: operator of a boolean must be a BooleanOp"
117            )
118
119        if not isinstance(self.conditions, (list, tuple)):
120            raise InvalidConditionError(
121                "invalid boolean: conditions must be a list of other conditions"
122            )
123        elif len(self.conditions) < 2:
124            raise InvalidConditionError(
125                "invalid boolean: must supply at least two conditions"
126            )
127
128        for con in self.conditions:
129            if not isinstance(con, (Condition, BooleanCondition)):
130                raise InvalidConditionError(
131                    f"invalid boolean: {con} is not a valid condition"
132                )
class BooleanOp(enum.Enum):
103class BooleanOp(Enum):
104    AND = "AND"
105    OR = "OR"

AND = <BooleanOp.AND: 'AND'>
OR = <BooleanOp.OR: 'OR'>
Inherited Members
enum.Enum
name
value
@dataclass(frozen=True)
class Column(snuba_sdk.expressions.Expression):
18@dataclass(frozen=True)
19class Column(Expression):
20    """
21    A representation of a single column in the database. Columns are
22    expected to be alpha-numeric, with '.', '_`, and `:` allowed as well.
23    If the column is subscriptable then you can specify the column in the
24    form `subscriptable[key]`. The `subscriptable` attribute will contain the outer
25    column and `key` will contain the inner key.
26
27    :param name: The column name.
28    :type name: str
29    :param entity: The entity for that column
30    :type name: Optional[Entity]
31
32    :raises InvalidColumnError: If the column name is not a string or has an
33        invalid format.
34
35    """
36
37    name: str
38    entity: Optional[Entity] = None
39    subscriptable: Optional[str] = field(init=False, default=None)
40    key: Optional[str] = field(init=False, default=None)
41
42    def validate(self) -> None:
43        if not isinstance(self.name, str):
44            raise InvalidColumnError(f"column '{self.name}' must be a string")
45        if not column_name_re.match(self.name):
46            raise InvalidColumnError(
47                f"column '{self.name}' is empty or contains invalid characters"
48            )
49
50        if self.entity is not None:
51            if not isinstance(self.entity, Entity):
52                raise InvalidColumnError(f"column '{self.name}' expects an Entity")
53            if not self.entity.alias:
54                raise InvalidColumnError(
55                    f"column '{self.name}' expects an Entity with an alias"
56                )
57
58            self.validate_data_model(self.entity)
59
60        # If this is a subscriptable set these values to help with debugging etc.
61        # Because this is frozen we can't set the value directly.
62        if "[" in self.name:
63            subscriptable, key = self.name.split("[", 1)
64            key = key.strip("]")
65            super().__setattr__("subscriptable", subscriptable)
66            super().__setattr__("key", key)
67
68    def validate_data_model(self, match: Union[Entity, Storage]) -> None:
69        if match.data_model is None:
70            return
71
72        to_check = self.subscriptable if self.subscriptable else self.name
73        if not match.data_model.contains(to_check):
74            raise InvalidColumnError(
75                f"'{match.name}' does not support the column '{self.name}'"
76            )

A representation of a single column in the database. Columns are expected to be alpha-numeric, with '.', '_', and ':' allowed as well. If the column is subscriptable then you can specify the column in the form subscriptable[key]. The subscriptable attribute will contain the outer column and key will contain the inner key.

Parameters
  • name: The column name.
  • entity: The entity for that column
Raises
  • InvalidColumnError: If the column name is not a string or has an invalid format.
Column(name: str, entity: Optional[Entity] = None)
name: str
entity: Optional[Entity] = None
subscriptable: Optional[str] = None
key: Optional[str] = None
def validate(self) -> None:
42    def validate(self) -> None:
43        if not isinstance(self.name, str):
44            raise InvalidColumnError(f"column '{self.name}' must be a string")
45        if not column_name_re.match(self.name):
46            raise InvalidColumnError(
47                f"column '{self.name}' is empty or contains invalid characters"
48            )
49
50        if self.entity is not None:
51            if not isinstance(self.entity, Entity):
52                raise InvalidColumnError(f"column '{self.name}' expects an Entity")
53            if not self.entity.alias:
54                raise InvalidColumnError(
55                    f"column '{self.name}' expects an Entity with an alias"
56                )
57
58            self.validate_data_model(self.entity)
59
60        # If this is a subscriptable set these values to help with debugging etc.
61        # Because this is frozen we can't set the value directly.
62        if "[" in self.name:
63            subscriptable, key = self.name.split("[", 1)
64            key = key.strip("]")
65            super().__setattr__("subscriptable", subscriptable)
66            super().__setattr__("key", key)
def validate_data_model( self, match: Union[Entity, Storage]) -> None:
68    def validate_data_model(self, match: Union[Entity, Storage]) -> None:
69        if match.data_model is None:
70            return
71
72        to_check = self.subscriptable if self.subscriptable else self.name
73        if not match.data_model.contains(to_check):
74            raise InvalidColumnError(
75                f"'{match.name}' does not support the column '{self.name}'"
76            )
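
A brief sketch of plain and subscriptable columns, following the subscriptable[key] form described above (the tag name is illustrative):

from snuba_sdk import Column

title = Column("title")
tag = Column("tags[environment]")
assert tag.subscriptable == "tags"
assert tag.key == "environment"
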
@dataclass(frozen=True)
class Condition(snuba_sdk.expressions.Expression):
 73@dataclass(frozen=True)
 74class Condition(Expression):
 75    lhs: Union[Column, CurriedFunction, Function]
 76    op: Op
 77    rhs: Optional[Union[Column, CurriedFunction, Function, ScalarType]] = None
 78
 79    def validate(self) -> None:
 80        if not isinstance(self.lhs, (Column, CurriedFunction, Function)):
 81            raise InvalidConditionError(
 82                f"invalid condition: LHS of a condition must be a Column, CurriedFunction or Function, not {type(self.lhs)}"
 83            )
 84        if not isinstance(self.op, Op):
 85            raise InvalidConditionError(
 86                "invalid condition: operator of a condition must be an Op"
 87            )
 88
 89        if is_unary(self.op):
 90            if self.rhs is not None:
 91                raise InvalidConditionError(
 92                    "invalid condition: unary operators don't have rhs conditions"
 93                )
 94
 95        if not isinstance(
 96            self.rhs, (Column, CurriedFunction, Function)
 97        ) and not is_scalar(self.rhs):
 98            raise InvalidConditionError(
 99                f"invalid condition: RHS of a condition must be a Column, CurriedFunction, Function or Scalar not {type(self.rhs)}"
100            )
Condition( lhs: Union[Column, CurriedFunction, Function], op: Op, rhs: Union[Column, CurriedFunction, Function, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Sequence[Union[snuba_sdk.expressions.Expression, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime]]] = None)
lhs: Union[Column, CurriedFunction, Function]
op: Op
rhs: Union[Column, CurriedFunction, Function, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Sequence[Union[snuba_sdk.expressions.Expression, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime]]] = None
def validate(self) -> None:
 79    def validate(self) -> None:
 80        if not isinstance(self.lhs, (Column, CurriedFunction, Function)):
 81            raise InvalidConditionError(
 82                f"invalid condition: LHS of a condition must be a Column, CurriedFunction or Function, not {type(self.lhs)}"
 83            )
 84        if not isinstance(self.op, Op):
 85            raise InvalidConditionError(
 86                "invalid condition: operator of a condition must be an Op"
 87            )
 88
 89        if is_unary(self.op):
 90            if self.rhs is not None:
 91                raise InvalidConditionError(
 92                    "invalid condition: unary operators don't have rhs conditions"
 93                )
 94
 95        if not isinstance(
 96            self.rhs, (Column, CurriedFunction, Function)
 97        ) and not is_scalar(self.rhs):
 98            raise InvalidConditionError(
 99                f"invalid condition: RHS of a condition must be a Column, CurriedFunction, Function or Scalar not {type(self.rhs)}"
100            )
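
Example conditions matching the WHERE clauses used in the README examples above (a short sketch):

import datetime

from snuba_sdk import Column, Condition, Function, Op

Condition(Column("timestamp"), Op.GT, datetime.datetime(2021, 1, 1))
Condition(Column("project_id"), Op.IN, Function("tuple", [1, 2, 3]))
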
@dataclass(frozen=True)
class CurriedFunction(snuba_sdk.expressions.Expression):
 32@dataclass(frozen=True)
 33class CurriedFunction(Expression):
 34    function: str
 35    initializers: Optional[Sequence[Union[ScalarLiteralType, Column]]] = None
 36    parameters: Optional[
 37        Sequence[
 38            Union[
 39                ScalarType,
 40                Column,
 41                CurriedFunction,
 42                Function,
 43                Identifier,
 44                Lambda,
 45            ]
 46        ]
 47    ] = None
 48    alias: Optional[str] = None
 49
 50    def validate(self) -> None:
 51        if not isinstance(self.function, str):
 52            raise InvalidFunctionError(f"function '{self.function}' must be a string")
 53        if self.function == "":
 54            # TODO: Have a whitelist of valid functions to check, maybe even with more
 55            # specific parameter type checking
 56            raise InvalidFunctionError("function cannot be empty")
 57        if not function_name_re.match(self.function):
 58            raise InvalidFunctionError(
 59                f"function '{self.function}' contains invalid characters"
 60            )
 61
 62        if self.initializers is not None:
 63            if not isinstance(self.initializers, Sequence):
 64                raise InvalidFunctionError(
 65                    f"initializers of function {self.function} must be a Sequence"
 66                )
 67            elif not all(
 68                isinstance(param, Column) or is_literal(param)
 69                for param in self.initializers
 70            ):
 71                raise InvalidFunctionError(
 72                    f"initializers to function {self.function} must be a scalar or column"
 73                )
 74
 75        if self.alias is not None:
 76            if not isinstance(self.alias, str) or self.alias == "":
 77                raise InvalidFunctionError(
 78                    f"alias '{self.alias}' of function {self.function} must be None or a non-empty string"
 79                )
 80            if not ALIAS_RE.match(self.alias):
 81                raise InvalidFunctionError(
 82                    f"alias '{self.alias}' of function {self.function} contains invalid characters"
 83                )
 84
 85        if self.parameters is not None:
 86            if not isinstance(self.parameters, Sequence):
 87                raise InvalidFunctionError(
 88                    f"parameters of function {self.function} must be a Sequence"
 89                )
 90            for param in self.parameters:
 91                if not isinstance(
 92                    param,
 93                    (Column, CurriedFunction, Function, Identifier, Lambda),
 94                ) and not is_scalar(param):
 95                    assert not isinstance(param, bytes)  # mypy
 96                    raise InvalidFunctionError(
 97                        f"parameter '{param}' of function {self.function} is an invalid type"
 98                    )
 99
100    def __eq__(self, other: object) -> bool:
101        # Don't use the alias to compare equality
102        if not isinstance(other, CurriedFunction):
103            return False
104
105        return (
106            self.function == other.function
107            and self.initializers == other.initializers
108            and self.parameters == other.parameters
109        )
CurriedFunction( function: str, initializers: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Column]]] = None, parameters: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Sequence[Union[snuba_sdk.expressions.Expression, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime]], Column, CurriedFunction, Function, Identifier, Lambda]]] = None, alias: Optional[str] = None)
function: str
initializers: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Column]]] = None
parameters: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Sequence[Union[snuba_sdk.expressions.Expression, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime]], Column, CurriedFunction, Function, Identifier, Lambda]]] = None
alias: Optional[str] = None
def validate(self) -> None:
50    def validate(self) -> None:
51        if not isinstance(self.function, str):
52            raise InvalidFunctionError(f"function '{self.function}' must be a string")
53        if self.function == "":
54            # TODO: Have a whitelist of valid functions to check, maybe even with more
55            # specific parameter type checking
56            raise InvalidFunctionError("function cannot be empty")
57        if not function_name_re.match(self.function):
58            raise InvalidFunctionError(
59                f"function '{self.function}' contains invalid characters"
60            )
61
62        if self.initializers is not None:
63            if not isinstance(self.initializers, Sequence):
64                raise InvalidFunctionError(
65                    f"initializers of function {self.function} must be a Sequence"
66                )
67            elif not all(
68                isinstance(param, Column) or is_literal(param)
69                for param in self.initializers
70            ):
71                raise InvalidFunctionError(
72                    f"initializers to function {self.function} must be a scalar or column"
73                )
74
75        if self.alias is not None:
76            if not isinstance(self.alias, str) or self.alias == "":
77                raise InvalidFunctionError(
78                    f"alias '{self.alias}' of function {self.function} must be None or a non-empty string"
79                )
80            if not ALIAS_RE.match(self.alias):
81                raise InvalidFunctionError(
82                    f"alias '{self.alias}' of function {self.function} contains invalid characters"
83                )
84
85        if self.parameters is not None:
86            if not isinstance(self.parameters, Sequence):
87                raise InvalidFunctionError(
88                    f"parameters of function {self.function} must be a Sequence"
89                )
90            for param in self.parameters:
91                if not isinstance(
92                    param,
93                    (Column, CurriedFunction, Function, Identifier, Lambda),
94                ) and not is_scalar(param):
95                    assert not isinstance(param, bytes)  # mypy
96                    raise InvalidFunctionError(
97                        f"parameter '{param}' of function {self.function} is an invalid type"
98                    )
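
A sketch of a curried aggregate: the initializer list carries the function's own parameters (here, a quantile level) separately from its column arguments. The function and column names are illustrative:

from snuba_sdk import Column, CurriedFunction

p90 = CurriedFunction("quantile", [0.90], [Column("duration")], "p90")
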
class Direction(enum.Enum):
11class Direction(Enum):
12    ASC = "ASC"
13    DESC = "DESC"

ASC = <Direction.ASC: 'ASC'>
DESC = <Direction.DESC: 'DESC'>
Inherited Members
enum.Enum
name
value
@dataclass(frozen=True, repr=False)
class Entity(snuba_sdk.expressions.Expression):
16@dataclass(frozen=True, repr=False)
17class Entity(Expression):
18    name: str
19    alias: Optional[str] = None
20    sample: Optional[float] = None
21    data_model: Optional[DataModel] = field(hash=False, default=None)
22
23    def validate(self) -> None:
24        # TODO: There should be a whitelist of entity names at some point
25        if not isinstance(self.name, str) or not entity_name_re.match(self.name):
26            raise InvalidEntityError(f"'{self.name}' is not a valid entity name")
27
28        if self.sample is not None:
29            if not isinstance(self.sample, float):
30                raise InvalidEntityError("sample must be a float")
31            elif self.sample <= 0.0:
32                raise InvalidEntityError("samples must be greater than 0.0")
33
34        if self.alias is not None:
35            if not isinstance(self.alias, str) or not self.alias:
36                raise InvalidEntityError(f"'{self.alias}' is not a valid alias")
37
38        if self.data_model is not None:
39            if not isinstance(self.data_model, DataModel):
40                raise InvalidEntityError("data_model must be an instance of DataModel")
41
42    def __repr__(self) -> str:
43        alias = f", alias='{self.alias}'" if self.alias is not None else ""
44        sample = f", sample={self.sample}" if self.sample is not None else ""
45        return f"Entity('{self.name}'{alias}{sample})"
Entity( name: str, alias: Optional[str] = None, sample: Optional[float] = None, data_model: Optional[snuba_sdk.schema.DataModel] = None)
name: str
alias: Optional[str] = None
sample: Optional[float] = None
data_model: Optional[snuba_sdk.schema.DataModel] = None
def validate(self) -> None:
23    def validate(self) -> None:
24        # TODO: There should be a whitelist of entity names at some point
25        if not isinstance(self.name, str) or not entity_name_re.match(self.name):
26            raise InvalidEntityError(f"'{self.name}' is not a valid entity name")
27
28        if self.sample is not None:
29            if not isinstance(self.sample, float):
30                raise InvalidEntityError("sample must be a float")
31            elif self.sample <= 0.0:
32                raise InvalidEntityError("samples must be greater than 0.0")
33
34        if self.alias is not None:
35            if not isinstance(self.alias, str) or not self.alias:
36                raise InvalidEntityError(f"'{self.alias}' is not a valid alias")
37
38        if self.data_model is not None:
39            if not isinstance(self.data_model, DataModel):
40                raise InvalidEntityError("data_model must be an instance of DataModel")
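
A short sketch; the alias is needed when the entity participates in a Join or is attached to a Column, and sample applies a sampling rate:

from snuba_sdk import Entity

events = Entity("events")
sampled = Entity("events", alias="e", sample=0.5)
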
@dataclass(frozen=True)
class Extrapolate(snuba_sdk.expressions.Expression):
124@dataclass(frozen=True)
125class Extrapolate(Expression):
126    extrapolate: bool
127
128    def validate(self) -> None:
129        if not isinstance(self.extrapolate, bool):
130            raise InvalidExpressionError("extrapolate must be a boolean")
Extrapolate(extrapolate: bool)
extrapolate: bool
def validate(self) -> None:
128    def validate(self) -> None:
129        if not isinstance(self.extrapolate, bool):
130            raise InvalidExpressionError("extrapolate must be a boolean")
@dataclass
class Flags:
22@dataclass
23class Flags:
24    totals: bool | None = None
25    consistent: bool | None = None
26    turbo: bool | None = None
27    debug: bool | None = None
28    dry_run: bool | None = None
29    legacy: bool | None = None
30
31    def validate(self) -> None:
32        flags_fields = fields(self)
33        for ff in flags_fields:
34            flag = getattr(self, ff.name)
35            if flag is not None and not isinstance(flag, bool):
36                raise InvalidFlagError(f"{ff.name} must be a boolean")
37
38    def to_dict(self) -> dict[str, bool]:
39        self.validate()
40        values = asdict(self)
41        return {f: v for f, v in values.items() if v is not None}
Flags( totals: bool | None = None, consistent: bool | None = None, turbo: bool | None = None, debug: bool | None = None, dry_run: bool | None = None, legacy: bool | None = None)
totals: bool | None = None
consistent: bool | None = None
turbo: bool | None = None
debug: bool | None = None
dry_run: bool | None = None
legacy: bool | None = None
def validate(self) -> None:
31    def validate(self) -> None:
32        flags_fields = fields(self)
33        for ff in flags_fields:
34            flag = getattr(self, ff.name)
35            if flag is not None and not isinstance(flag, bool):
36                raise InvalidFlagError(f"{ff.name} must be a boolean")
def to_dict(self) -> dict[str, bool]:
38    def to_dict(self) -> dict[str, bool]:
39        self.validate()
40        values = asdict(self)
41        return {f: v for f, v in values.items() if v is not None}
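
A sketch showing that only explicitly set flags survive serialization:

from snuba_sdk import Flags

flags = Flags(debug=True)
assert flags.to_dict() == {"debug": True}
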
@dataclass(frozen=True)
class Formula:
 40@dataclass(frozen=True)
 41class Formula:
 42    function_name: str
 43    parameters: Optional[Sequence[FormulaParameterGroup]] = None
 44    aggregate_params: list[Any] | None = None
 45    filters: Optional[ConditionGroup] = None
 46    groupby: Optional[list[Column | AliasedExpression]] = None
 47
 48    def __validate_consistency(self) -> None:
 49        """
 50        Ensure that the groupby columns are consistent across all Timeseries
 51        and Formulas within this Formula."""
 52        if self.parameters is None:
 53            raise InvalidFormulaError("Formula must have parameters")
 54
 55        groupbys = set()
 56        has_timeseries = False
 57
 58        stack: list[FormulaParameterGroup] = [self]
 59        while stack:
 60            param = stack.pop()
 61            if isinstance(param, Formula):
 62                if param.groupby is not None:
 63                    groupbys.add(tuple(param.groupby))
 64
 65                if param.parameters:
 66                    stack.extend(param.parameters)
 67            elif isinstance(param, Timeseries):
 68                has_timeseries = True
 69                if param.groupby is not None:
 70                    groupbys.add(tuple(param.groupby))
 71
 72        if not has_timeseries:
 73            raise InvalidFormulaError(
 74                "Formulas must operate on at least one Timeseries"
 75            )
 76        if len(set(groupbys)) > 1:
 77            raise InvalidFormulaError(
 78                "Formula parameters must group by the same columns"
 79            )
 80
 81    def validate(self) -> None:
 82        if not isinstance(self.function_name, str):
 83            raise InvalidFormulaError(f"formula '{self.function_name}' must be a str")
 84        if self.parameters is None:
 85            raise InvalidFormulaError("Formula must have parameters")
 86        elif not isinstance(self.parameters, Sequence):
 87            raise InvalidFormulaError(
 88                f"parameters of formula {self.function_name} must be a Sequence"
 89            )
 90
 91        for param in self.parameters:
 92            if not isinstance(param, tuple(FormulaParameter)):
 93                raise InvalidFormulaError(
 94                    f"parameter '{param}' of formula {self.function_name} is an invalid type"
 95                )
 96        self.__validate_consistency()
 97
 98    def _replace(self, field: str, value: Any) -> Formula:
 99        new = replace(self, **{field: value})
100        return new
101
102    def set_parameters(self, parameters: Sequence[FormulaParameterGroup]) -> Formula:
103        if parameters is not None and not list_type(
104            parameters, (Formula, Timeseries, float, int)
105        ):
106            raise InvalidFormulaError(
107                "parameters must be a list of either Timeseries, floats, or ints"
108            )
109        return self._replace("parameters", parameters)
110
111    def set_filters(self, filters: ConditionGroup | None) -> Formula:
112        if filters is not None and not list_type(
113            filters, (BooleanCondition, Condition)
114        ):
115            raise InvalidFormulaError("filters must be a list of Conditions")
116        return self._replace("filters", filters)
117
118    def set_groupby(self, groupby: list[Column | AliasedExpression] | None) -> Formula:
119        if groupby is not None and not list_type(groupby, (Column, AliasedExpression)):
120            raise InvalidFormulaError("groupby must be a list of Columns")
121        return self._replace("groupby", groupby)
Formula( function_name: str, parameters: Optional[Sequence[Union[Formula, Timeseries, float, int, str]]] = None, aggregate_params: list[typing.Any] | None = None, filters: Optional[Sequence[Union[BooleanCondition, Condition]]] = None, groupby: Optional[list[Column | AliasedExpression]] = None)
function_name: str
parameters: Optional[Sequence[Union[Formula, Timeseries, float, int, str]]] = None
aggregate_params: list[typing.Any] | None = None
filters: Optional[Sequence[Union[BooleanCondition, Condition]]] = None
groupby: Optional[list[Column | AliasedExpression]] = None
def validate(self) -> None:
81    def validate(self) -> None:
82        if not isinstance(self.function_name, str):
83            raise InvalidFormulaError(f"formula '{self.function_name}' must be a str")
84        if self.parameters is None:
85            raise InvalidFormulaError("Formula must have parameters")
86        elif not isinstance(self.parameters, Sequence):
87            raise InvalidFormulaError(
88                f"parameters of formula {self.function_name} must be a Sequence"
89            )
90
91        for param in self.parameters:
92            if not isinstance(param, tuple(FormulaParameter)):
93                raise InvalidFormulaError(
94                    f"parameter '{param}' of formula {self.function_name} is an invalid type"
95                )
96        self.__validate_consistency()
def set_parameters( self, parameters: Sequence[Union[Formula, Timeseries, float, int, str]]) -> Formula:
102    def set_parameters(self, parameters: Sequence[FormulaParameterGroup]) -> Formula:
103        if parameters is not None and not list_type(
104            parameters, (Formula, Timeseries, float, int)
105        ):
106            raise InvalidFormulaError(
107                "parameters must be a list of either Timeseries, floats, or ints"
108            )
109        return self._replace("parameters", parameters)
def set_filters( self, filters: Optional[Sequence[Union[BooleanCondition, Condition]]]) -> Formula:
111    def set_filters(self, filters: ConditionGroup | None) -> Formula:
112        if filters is not None and not list_type(
113            filters, (BooleanCondition, Condition)
114        ):
115            raise InvalidFormulaError("filters must be a list of Conditions")
116        return self._replace("filters", filters)
def set_groupby( self, groupby: list[Column | AliasedExpression] | None) -> Formula:
118    def set_groupby(self, groupby: list[Column | AliasedExpression] | None) -> Formula:
119        if groupby is not None and not list_type(groupby, (Column, AliasedExpression)):
120            raise InvalidFormulaError("groupby must be a list of Columns")
121        return self._replace("groupby", groupby)
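
A sketch of the division Formula from the MQL example earlier, constructed directly:

from snuba_sdk import ArithmeticOperator, Formula, Metric, Timeseries

ratio = Formula(
    ArithmeticOperator.DIVIDE.value,
    [
        Timeseries(
            metric=Metric(public_name="transaction.duration"),
            aggregate="sum",
        ),
        1000,
    ],
)
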
@dataclass(frozen=True)
class Function(snuba_sdk.CurriedFunction):
112@dataclass(frozen=True)
113class Function(CurriedFunction):
114    initializers: Optional[Sequence[Union[ScalarLiteralType, Column]]] = field(
115        init=False, default=None
116    )
Function( function: str, parameters: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Sequence[Union[snuba_sdk.expressions.Expression, NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime]], Column, CurriedFunction, Function, Identifier, Lambda]]] = None, alias: Optional[str] = None)
initializers: Optional[Sequence[Union[NoneType, bool, str, bytes, float, int, datetime.date, datetime.datetime, Column]]] = None
@dataclass(frozen=True)
class Granularity(snuba_sdk.expressions.Expression):
107@dataclass(frozen=True)
108class Granularity(Expression):
109    granularity: int
110
111    def validate(self) -> None:
112        _validate_int_literal("granularity", self.granularity, 1, None)
Granularity(granularity: int)
granularity: int
def validate(self) -> None:
111    def validate(self) -> None:
112        _validate_int_literal("granularity", self.granularity, 1, None)
@dataclass(frozen=True)
class Identifier(snuba_sdk.expressions.Expression):
126@dataclass(frozen=True)
127class Identifier(Expression):
128    name: str
129
130    def validate(self) -> None:
131        if not isinstance(self.name, str) or not identifier_re.match(self.name):
132            raise InvalidLambdaError(f"`{self.name}` is not a valid identifier")
Identifier(name: str)
name: str
def validate(self) -> None:
130    def validate(self) -> None:
131        if not isinstance(self.name, str) or not identifier_re.match(self.name):
132            raise InvalidLambdaError(f"`{self.name}` is not a valid identifier")
@dataclass(frozen=True)
class Join(snuba_sdk.expressions.Expression):
51@dataclass(frozen=True)
52class Join(Expression):
53    """
54    A collection of relationships that is used in the MATCH section of
55    the SnQL query. Must contain at least one Relationship, and will make
56    sure that Entity aliases are not used by different Entities.
57
58    :param relationships: The relationships in the join.
59    :type name: Sequence[Relationship]
60
61    :raises InvalidExpressionError: If two different Entities are using
62        the same alias, this will be raised.
63
64    """
65
66    relationships: Sequence[Relationship]
67
68    def get_alias_mappings(self) -> set[tuple[str, str]]:
69        aliases = set()
70        for rel in self.relationships:
71            if rel.lhs.alias is not None:
72                aliases.add((rel.lhs.alias, rel.lhs.name))
73            if rel.rhs.alias is not None:
74                aliases.add((rel.rhs.alias, rel.rhs.name))
75
76        return aliases
77
78    def get_entities(self) -> set[Entity]:
79        entities = set()
80        for rel in self.relationships:
81            entities.add(rel.lhs)
82            entities.add(rel.rhs)
83        return entities
84
85    def validate(self) -> None:
86        if not isinstance(self.relationships, (list, tuple)) or not self.relationships:
87            raise InvalidExpressionError("Join must have at least one Relationship")
88        elif not all(isinstance(x, Relationship) for x in self.relationships):
89            raise InvalidExpressionError("Join expects a list of Relationship objects")
90
91        seen: MutableMapping[str, str] = {}
92        for alias, entity in self.get_alias_mappings():
93            if alias in seen and seen[alias] != entity:
94                entities = sorted([entity, seen[alias]])
95                raise InvalidExpressionError(
96                    f"alias '{alias}' is duplicated for entities {', '.join(entities)}"
97                )
98            seen[alias] = entity

A collection of relationships that is used in the MATCH section of the SnQL query. Must contain at least one Relationship, and will make sure that Entity aliases are not used by different Entities.

Parameters
  • relationships: The relationships in the join.
Raises
  • InvalidExpressionError: If two different Entities are using the same alias, this will be raised.
Join(relationships: Sequence[Relationship])
relationships: Sequence[Relationship]
def get_alias_mappings(self) -> set[tuple[str, str]]:
68    def get_alias_mappings(self) -> set[tuple[str, str]]:
69        aliases = set()
70        for rel in self.relationships:
71            if rel.lhs.alias is not None:
72                aliases.add((rel.lhs.alias, rel.lhs.name))
73            if rel.rhs.alias is not None:
74                aliases.add((rel.rhs.alias, rel.rhs.name))
75
76        return aliases
def get_entities(self) -> set[Entity]:
78    def get_entities(self) -> set[Entity]:
79        entities = set()
80        for rel in self.relationships:
81            entities.add(rel.lhs)
82            entities.add(rel.rhs)
83        return entities
def validate(self) -> None:
85    def validate(self) -> None:
86        if not isinstance(self.relationships, (list, tuple)) or not self.relationships:
87            raise InvalidExpressionError("Join must have at least one Relationship")
88        elif not all(isinstance(x, Relationship) for x in self.relationships):
89            raise InvalidExpressionError("Join expects a list of Relationship objects")
90
91        seen: MutableMapping[str, str] = {}
92        for alias, entity in self.get_alias_mappings():
93            if alias in seen and seen[alias] != entity:
94                entities = sorted([entity, seen[alias]])
95                raise InvalidExpressionError(
96                    f"alias '{alias}' is duplicated for entities {', '.join(entities)}"
97                )
98            seen[alias] = entity
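
A usage sketch; the entity names, aliases, and the relationship name are illustrative:

from snuba_sdk import Entity, Join, Relationship

join = Join(
    [Relationship(Entity("events", "e"), "grouped", Entity("groupedmessage", "g"))]
)
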
@dataclass(frozen=True)
class Lambda(snuba_sdk.expressions.Expression):
135@dataclass(frozen=True)
136class Lambda(Expression):
137    identifiers: Sequence[str]
138    transformation: CurriedFunction
139
140    def validate(self) -> None:
141        if not isinstance(self.identifiers, (tuple, list)):
142            raise InvalidLambdaError("identifiers must be a sequence")
143        for i in self.identifiers:
144            if not isinstance(i, str) or not identifier_re.match(i):
145                raise InvalidLambdaError(f"{i} is not a valid identifier")
146        if not isinstance(self.transformation, CurriedFunction):
147            raise InvalidLambdaError("transformation must be a function")
Lambda( identifiers: Sequence[str], transformation: CurriedFunction)
identifiers: Sequence[str]
transformation: CurriedFunction
def validate(self) -> None:
140    def validate(self) -> None:
141        if not isinstance(self.identifiers, (tuple, list)):
142            raise InvalidLambdaError("identifiers must be a sequence")
143        for i in self.identifiers:
144            if not isinstance(i, str) or not identifier_re.match(i):
145                raise InvalidLambdaError(f"{i} is not a valid identifier")
146        if not isinstance(self.transformation, CurriedFunction):
147            raise InvalidLambdaError("transformation must be a function")
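
A sketch of a lambda used as a higher-order function argument, such as a ClickHouse-style arrayMap; the function and column names are illustrative:

from snuba_sdk import Column, Function, Identifier, Lambda

mapped = Function(
    "arrayMap",
    [Lambda(["x"], Function("identity", [Identifier("x")])), Column("exception_frames")],
)
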
@dataclass(frozen=True)
class Limit(snuba_sdk.expressions.Expression):
91@dataclass(frozen=True)
92class Limit(Expression):
93    limit: int
94
95    def validate(self) -> None:
96        _validate_int_literal("limit", self.limit, 1, 10000)
Limit(limit: int)
limit: int
def validate(self) -> None:
95    def validate(self) -> None:
96        _validate_int_literal("limit", self.limit, 1, 10000)
@dataclass(frozen=True)
class LimitBy(snuba_sdk.expressions.Expression):
46@dataclass(frozen=True)
47class LimitBy(Expression):
48    columns: Sequence[Column]
49    count: int
50
51    def validate(self) -> None:
52        validate_sequence_of_type("LimitBy columns", self.columns, Column, 1)
53        if not isinstance(self.count, int) or self.count <= 0 or self.count > 10000:
54            raise InvalidExpressionError(
55                "LimitBy count must be a positive integer (max 10,000)"
56            )
LimitBy(columns: Sequence[Column], count: int)
columns: Sequence[Column]
count: int
def validate(self) -> None:
51    def validate(self) -> None:
52        validate_sequence_of_type("LimitBy columns", self.columns, Column, 1)
53        if not isinstance(self.count, int) or self.count <= 0 or self.count > 10000:
54            raise InvalidExpressionError(
55                "LimitBy count must be a positive integer (max 10,000)"
56            )
@dataclass(frozen=True)
class Metric:
23@dataclass(frozen=True)
24class Metric:
25    """
26    Metric represents a raw metric that is being populated. It can be created with
27    one of public name, mri or raw ID.
28    """
29
30    public_name: str | None = None
31    mri: str | None = None
32    id: int | None = None
33
34    def __post_init__(self) -> None:
35        self.validate()
36
37    def get_fields(self) -> Sequence[str]:
38        self_fields = fields(self)  # Verified the order in the Python source
39        return tuple(f.name for f in self_fields)
40
41    def validate(self) -> None:
42        if self.public_name is not None and not isinstance(self.public_name, str):
43            raise InvalidTimeseriesError("public_name must be a string")
44        if self.mri is not None and not isinstance(self.mri, str):
45            raise InvalidTimeseriesError("mri must be a string")
46        if self.id is not None and not isinstance(self.id, int):
47            raise InvalidTimeseriesError("id must be an integer")
48
49        if all(v is None for v in (self.public_name, self.mri)):
50            raise InvalidTimeseriesError(
51                "Metric must have at least one of public_name or mri"
52            )
53
54    def set_mri(self, mri: str) -> Metric:
55        if not isinstance(mri, str):
56            raise InvalidExpressionError("mri must be an str")
57        return replace(self, mri=mri)
58
59    def set_public_name(self, public_name: str) -> Metric:
60        if not isinstance(public_name, str):
61            raise InvalidExpressionError("public_name must be an str")
62        return replace(self, public_name=public_name)
63
64    def set_id(self, id: int) -> Metric:
65        if not isinstance(id, int):
66            raise InvalidExpressionError("id must be an int")
67        return replace(self, id=id)

Metric represents a raw metric that is being populated. It can be created with one of public name, mri or raw ID.

Metric( public_name: str | None = None, mri: str | None = None, id: int | None = None)
public_name: str | None = None
mri: str | None = None
id: int | None = None
def get_fields(self) -> Sequence[str]:
37    def get_fields(self) -> Sequence[str]:
38        self_fields = fields(self)  # Verified the order in the Python source
39        return tuple(f.name for f in self_fields)
def validate(self) -> None:
41    def validate(self) -> None:
42        if self.public_name is not None and not isinstance(self.public_name, str):
43            raise InvalidTimeseriesError("public_name must be a string")
44        if self.mri is not None and not isinstance(self.mri, str):
45            raise InvalidTimeseriesError("mri must be a string")
46        if self.id is not None and not isinstance(self.id, int):
47            raise InvalidTimeseriesError("id must be an integer")
48
49        if all(v is None for v in (self.public_name, self.mri)):
50            raise InvalidTimeseriesError(
51                "Metric must have at least one of public_name or mri"
52            )
def set_mri(self, mri: str) -> Metric:
54    def set_mri(self, mri: str) -> Metric:
55        if not isinstance(mri, str):
56            raise InvalidExpressionError("mri must be an str")
57        return replace(self, mri=mri)
def set_public_name(self, public_name: str) -> Metric:
59    def set_public_name(self, public_name: str) -> Metric:
60        if not isinstance(public_name, str):
61            raise InvalidExpressionError("public_name must be an str")
62        return replace(self, public_name=public_name)
def set_id(self, id: int) -> Metric:
64    def set_id(self, id: int) -> Metric:
65        if not isinstance(id, int):
66            raise InvalidExpressionError("id must be an int")
67        return replace(self, id=id)
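
A sketch; a metric can be referenced by public name or by MRI (the MRI string here is illustrative):

from snuba_sdk import Metric

by_name = Metric(public_name="transaction.duration")
by_mri = Metric(mri="d:transactions/duration@millisecond")
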
@dataclass
class MetricsQuery(snuba_sdk.query.BaseQuery):
 19@dataclass
 20class MetricsQuery(BaseQuery):
 21    """
 22    A query on a set of timeseries. This class gets translated into a Snuba request string
 23    that returns a list of timeseries data. In order to allow this class to be built incrementally,
 24    it is not validated until it is serialized. Any specified filters or groupby fields are pushed
 25    down to each of the Timeseries in the query field. It is immutable, so any set functions return
 26    a new copy of the query, which also allows chaining calls.
 27
 28    This class is distinct from the Query class to allow for more specific validation and to provide
 29    a simpler syntax for writing timeseries queries, which have fewer available features.
 30    """
 31
 32    query: Timeseries | Formula | str | None = None
 33    start: datetime | None = None
 34    end: datetime | None = None
 35    rollup: Rollup | None = None
 36    scope: MetricsScope | None = None
 37    limit: Limit | None = None
 38    offset: Offset | None = None
 39    extrapolate: Extrapolate | None = None
 40    indexer_mappings: dict[str, str | int] | None = None
 41
 42    def _replace(self, field: str, value: Any) -> MetricsQuery:
 43        new = replace(self, **{field: value})
 44        return new
 45
 46    def set_query(self, query: Formula | Timeseries | str) -> MetricsQuery:
 47        if not isinstance(query, (Formula, Timeseries, str)):
 48            raise InvalidQueryError(
 49                "query must be a Formula or Timeseries or MQL string"
 50            )
 51        return self._replace("query", query)
 52
 53    def set_start(self, start: datetime) -> MetricsQuery:
 54        if not isinstance(start, datetime):
 55            raise InvalidQueryError("start must be a datetime")
 56        return self._replace("start", start)
 57
 58    def set_end(self, end: datetime) -> MetricsQuery:
 59        if not isinstance(end, datetime):
 60            raise InvalidQueryError("end must be a datetime")
 61        return self._replace("end", end)
 62
 63    def set_rollup(self, rollup: Rollup) -> MetricsQuery:
 64        if not isinstance(rollup, Rollup):
 65            raise InvalidQueryError("rollup must be a Rollup")
 66        return self._replace("rollup", rollup)
 67
 68    def set_scope(self, scope: MetricsScope) -> MetricsQuery:
 69        if not isinstance(scope, MetricsScope):
 70            raise InvalidQueryError("scope must be a MetricsScope")
 71        return self._replace("scope", scope)
 72
 73    def set_limit(self, limit: int) -> MetricsQuery:
 74        return self._replace("limit", Limit(limit))
 75
 76    def set_offset(self, offset: int) -> MetricsQuery:
 77        return self._replace("offset", Offset(offset))
 78
 79    def set_extrapolate(self, extrapolate: bool) -> MetricsQuery:
 80        return self._replace("extrapolate", Extrapolate(extrapolate))
 81
 82    def set_indexer_mappings(
 83        self, indexer_mappings: dict[str, str | int]
 84    ) -> MetricsQuery:
 85        return self._replace("indexer_mappings", indexer_mappings)
 86
 87    def validate(self) -> None:
 88        Validator().visit(self)
 89
 90    def __str__(self) -> str:
 91        result = MQL_PRINTER.visit(self)
 92        return json.dumps(result, indent=4)
 93
 94    def print(self) -> str:
 95        self.validate()
 96        result = MQL_PRINTER.visit(self)
 97        return json.dumps(result, indent=4)
 98
 99    def serialize(self) -> str | dict[str, Any]:
100        self.validate()
101        self._optimize()
102        result = MQL_PRINTER.visit(self)
103        return result
104
105    def _optimize(self) -> None:
106        if (
107            isinstance(self.query, (Formula, Timeseries))
108            and self.query.filters is not None
109        ):
110            new_filters = OrOptimizer().optimize(self.query.filters)
111            if new_filters is not None:
112                self.query = replace(self.query, filters=new_filters)

A query on a set of timeseries. This class gets translated into a Snuba request string that returns a list of timeseries data. In order to allow this class to be built incrementally, it is not validated until it is serialized. Any specified filters or groupby fields are pushed down to each of the Timeseries in the query field. It is immutable, so any set functions return a new copy of the query, which also allows chaining calls.

This class is distinct from the Query class to allow for more specific validation and to provide a simpler syntax for writing timeseries queries, which have fewer available features.

MetricsQuery( query: Timeseries | Formula | str | None = None, start: datetime.datetime | None = None, end: datetime.datetime | None = None, rollup: Rollup | None = None, scope: MetricsScope | None = None, limit: Limit | None = None, offset: Offset | None = None, extrapolate: Extrapolate | None = None, indexer_mappings: dict[str, str | int] | None = None)
query: Timeseries | Formula | str | None = None
start: datetime.datetime | None = None
end: datetime.datetime | None = None
rollup: Rollup | None = None
scope: MetricsScope | None = None
limit: Limit | None = None
offset: Offset | None = None
extrapolate: Extrapolate | None = None
indexer_mappings: dict[str, str | int] | None = None
def set_query( self, query: Formula | Timeseries | str) -> MetricsQuery:
46    def set_query(self, query: Formula | Timeseries | str) -> MetricsQuery:
47        if not isinstance(query, (Formula, Timeseries, str)):
48            raise InvalidQueryError(
49                "query must be a Formula or Timeseries or MQL string"
50            )
51        return self._replace("query", query)
def set_start(self, start: datetime.datetime) -> MetricsQuery:
53    def set_start(self, start: datetime) -> MetricsQuery:
54        if not isinstance(start, datetime):
55            raise InvalidQueryError("start must be a datetime")
56        return self._replace("start", start)
def set_end(self, end: datetime.datetime) -> MetricsQuery:
58    def set_end(self, end: datetime) -> MetricsQuery:
59        if not isinstance(end, datetime):
60            raise InvalidQueryError("end must be a datetime")
61        return self._replace("end", end)
def set_rollup( self, rollup: Rollup) -> MetricsQuery:
63    def set_rollup(self, rollup: Rollup) -> MetricsQuery:
64        if not isinstance(rollup, Rollup):
65            raise InvalidQueryError("rollup must be a Rollup")
66        return self._replace("rollup", rollup)
def set_scope( self, scope: MetricsScope) -> MetricsQuery:
68    def set_scope(self, scope: MetricsScope) -> MetricsQuery:
69        if not isinstance(scope, MetricsScope):
70            raise InvalidQueryError("scope must be a MetricsScope")
71        return self._replace("scope", scope)
def set_limit(self, limit: int) -> MetricsQuery:
73    def set_limit(self, limit: int) -> MetricsQuery:
74        return self._replace("limit", Limit(limit))
def set_offset(self, offset: int) -> MetricsQuery:
76    def set_offset(self, offset: int) -> MetricsQuery:
77        return self._replace("offset", Offset(offset))
def set_extrapolate(self, extrapolate: bool) -> MetricsQuery:
79    def set_extrapolate(self, extrapolate: bool) -> MetricsQuery:
80        return self._replace("extrapolate", Extrapolate(extrapolate))
def set_indexer_mappings( self, indexer_mappings: dict[str, str | int]) -> MetricsQuery:
82    def set_indexer_mappings(
83        self, indexer_mappings: dict[str, str | int]
84    ) -> MetricsQuery:
85        return self._replace("indexer_mappings", indexer_mappings)
def validate(self) -> None:
87    def validate(self) -> None:
88        Validator().visit(self)
def print(self) -> str:
94    def print(self) -> str:
95        self.validate()
96        result = MQL_PRINTER.visit(self)
97        return json.dumps(result, indent=4)
def serialize(self) -> str | dict[str, typing.Any]:
 99    def serialize(self) -> str | dict[str, Any]:
100        self.validate()
101        self._optimize()
102        result = MQL_PRINTER.visit(self)
103        return result
Inherited Members
snuba_sdk.query.BaseQuery
get_fields
@dataclass
class MetricsScope:
218@dataclass
219class MetricsScope:
220    """
221    This contains all the meta information necessary to resolve a metric and to safely query
222    the metrics dataset. All these values get automatically added to the query conditions.
223    The idea of this class is to contain all the filter values that are not represented by
224    tags in the API.
225
226    use_case_id is treated separately since it can be derived separate from the MRIs of the
227    metrics in the outer query.
228    """
229
230    org_ids: list[int]
231    project_ids: list[int]
232    use_case_id: str | None = None
233
234    def __post_init__(self) -> None:
235        self.validate()
236
237    def validate(self) -> None:
238        if not list_type(self.org_ids, (int,)):
239            raise InvalidExpressionError("org_ids must be a list of integers")
240
241        if not list_type(self.project_ids, (int,)):
242            raise InvalidExpressionError("project_ids must be a list of integers")
243
244        if self.use_case_id is not None and not isinstance(self.use_case_id, str):
245            raise InvalidExpressionError("use_case_id must be an str")
246
247    def set_use_case_id(self, use_case_id: str) -> MetricsScope:
248        if not isinstance(use_case_id, str):
249            raise InvalidExpressionError("use_case_id must be an str")
250        return replace(self, use_case_id=use_case_id)

This contains all the meta information necessary to resolve a metric and to safely query the metrics dataset. All these values get automatically added to the query conditions. The idea of this class is to contain all the filter values that are not represented by tags in the API.

use_case_id is treated separately since it can be derived separately from the MRIs of the metrics in the outer query.
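
For example, a minimal sketch (the IDs and use case ID are placeholders):

from snuba_sdk import MetricsScope

# Scope the query to one org and one project; the use case ID can be
# attached later, once it is known.
scope = MetricsScope(org_ids=[1], project_ids=[11])
scope = scope.set_use_case_id("transactions")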

MetricsScope( org_ids: list[int], project_ids: list[int], use_case_id: str | None = None)
org_ids: list[int]
project_ids: list[int]
use_case_id: str | None = None
def validate(self) -> None:
237    def validate(self) -> None:
238        if not list_type(self.org_ids, (int,)):
239            raise InvalidExpressionError("org_ids must be a list of integers")
240
241        if not list_type(self.project_ids, (int,)):
242            raise InvalidExpressionError("project_ids must be a list of integers")
243
244        if self.use_case_id is not None and not isinstance(self.use_case_id, str):
245            raise InvalidExpressionError("use_case_id must be an str")
def set_use_case_id(self, use_case_id: str) -> MetricsScope:
247    def set_use_case_id(self, use_case_id: str) -> MetricsScope:
248        if not isinstance(use_case_id, str):
249            raise InvalidExpressionError("use_case_id must be an str")
250        return replace(self, use_case_id=use_case_id)
@dataclass
class MQLContext:
11@dataclass
12class MQLContext:
13    """
14    The MQL string alone is not enough to fully describe a query.
15    This class contains all of the additional information needed to
16    execute a metrics query in snuba.
17
18    It should be noted that this class is used as an intermediary encoding
19    class for data in the the MetricsQuery class that can't be encoded into
20    MQL. As such it shouldn't be used directly by users of the SDK.
21
22    This also means that the validation here is quite loose, since this object
23    should be created exclusively from a valid MetricsQuery object.
24    """
25
26    start: str
27    end: str
28    rollup: dict[str, str | int | None]
29    scope: dict[str, str | list[int]]
30    indexer_mappings: dict[str, str | int]
31    limit: int | None = None
32    offset: int | None = None
33    extrapolate: bool | None = None
34
35    def __post_init__(self) -> None:
36        self.validate()
37
38    def validate(self) -> None:
39        # Simple assert that all the expected fields are present
40        fields = ["start", "end", "rollup", "scope", "indexer_mappings"]
41        for field in fields:
42            if getattr(self, field) is None:
43                raise InvalidMQLContextError(f"MQLContext.{field} is required")

The MQL string alone is not enough to fully describe a query. This class contains all of the additional information needed to execute a metrics query in snuba.

It should be noted that this class is used as an intermediary encoding class for data in the MetricsQuery class that can't be encoded into MQL. As such it shouldn't be used directly by users of the SDK.

This also means that the validation here is quite loose, since this object should be created exclusively from a valid MetricsQuery object.

MQLContext( start: str, end: str, rollup: dict[str, str | int | None], scope: dict[str, str | list[int]], indexer_mappings: dict[str, str | int], limit: int | None = None, offset: int | None = None, extrapolate: bool | None = None)
start: str
end: str
rollup: dict[str, str | int | None]
scope: dict[str, str | list[int]]
indexer_mappings: dict[str, str | int]
limit: int | None = None
offset: int | None = None
extrapolate: bool | None = None
def validate(self) -> None:
38    def validate(self) -> None:
39        # Simple assert that all the expected fields are present
40        fields = ["start", "end", "rollup", "scope", "indexer_mappings"]
41        for field in fields:
42            if getattr(self, field) is None:
43                raise InvalidMQLContextError(f"MQLContext.{field} is required")
@dataclass(frozen=True)
class Offset(snuba_sdk.expressions.Expression):
 99@dataclass(frozen=True)
100class Offset(Expression):
101    offset: int
102
103    def validate(self) -> None:
104        _validate_int_literal("offset", self.offset, 0, None)
Offset(offset: int)
offset: int
def validate(self) -> None:
103    def validate(self) -> None:
104        _validate_int_literal("offset", self.offset, 0, None)
class Op(enum.Enum):
22class Op(Enum):
23    GT = ">"
24    LT = "<"
25    GTE = ">="
26    LTE = "<="
27    EQ = "="
28    NEQ = "!="
29    IN = "IN"
30    NOT_IN = "NOT IN"
31    LIKE = "LIKE"
32    NOT_LIKE = "NOT LIKE"
33    IS_NULL = "IS NULL"
34    IS_NOT_NULL = "IS NOT NULL"
35    NOT = "!"

The set of operators available for a Condition, e.g. in the where or having clauses of a Query or the filters of a Timeseries. Each member's value is the string form of the operator.
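
For example, an operator is passed as the middle argument of a Condition (a sketch; the column names are placeholders):

from snuba_sdk import Column, Condition, Op

conditions = [
    Condition(Column("environment"), Op.NOT_IN, ["dev", "test"]),
    Condition(Column("transaction"), Op.LIKE, "/api/%"),
]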

GT = <Op.GT: '>'>
LT = <Op.LT: '<'>
GTE = <Op.GTE: '>='>
LTE = <Op.LTE: '<='>
EQ = <Op.EQ: '='>
NEQ = <Op.NEQ: '!='>
IN = <Op.IN: 'IN'>
NOT_IN = <Op.NOT_IN: 'NOT IN'>
LIKE = <Op.LIKE: 'LIKE'>
NOT_LIKE = <Op.NOT_LIKE: 'NOT LIKE'>
IS_NULL = <Op.IS_NULL: 'IS NULL'>
IS_NOT_NULL = <Op.IS_NOT_NULL: 'IS NOT NULL'>
NOT = <Op.NOT: '!'>
Inherited Members
enum.Enum
name
value
@dataclass(frozen=True)
class Or(snuba_sdk.BooleanCondition):
141@dataclass(frozen=True)
142class Or(BooleanCondition):
143    op: BooleanOp = field(init=False, default=BooleanOp.OR)
144    conditions: ConditionGroup = field(default_factory=list)
Or( conditions: Sequence[Union[BooleanCondition, Condition]] = <factory>)
op: BooleanOp = <BooleanOp.OR: 'OR'>
conditions: Sequence[Union[BooleanCondition, Condition]]
Inherited Members
BooleanCondition
validate
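
For example, Or can be nested with And to build a compound where clause (a sketch; And is assumed to be importable alongside Or, and the column names are placeholders):

from snuba_sdk import And, Column, Condition, Op, Or

# (project_id = 1) AND (environment = 'production' OR release = '1.0.0')
where = [
    And(
        [
            Condition(Column("project_id"), Op.EQ, 1),
            Or(
                [
                    Condition(Column("environment"), Op.EQ, "production"),
                    Condition(Column("release"), Op.EQ, "1.0.0"),
                ]
            ),
        ]
    )
]
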
@dataclass(frozen=True)
class OrderBy(snuba_sdk.expressions.Expression):
16@dataclass(frozen=True)
17class OrderBy(Expression):
18    exp: Union[Column, CurriedFunction, Function]
19    direction: Direction
20
21    def validate(self) -> None:
22        if not isinstance(self.exp, (Column, CurriedFunction, Function)):
23            raise InvalidExpressionError(
24                "OrderBy expression must be a Column, CurriedFunction or Function"
25            )
26        if not isinstance(self.direction, Direction):
27            raise InvalidExpressionError("OrderBy direction must be a Direction")
OrderBy( exp: Union[Column, CurriedFunction, Function], direction: Direction)
exp: Union[Column, CurriedFunction, Function]
direction: Direction
def validate(self) -> None:
21    def validate(self) -> None:
22        if not isinstance(self.exp, (Column, CurriedFunction, Function)):
23            raise InvalidExpressionError(
24                "OrderBy expression must be a Column, CurriedFunction or Function"
25            )
26        if not isinstance(self.direction, Direction):
27            raise InvalidExpressionError("OrderBy direction must be a Direction")
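
For example, an orderby clause is a list of OrderBy expressions (a sketch; the column and alias names are placeholders):

from snuba_sdk import Column, Direction, Function, OrderBy

# Order by the aggregate first, then break ties on the raw column.
orderby = [
    OrderBy(Function("uniq", [Column("event_id")], "uniq_events"), Direction.DESC),
    OrderBy(Column("timestamp"), Direction.ASC),
]
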
@dataclass(frozen=True)
class Query(snuba_sdk.query.BaseQuery):
 56@dataclass(frozen=True)
 57class Query(BaseQuery):
 58    """
 59    A code representation of a SnQL query. It is immutable, so any set functions
 60    return a new copy of the query. Unlike Expressions it is possible to
 61    instantiate a Query that is invalid. Any of the translation functions will
 62    validate the query before translating them, so the query must be valid before
 63    they are called.
 64    """
 65
 66    # These must be listed in the order that they must appear in the SnQL query.
 67    match: Union[Entity, Storage, Join, Query]
 68    select: Optional[Sequence[SelectableExpression]] = None
 69    groupby: Optional[Sequence[SelectableExpression]] = None
 70    array_join: Optional[Sequence[Column]] = None
 71    where: Optional[ConditionGroup] = None
 72    having: Optional[ConditionGroup] = None
 73    orderby: Optional[Sequence[OrderBy]] = None
 74    limitby: Optional[LimitBy] = None
 75    limit: Optional[Limit] = None
 76    offset: Optional[Offset] = None
 77    granularity: Optional[Granularity] = None
 78    totals: Optional[Totals] = None
 79
 80    def __post_init__(self) -> None:
 81        """
 82        This has a different validation flow from normal expressions, since a query
 83        is not necessarily always correct. For example, you can create a Query with
 84        no select columns, which will fail in the validate. However it shouldn't fail
 85        right away since the select columns can be added later.
 86
 87        """
 88        if not isinstance(self.match, (Query, Join, Entity, Storage)):
 89            raise InvalidQueryError(
 90                "queries must have a valid Entity, Storage, Join or Query"
 91            )
 92
 93        if isinstance(self.match, Query):
 94            try:
 95                self.match.validate()
 96            except Exception as e:
 97                raise InvalidQueryError(f"inner query is invalid: {e}") from e
 98
 99    def _replace(self, field: str, value: Any) -> Query:
100        new = replace(self, **{field: value})
101        return new
102
103    def set_match(self, match: Union[Entity, Storage, Join, Query]) -> Query:
104        if not isinstance(match, (Entity, Join, Query)):
105            raise InvalidQueryError(
106                f"{match} must be a valid Entity, Storage, Join or Query"
107            )
108        elif isinstance(match, Query):
109            try:
110                match.validate()
111            except Exception as e:
112                raise InvalidQueryError(f"inner query is invalid: {e}") from e
113
114        return self._replace("match", match)
115
116    def set_select(self, select: Sequence[SelectableExpression]) -> Query:
117        if not list_type(select, SelectableExpressionType) or not select:
118            raise InvalidQueryError(
119                "select clause must be a non-empty list of SelectableExpression"
120            )
121        return self._replace("select", select)
122
123    def set_groupby(self, groupby: Sequence[SelectableExpression]) -> Query:
124        if not list_type(groupby, SelectableExpressionType):
125            raise InvalidQueryError(
126                "groupby clause must be a list of SelectableExpression"
127            )
128        return self._replace("groupby", groupby)
129
130    def set_array_join(self, array_join: Sequence[Column]) -> Query:
131        if not list_type(array_join, [Column]) or len(array_join) < 1:
132            raise InvalidQueryError("array join must be a non-empty list of Column")
133
134        return self._replace("array_join", array_join)
135
136    def set_where(self, conditions: ConditionGroup) -> Query:
137        if not list_type(conditions, (BooleanCondition, Condition)):
138            raise InvalidQueryError("where clause must be a list of conditions")
139        return self._replace("where", conditions)
140
141    def set_having(self, conditions: ConditionGroup) -> Query:
142        if not list_type(conditions, (BooleanCondition, Condition)):
143            raise InvalidQueryError("having clause must be a list of conditions")
144        return self._replace("having", conditions)
145
146    def set_orderby(self, orderby: Sequence[OrderBy]) -> Query:
147        if not list_type(orderby, (OrderBy,)):
148            raise InvalidQueryError("orderby clause must be a list of OrderBy")
149        return self._replace("orderby", orderby)
150
151    def set_limitby(self, limitby: LimitBy) -> Query:
152        if not isinstance(limitby, LimitBy):
153            raise InvalidQueryError("limitby clause must be a LimitBy")
154        return self._replace("limitby", limitby)
155
156    def set_limit(self, limit: int) -> Query:
157        return self._replace("limit", Limit(limit))
158
159    def set_offset(self, offset: int) -> Query:
160        return self._replace("offset", Offset(offset))
161
162    def set_granularity(self, granularity: int) -> Query:
163        return self._replace("granularity", Granularity(granularity))
164
165    def set_totals(self, totals: bool) -> Query:
166        return self._replace("totals", Totals(totals))
167
168    def validate(self) -> None:
169        VALIDATOR.visit(self)
170
171    def __str__(self) -> str:
172        return self.serialize()
173
174    def serialize(self) -> str:
175        self.validate()
176        optimized = self._optimize()
177        return Printer().visit(optimized)
178
179    def print(self) -> str:
180        self.validate()
181        return Printer(pretty=True).visit(self)
182
183    def _optimize(self) -> Query:
184        if self.where is not None:
185            new_where = OrOptimizer().optimize(self.where)
186            if new_where is not None:
187                return replace(self, where=new_where)
188        return self

A code representation of a SnQL query. It is immutable, so any set function returns a new copy of the query. Unlike Expressions, it is possible to instantiate a Query that is invalid. Any of the translation functions will validate the query before translating it, so the query must be valid before they are called.
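
As an illustration, a query using the having and orderby clauses can be built incrementally (a minimal sketch; the entity and column names are placeholders):

from datetime import datetime

from snuba_sdk import (
    Column,
    Condition,
    Direction,
    Entity,
    Function,
    Op,
    OrderBy,
    Query,
)

query = (
    Query(Entity("events"))
    .set_select([Column("title"), Function("count", [], "total")])
    .set_groupby([Column("title")])
    .set_where([Condition(Column("timestamp"), Op.GT, datetime(2024, 1, 1))])
    .set_having([Condition(Function("count", [], "total"), Op.GT, 100)])
    .set_orderby([OrderBy(Function("count", [], "total"), Direction.DESC)])
    .set_limit(100)
)

# Construction does not validate; validate(), serialize() and print() do.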

Query( match: Union[Entity, Storage, Join, Query], select: Optional[Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]] = None, groupby: Optional[Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]] = None, array_join: Optional[Sequence[Column]] = None, where: Optional[Sequence[Union[BooleanCondition, Condition]]] = None, having: Optional[Sequence[Union[BooleanCondition, Condition]]] = None, orderby: Optional[Sequence[OrderBy]] = None, limitby: Optional[LimitBy] = None, limit: Optional[Limit] = None, offset: Optional[Offset] = None, granularity: Optional[Granularity] = None, totals: Optional[Totals] = None)
match: Union[Entity, Storage, Join, Query]
select: Optional[Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]] = None
groupby: Optional[Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]] = None
array_join: Optional[Sequence[Column]] = None
where: Optional[Sequence[Union[BooleanCondition, Condition]]] = None
having: Optional[Sequence[Union[BooleanCondition, Condition]]] = None
orderby: Optional[Sequence[OrderBy]] = None
limitby: Optional[LimitBy] = None
limit: Optional[Limit] = None
offset: Optional[Offset] = None
granularity: Optional[Granularity] = None
totals: Optional[Totals] = None
def set_match( self, match: Union[Entity, Storage, Join, Query]) -> Query:
103    def set_match(self, match: Union[Entity, Storage, Join, Query]) -> Query:
104        if not isinstance(match, (Entity, Join, Query)):
105            raise InvalidQueryError(
106                f"{match} must be a valid Entity, Storage, Join or Query"
107            )
108        elif isinstance(match, Query):
109            try:
110                match.validate()
111            except Exception as e:
112                raise InvalidQueryError(f"inner query is invalid: {e}") from e
113
114        return self._replace("match", match)
def set_select( self, select: Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]) -> Query:
116    def set_select(self, select: Sequence[SelectableExpression]) -> Query:
117        if not list_type(select, SelectableExpressionType) or not select:
118            raise InvalidQueryError(
119                "select clause must be a non-empty list of SelectableExpression"
120            )
121        return self._replace("select", select)
def set_groupby( self, groupby: Sequence[Union[AliasedExpression, Column, CurriedFunction, Function]]) -> Query:
123    def set_groupby(self, groupby: Sequence[SelectableExpression]) -> Query:
124        if not list_type(groupby, SelectableExpressionType):
125            raise InvalidQueryError(
126                "groupby clause must be a list of SelectableExpression"
127            )
128        return self._replace("groupby", groupby)
def set_array_join( self, array_join: Sequence[Column]) -> Query:
130    def set_array_join(self, array_join: Sequence[Column]) -> Query:
131        if not list_type(array_join, [Column]) or len(array_join) < 1:
132            raise InvalidQueryError("array join must be a non-empty list of Column")
133
134        return self._replace("array_join", array_join)
def set_where( self, conditions: Sequence[Union[BooleanCondition, Condition]]) -> Query:
136    def set_where(self, conditions: ConditionGroup) -> Query:
137        if not list_type(conditions, (BooleanCondition, Condition)):
138            raise InvalidQueryError("where clause must be a list of conditions")
139        return self._replace("where", conditions)
def set_having( self, conditions: Sequence[Union[BooleanCondition, Condition]]) -> Query:
141    def set_having(self, conditions: ConditionGroup) -> Query:
142        if not list_type(conditions, (BooleanCondition, Condition)):
143            raise InvalidQueryError("having clause must be a list of conditions")
144        return self._replace("having", conditions)
def set_orderby( self, orderby: Sequence[OrderBy]) -> Query:
146    def set_orderby(self, orderby: Sequence[OrderBy]) -> Query:
147        if not list_type(orderby, (OrderBy,)):
148            raise InvalidQueryError("orderby clause must be a list of OrderBy")
149        return self._replace("orderby", orderby)
def set_limitby(self, limitby: LimitBy) -> Query:
151    def set_limitby(self, limitby: LimitBy) -> Query:
152        if not isinstance(limitby, LimitBy):
153            raise InvalidQueryError("limitby clause must be a LimitBy")
154        return self._replace("limitby", limitby)
def set_limit(self, limit: int) -> Query:
156    def set_limit(self, limit: int) -> Query:
157        return self._replace("limit", Limit(limit))
def set_offset(self, offset: int) -> Query:
159    def set_offset(self, offset: int) -> Query:
160        return self._replace("offset", Offset(offset))
def set_granularity(self, granularity: int) -> Query:
162    def set_granularity(self, granularity: int) -> Query:
163        return self._replace("granularity", Granularity(granularity))
def set_totals(self, totals: bool) -> Query:
165    def set_totals(self, totals: bool) -> Query:
166        return self._replace("totals", Totals(totals))
def validate(self) -> None:
168    def validate(self) -> None:
169        VALIDATOR.visit(self)
def serialize(self) -> str:
174    def serialize(self) -> str:
175        self.validate()
176        optimized = self._optimize()
177        return Printer().visit(optimized)
def print(self) -> str:
179    def print(self) -> str:
180        self.validate()
181        return Printer(pretty=True).visit(self)
Inherited Members
snuba_sdk.query.BaseQuery
get_fields
@dataclass(frozen=True)
class Relationship(snuba_sdk.expressions.Expression):
11@dataclass(frozen=True)
12class Relationship(Expression):
13    """
14    A representation of a relationship between two Entities. The relationship
15    name should be defined in the data model of the LHS and entity in Snuba.
16    Both Entities must have a valid alias, which will be used to qualify the
17    columns in the SnQL query.
18
19    :param lhs: The Entity that owns the relationship.
20    :type name: Entity
21    :param name: The name of the relationship on the LHS Entity.
22    :type name: str
23    :param rhs: The Entity connected to the LHS using the relationship.
24    :type name: Entity
25
26    :raises InvalidExpressionError: If the incorrect types are used or if either
27        of the Entities does not have an alias.
28
29    """
30
31    lhs: Entity
32    name: str
33    rhs: Entity
34
35    def validate(self) -> None:
36        def valid_entity(e: Any) -> None:
37            if not isinstance(e, Entity):
38                raise InvalidExpressionError(f"'{e}' must be an Entity")
39            elif e.alias is None:
40                raise InvalidExpressionError(f"{e} must have a valid alias")
41
42        valid_entity(self.lhs)
43        valid_entity(self.rhs)
44
45        if not isinstance(self.name, str) or not self.name:
46            raise InvalidExpressionError(
47                f"'{self.name}' is not a valid relationship name"
48            )

A representation of a relationship between two Entities. The relationship name should be defined in the data model of the LHS entity in Snuba. Both Entities must have a valid alias, which will be used to qualify the columns in the SnQL query.

Parameters
  • lhs: The Entity that owns the relationship.
  • name: The name of the relationship on the LHS Entity.
  • rhs: The Entity connected to the LHS using the relationship.
Raises
  • InvalidExpressionError: If the incorrect types are used or if either of the Entities does not have an alias.
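
For example, a Relationship is used inside a Join to match two aliased entities (a sketch; the entity, relationship and column names are illustrative, and the relationship name must exist in Snuba's data model for the left-hand entity):

from snuba_sdk import Column, Condition, Entity, Join, Op, Query, Relationship

events = Entity("events", "e")
groups = Entity("groupedmessage", "g")

# Columns in a join query are qualified by the entity they belong to.
query = Query(
    match=Join([Relationship(events, "grouped", groups)]),
    select=[Column("group_id", events), Column("status", groups)],
    where=[Condition(Column("project_id", events), Op.EQ, 1)],
)
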
Relationship( lhs: Entity, name: str, rhs: Entity)
lhs: Entity
name: str
rhs: Entity
def validate(self) -> None:
35    def validate(self) -> None:
36        def valid_entity(e: Any) -> None:
37            if not isinstance(e, Entity):
38                raise InvalidExpressionError(f"'{e}' must be an Entity")
39            elif e.alias is None:
40                raise InvalidExpressionError(f"{e} must have a valid alias")
41
42        valid_entity(self.lhs)
43        valid_entity(self.rhs)
44
45        if not isinstance(self.name, str) or not self.name:
46            raise InvalidExpressionError(
47                f"'{self.name}' is not a valid relationship name"
48            )
@dataclass
class Request:
 47@dataclass
 48class Request:
 49    dataset: str
 50    app_id: str
 51    query: BaseQuery
 52    flags: Flags = field(default_factory=Flags)
 53    parent_api: str = "<unknown>"
 54    tenant_ids: Mapping[str, str | int] = field(default_factory=dict)
 55
 56    def validate(self) -> None:
 57        if not self.dataset or not isinstance(self.dataset, str):
 58            raise InvalidRequestError("Request must have a valid dataset")
 59        elif not FLAG_RE.match(self.dataset):
 60            raise InvalidRequestError(f"'{self.dataset}' is not a valid dataset")
 61
 62        if not self.app_id or not isinstance(self.app_id, str):
 63            raise InvalidRequestError("Request must have a valid app_id")
 64        if not FLAG_RE.match(self.app_id):
 65            raise InvalidRequestError(f"'{self.app_id}' is not a valid app_id")
 66
 67        if not self.parent_api or not isinstance(self.parent_api, str):
 68            raise InvalidRequestError(f"`{self.parent_api}` is not a valid parent_api")
 69
 70        if not isinstance(self.tenant_ids, dict):
 71            raise InvalidRequestError("Request must have a `tenant_ids` dictionary")
 72
 73        self.query.validate()
 74        if self.flags is not None:
 75            self.flags.validate()
 76
 77    def to_dict(self) -> dict[str, object]:
 78        self.validate()
 79        flags = self.flags.to_dict() if self.flags is not None else {}
 80
 81        mql_context = None
 82        if isinstance(self.query, MetricsQuery):
 83            serialized_mql = self.query.serialize()
 84            assert isinstance(serialized_mql, dict)  # mypy
 85            mql_context = serialized_mql["mql_context"]
 86            query = str(serialized_mql["mql"])
 87        elif isinstance(self.query, DeleteQuery):
 88            """
 89            for a DeleteQuery, the query is not a snql/mql string,
 90            it is a dict
 91            """
 92            return {
 93                **flags,
 94                "query": self.query.serialize(),
 95                "app_id": self.app_id,
 96                "tenant_ids": self.tenant_ids,
 97                "parent_api": self.parent_api,
 98            }
 99        else:
100            query = str(self.query.serialize())
101
102        ret: dict[str, object] = {
103            **flags,
104            "query": query,
105            "dataset": self.dataset,
106            "app_id": self.app_id,
107            "tenant_ids": self.tenant_ids,
108            "parent_api": self.parent_api,
109        }
110        if mql_context is not None:
111            ret["mql_context"] = mql_context
112        return ret
113
114    def serialize(self) -> str:
115        return json.dumps(self.to_dict())
116
117    def serialize_mql(self) -> str:
118        # NOTE: This function is temporary, just to help with a cutover in the Sentry codebase.
119        # It will be removed in a future version.
120        return json.dumps(self.to_dict())
121
122    def __str__(self) -> str:
123        return self.serialize()
124
125    def print(self) -> str:
126        self.validate()
127        output = self.to_dict()
128        return json.dumps(output, sort_keys=True, indent=4 * " ")
Request( dataset: str, app_id: str, query: snuba_sdk.query.BaseQuery, flags: Flags = <factory>, parent_api: str = '<unknown>', tenant_ids: Mapping[str, str | int] = <factory>)
dataset: str
app_id: str
query: snuba_sdk.query.BaseQuery
flags: Flags
parent_api: str = '<unknown>'
tenant_ids: Mapping[str, str | int]
def validate(self) -> None:
56    def validate(self) -> None:
57        if not self.dataset or not isinstance(self.dataset, str):
58            raise InvalidRequestError("Request must have a valid dataset")
59        elif not FLAG_RE.match(self.dataset):
60            raise InvalidRequestError(f"'{self.dataset}' is not a valid dataset")
61
62        if not self.app_id or not isinstance(self.app_id, str):
63            raise InvalidRequestError("Request must have a valid app_id")
64        if not FLAG_RE.match(self.app_id):
65            raise InvalidRequestError(f"'{self.app_id}' is not a valid app_id")
66
67        if not self.parent_api or not isinstance(self.parent_api, str):
68            raise InvalidRequestError(f"`{self.parent_api}` is not a valid parent_api")
69
70        if not isinstance(self.tenant_ids, dict):
71            raise InvalidRequestError("Request must have a `tenant_ids` dictionary")
72
73        self.query.validate()
74        if self.flags is not None:
75            self.flags.validate()
def to_dict(self) -> dict[str, object]:
 77    def to_dict(self) -> dict[str, object]:
 78        self.validate()
 79        flags = self.flags.to_dict() if self.flags is not None else {}
 80
 81        mql_context = None
 82        if isinstance(self.query, MetricsQuery):
 83            serialized_mql = self.query.serialize()
 84            assert isinstance(serialized_mql, dict)  # mypy
 85            mql_context = serialized_mql["mql_context"]
 86            query = str(serialized_mql["mql"])
 87        elif isinstance(self.query, DeleteQuery):
 88            """
 89            for a DeleteQuery, the query is not a snql/mql string,
 90            it is a dict
 91            """
 92            return {
 93                **flags,
 94                "query": self.query.serialize(),
 95                "app_id": self.app_id,
 96                "tenant_ids": self.tenant_ids,
 97                "parent_api": self.parent_api,
 98            }
 99        else:
100            query = str(self.query.serialize())
101
102        ret: dict[str, object] = {
103            **flags,
104            "query": query,
105            "dataset": self.dataset,
106            "app_id": self.app_id,
107            "tenant_ids": self.tenant_ids,
108            "parent_api": self.parent_api,
109        }
110        if mql_context is not None:
111            ret["mql_context"] = mql_context
112        return ret
def serialize(self) -> str:
114    def serialize(self) -> str:
115        return json.dumps(self.to_dict())
def serialize_mql(self) -> str:
117    def serialize_mql(self) -> str:
118        # NOTE: This function is temporary, just to help with a cutover in the Sentry codebase.
119        # It will be removed in a future version.
120        return json.dumps(self.to_dict())
def print(self) -> str:
125    def print(self) -> str:
126        self.validate()
127        output = self.to_dict()
128        return json.dumps(output, sort_keys=True, indent=4 * " ")
@dataclass(frozen=True)
class Rollup:
164@dataclass(frozen=True)
165class Rollup:
166    """
167    Rollup instructs how the timeseries queries should be grouped on time. If the query is for a set of timeseries, then
168    the interval field should be specified. It is the number of seconds to group the timeseries by.
169    For a query that returns only the totals, specify Totals(True). A totals query can be ordered using the orderby field.
170    If totals is set to True and the interval is specified, then an extra row will be returned in the result with the totals
171    for the timeseries.
172    """
173
174    interval: int | None = None
175    totals: bool | None = None
176    orderby: Direction | None = None  # TODO: This doesn't make sense: ordered by what?
177    granularity: int | None = None
178
179    def __post_init__(self) -> None:
180        self.validate()
181
182    def validate(self) -> None:
183        # The interval is used to determine how the timestamp is rolled up in the group by of the query.
184        # The granularity is separate since it ultimately determines which data we retrieve.
185        if self.granularity and self.granularity not in ALLOWED_GRANULARITIES:
186            raise InvalidExpressionError(
187                f"granularity must be an integer and one of {ALLOWED_GRANULARITIES}"
188            )
189
190        if self.interval is not None:
191            _validate_int_literal(
192                "interval", self.interval, 10, None
193            )  # Minimum 10 seconds
194            if self.granularity is not None and self.interval < self.granularity:
195                raise InvalidExpressionError(
196                    "interval must be greater than or equal to granularity"
197                )
198
199        if self.totals is not None:
200            if not isinstance(self.totals, bool):
201                raise InvalidExpressionError("totals must be a boolean")
202
203        if self.interval is None and self.totals is None:
204            raise InvalidExpressionError(
205                "Rollup must have at least one of interval or totals"
206            )
207
208        if self.orderby is not None:
209            if not isinstance(self.orderby, Direction):
210                raise InvalidExpressionError("orderby must be a Direction object")
211
212        if self.interval is not None and self.orderby is not None:
213            raise InvalidExpressionError(
214                "Timeseries queries can't be ordered when using interval"
215            )

Rollup instructs how the timeseries queries should be grouped on time. If the query is for a set of timeseries, then the interval field should be specified. It is the number of seconds to group the timeseries by. For a query that returns only the totals, specify Totals(True). A totals query can be ordered using the orderby field. If totals is set to True and the interval is specified, then an extra row will be returned in the result with the totals for the timeseries.
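
For example (a sketch; 3600 is one of the allowed granularities used elsewhere in this documentation):

from snuba_sdk import Direction, Rollup

# One bucket per hour for a timeseries query.
interval_rollup = Rollup(interval=3600, granularity=3600)

# Totals only, ordered by the aggregate value; orderby cannot be combined
# with an interval.
totals_rollup = Rollup(totals=True, granularity=3600, orderby=Direction.DESC)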

Rollup( interval: int | None = None, totals: bool | None = None, orderby: Direction | None = None, granularity: int | None = None)
interval: int | None = None
totals: bool | None = None
orderby: Direction | None = None
granularity: int | None = None
def validate(self) -> None:
182    def validate(self) -> None:
183        # The interval is used to determine how the timestamp is rolled up in the group by of the query.
184        # The granularity is separate since it ultimately determines which data we retrieve.
185        if self.granularity and self.granularity not in ALLOWED_GRANULARITIES:
186            raise InvalidExpressionError(
187                f"granularity must be an integer and one of {ALLOWED_GRANULARITIES}"
188            )
189
190        if self.interval is not None:
191            _validate_int_literal(
192                "interval", self.interval, 10, None
193            )  # Minimum 10 seconds
194            if self.granularity is not None and self.interval < self.granularity:
195                raise InvalidExpressionError(
196                    "interval must be greater than or equal to granularity"
197                )
198
199        if self.totals is not None:
200            if not isinstance(self.totals, bool):
201                raise InvalidExpressionError("totals must be a boolean")
202
203        if self.interval is None and self.totals is None:
204            raise InvalidExpressionError(
205                "Rollup must have at least one of interval or totals"
206            )
207
208        if self.orderby is not None:
209            if not isinstance(self.orderby, Direction):
210                raise InvalidExpressionError("orderby must be a Direction object")
211
212        if self.interval is not None and self.orderby is not None:
213            raise InvalidExpressionError(
214                "Timeseries queries can't be ordered when using interval"
215            )
@dataclass(frozen=True, repr=False)
class Storage(snuba_sdk.expressions.Expression):
16@dataclass(frozen=True, repr=False)
17class Storage(Expression):
18    name: str
19    sample: Optional[float] = None
20    data_model: Optional[DataModel] = field(hash=False, default=None)
21
22    def validate(self) -> None:
23        if not isinstance(self.name, str) or not storage_name_re.match(self.name):
24            raise InvalidStorageError(f"'{self.name}' is not a valid storage name")
25
26        if self.sample is not None:
27            if not isinstance(self.sample, float):
28                raise InvalidStorageError("sample must be a float")
29            elif self.sample <= 0.0:
30                raise InvalidStorageError("samples must be greater than 0.0")
31
32        if self.data_model is not None:
33            if not isinstance(self.data_model, DataModel):
34                raise InvalidStorageError("data_model must be an instance of DataModel")
35
36    def __repr__(self) -> str:
37        sample = f", sample={self.sample}" if self.sample is not None else ""
38        return f"STORAGE('{self.name}'{sample})"
Storage( name: str, sample: Optional[float] = None, data_model: Optional[snuba_sdk.schema.DataModel] = None)
name: str
sample: Optional[float] = None
data_model: Optional[snuba_sdk.schema.DataModel] = None
def validate(self) -> None:
22    def validate(self) -> None:
23        if not isinstance(self.name, str) or not storage_name_re.match(self.name):
24            raise InvalidStorageError(f"'{self.name}' is not a valid storage name")
25
26        if self.sample is not None:
27            if not isinstance(self.sample, float):
28                raise InvalidStorageError("sample must be a float")
29            elif self.sample <= 0.0:
30                raise InvalidStorageError("samples must be greater than 0.0")
31
32        if self.data_model is not None:
33            if not isinstance(self.data_model, DataModel):
34                raise InvalidStorageError("data_model must be an instance of DataModel")
@dataclass
class Timeseries:
 70@dataclass
 71class Timeseries:
 72    """
 73    A code representation of a single timeseries. This is the basic unit of a metrics query.
 74    A raw metric, aggregated by an aggregate function. It can be filtered by tag conditions.
 75    It can also grouped by a set of tag values, which will return one timeseries for each unique
 76    grouping of tag values.
 77    """
 78
 79    metric: Metric
 80    aggregate: str
 81    aggregate_params: list[Any] | None = None
 82    filters: ConditionGroup | None = None
 83    groupby: list[Column | AliasedExpression] | None = None
 84
 85    def __post_init__(self) -> None:
 86        self.validate()
 87
 88    def get_fields(self) -> Sequence[str]:
 89        self_fields = fields(self)  # Verified the order in the Python source
 90        return tuple(f.name for f in self_fields)
 91
 92    def validate(self) -> None:
 93        if not isinstance(self.metric, Metric):
 94            raise InvalidTimeseriesError("metric must be an instance of a Metric")
 95        self.metric.validate()
 96
 97        # TODO: Restrict which specific aggregates are allowed
 98        # TODO: Validate aggregate_params based on the aggregate supplied e.g. quantile needs a float
 99        if not isinstance(self.aggregate, str):
100            raise InvalidTimeseriesError("aggregate must be a string")
101        if self.aggregate_params is not None:
102            if not isinstance(self.aggregate_params, list):
103                raise InvalidTimeseriesError("aggregate_params must be a list")
104            for p in self.aggregate_params:
105                if not is_literal(p):
106                    raise InvalidTimeseriesError(
107                        "aggregate_params can only be literal types"
108                    )
109
110        # TODO: Validate these are tag conditions only
111        # TODO: Validate these are simple conditions e.g. tag[x] op literal
112        if self.filters is not None:
113            if not isinstance(self.filters, list):
114                raise InvalidTimeseriesError("filters must be a list")
115            for f in self.filters:
116                if not isinstance(f, (Condition, BooleanCondition)):
117                    raise InvalidTimeseriesError("filters must be a list of Conditions")
118
119        # TODO: Can you group by meta information like project_id?
120        # TODO: Validate these are appropriate columns for grouping
121        if self.groupby is not None:
122            if not isinstance(self.groupby, list):
123                raise InvalidTimeseriesError("groupby must be a list")
124            for g in self.groupby:
125                if not isinstance(g, (Column, AliasedExpression)):
126                    raise InvalidTimeseriesError(
127                        "groupby must be a list of Columns or AliasedExpression"
128                    )
129
130    def set_metric(self, metric: Metric) -> Timeseries:
131        if not isinstance(metric, Metric):
132            raise InvalidTimeseriesError("metric must be a Metric")
133        return replace(self, metric=metric)
134
135    def set_aggregate(
136        self, aggregate: str, aggregate_params: list[Any] | None = None
137    ) -> Timeseries:
138        if not isinstance(aggregate, str):
139            raise InvalidTimeseriesError("aggregate must be a str")
140        if aggregate_params is not None and not isinstance(aggregate_params, list):
141            raise InvalidTimeseriesError("aggregate_params must be a list")
142        return replace(self, aggregate=aggregate, aggregate_params=aggregate_params)
143
144    def set_filters(self, filters: ConditionGroup | None) -> Timeseries:
145        if filters is not None and not list_type(
146            filters, (BooleanCondition, Condition)
147        ):
148            raise InvalidTimeseriesError("filters must be a list of Conditions")
149        return replace(self, filters=filters)
150
151    def set_groupby(
152        self, groupby: list[Column | AliasedExpression] | None
153    ) -> Timeseries:
154        if groupby is not None and not list_type(groupby, (Column, AliasedExpression)):
155            raise InvalidTimeseriesError(
156                "groupby must be a list of Columns or AliasedExpression"
157            )
158        return replace(self, groupby=groupby)

A code representation of a single timeseries. This is the basic unit of a metrics query: a raw metric, aggregated by an aggregate function. It can be filtered by tag conditions, and it can also be grouped by a set of tag values, which will return one timeseries for each unique grouping of tag values.
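
For example, a minimal sketch (the MRI, tag names and values are placeholders):

from snuba_sdk import Column, Condition, Metric, Op, Timeseries

# One timeseries per environment, filtered to a single transaction.
series = Timeseries(
    metric=Metric(mri="d:transactions/duration@millisecond"),
    aggregate="avg",
    filters=[Condition(Column("transaction"), Op.EQ, "/checkout")],
    groupby=[Column("environment")],
)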

Timeseries( metric: Metric, aggregate: str, aggregate_params: list[typing.Any] | None = None, filters: Optional[Sequence[Union[BooleanCondition, Condition]]] = None, groupby: list[Column | AliasedExpression] | None = None)
metric: Metric
aggregate: str
aggregate_params: list[typing.Any] | None = None
filters: Optional[Sequence[Union[BooleanCondition, Condition]]] = None
groupby: list[Column | AliasedExpression] | None = None
def get_fields(self) -> Sequence[str]:
88    def get_fields(self) -> Sequence[str]:
89        self_fields = fields(self)  # Verified the order in the Python source
90        return tuple(f.name for f in self_fields)
def validate(self) -> None:
 92    def validate(self) -> None:
 93        if not isinstance(self.metric, Metric):
 94            raise InvalidTimeseriesError("metric must be an instance of a Metric")
 95        self.metric.validate()
 96
 97        # TODO: Restrict which specific aggregates are allowed
 98        # TODO: Validate aggregate_params based on the aggregate supplied e.g. quantile needs a float
 99        if not isinstance(self.aggregate, str):
100            raise InvalidTimeseriesError("aggregate must be a string")
101        if self.aggregate_params is not None:
102            if not isinstance(self.aggregate_params, list):
103                raise InvalidTimeseriesError("aggregate_params must be a list")
104            for p in self.aggregate_params:
105                if not is_literal(p):
106                    raise InvalidTimeseriesError(
107                        "aggregate_params can only be literal types"
108                    )
109
110        # TODO: Validate these are tag conditions only
111        # TODO: Validate these are simple conditions e.g. tag[x] op literal
112        if self.filters is not None:
113            if not isinstance(self.filters, list):
114                raise InvalidTimeseriesError("filters must be a list")
115            for f in self.filters:
116                if not isinstance(f, (Condition, BooleanCondition)):
117                    raise InvalidTimeseriesError("filters must be a list of Conditions")
118
119        # TODO: Can you group by meta information like project_id?
120        # TODO: Validate these are appropriate columns for grouping
121        if self.groupby is not None:
122            if not isinstance(self.groupby, list):
123                raise InvalidTimeseriesError("groupby must be a list")
124            for g in self.groupby:
125                if not isinstance(g, (Column, AliasedExpression)):
126                    raise InvalidTimeseriesError(
127                        "groupby must be a list of Columns or AliasedExpression"
128                    )
def set_metric( self, metric: Metric) -> Timeseries:
130    def set_metric(self, metric: Metric) -> Timeseries:
131        if not isinstance(metric, Metric):
132            raise InvalidTimeseriesError("metric must be a Metric")
133        return replace(self, metric=metric)
def set_aggregate( self, aggregate: str, aggregate_params: list[typing.Any] | None = None) -> Timeseries:
135    def set_aggregate(
136        self, aggregate: str, aggregate_params: list[Any] | None = None
137    ) -> Timeseries:
138        if not isinstance(aggregate, str):
139            raise InvalidTimeseriesError("aggregate must be a str")
140        if aggregate_params is not None and not isinstance(aggregate_params, list):
141            raise InvalidTimeseriesError("aggregate_params must be a list")
142        return replace(self, aggregate=aggregate, aggregate_params=aggregate_params)
def set_filters( self, filters: Optional[Sequence[Union[BooleanCondition, Condition]]]) -> Timeseries:
144    def set_filters(self, filters: ConditionGroup | None) -> Timeseries:
145        if filters is not None and not list_type(
146            filters, (BooleanCondition, Condition)
147        ):
148            raise InvalidTimeseriesError("filters must be a list of Conditions")
149        return replace(self, filters=filters)
def set_groupby( self, groupby: list[Column | AliasedExpression] | None) -> Timeseries:
151    def set_groupby(
152        self, groupby: list[Column | AliasedExpression] | None
153    ) -> Timeseries:
154        if groupby is not None and not list_type(groupby, (Column, AliasedExpression)):
155            raise InvalidTimeseriesError(
156                "groupby must be a list of Columns or AliasedExpression"
157            )
158        return replace(self, groupby=groupby)
@dataclass(frozen=True)
class Totals(snuba_sdk.expressions.Expression):
115@dataclass(frozen=True)
116class Totals(Expression):
117    totals: bool
118
119    def validate(self) -> None:
120        if not isinstance(self.totals, bool):
121            raise InvalidExpressionError("totals must be a boolean")
Totals(totals: bool)
totals: bool
def validate(self) -> None:
119    def validate(self) -> None:
120        if not isinstance(self.totals, bool):
121            raise InvalidExpressionError("totals must be a boolean")
@dataclass(frozen=True)
class DeleteQuery(snuba_sdk.query.BaseQuery):
12@dataclass(frozen=True)
13class DeleteQuery(BaseQuery):
14    """
15    This represents a snuba delete query.
16    Inputs:
17        storage - the storage to delete from
18        columnConditions - a mapping from column-name to a list of column values
19            that defines the delete conditions. ex:
20            {
21                "id": [1, 2, 3]
22                "status": ["failed"]
23            }
24            represents
25            DELETE FROM ... WHERE id in (1,2,3) AND status='failed'
26    Deletes all rows in the given storage, that satisfy the conditions
27    defined in 'columnConditions'.
28    """
29
30    storage_name: str
31    column_conditions: Dict[str, List[Union[str, int]]]
32
33    def validate(self) -> None:
34        if self.column_conditions == {}:
35            raise InvalidDeleteQueryError("column conditions cannot be empty")
36
37        for col, values in self.column_conditions.items():
38            if len(values) == 0:
39                raise InvalidDeleteQueryError(
40                    f"column condition '{col}' cannot be empty"
41                )
42
43    def serialize(self) -> Union[str, Dict[str, Any]]:
44        # the body of the request
45        self.validate()
46        return {"columns": self.column_conditions}
47
48    def print(self) -> str:
49        return repr(self)
50
51    def __repr__(self) -> str:
52        return f"DeleteQuery(storage_name={repr(self.storage_name)}, columnsConditions={repr(self.column_conditions)})"

This represents a Snuba delete query: it deletes all rows in the given storage that satisfy the conditions defined in column_conditions.

Inputs:

  • storage_name: the storage to delete from.
  • column_conditions: a mapping from column name to the list of column values that defines the delete conditions. For example, {"id": [1, 2, 3], "status": ["failed"]} represents DELETE FROM ... WHERE id IN (1, 2, 3) AND status = 'failed'.
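
For example (a sketch; the storage, dataset and tenant values are placeholders, and DeleteQuery is assumed to be exported from the top-level snuba_sdk package):

from snuba_sdk import DeleteQuery, Request

delete = DeleteQuery(
    storage_name="search_issues",
    column_conditions={"project_id": [1], "group_id": [10, 11]},
)

# A DeleteQuery is sent like any other query, wrapped in a Request.
request = Request(
    dataset="search_issues",
    app_id="myappid",
    query=delete,
    tenant_ids={"referrer": "my_referrer", "organization_id": 1234},
)
body = request.to_dict()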

DeleteQuery( storage_name: str, column_conditions: Dict[str, List[Union[str, int]]])
storage_name: str
column_conditions: Dict[str, List[Union[str, int]]]
def validate(self) -> None:
33    def validate(self) -> None:
34        if self.column_conditions == {}:
35            raise InvalidDeleteQueryError("column conditions cannot be empty")
36
37        for col, values in self.column_conditions.items():
38            if len(values) == 0:
39                raise InvalidDeleteQueryError(
40                    f"column condition '{col}' cannot be empty"
41                )
def serialize(self) -> Union[str, Dict[str, Any]]:
43    def serialize(self) -> Union[str, Dict[str, Any]]:
44        # the body of the request
45        self.validate()
46        return {"columns": self.column_conditions}
def print(self) -> str:
48    def print(self) -> str:
49        return repr(self)
Inherited Members
snuba_sdk.query.BaseQuery
get_fields