Source code for sentry_sdk.envelope

import io
import json
import mimetypes

from sentry_sdk.session import Session
from sentry_sdk.utils import json_dumps, capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any
    from typing import Optional
    from typing import Union
    from typing import Dict
    from typing import List
    from typing import Iterator

    from sentry_sdk._types import Event, EventDataCategory


def parse_json(data):
    # type: (Union[bytes, str]) -> Any
    # on some python 3 versions this needs to be bytes
    if isinstance(data, bytes):
        data = data.decode("utf-8", "replace")
    return json.loads(data)


[docs] class Envelope: """ Represents a Sentry Envelope. The calling code is responsible for adhering to the constraints documented in the Sentry docs: https://develop.sentry.dev/sdk/envelopes/#data-model. In particular, each envelope may have at most one Item with type "event" or "transaction" (but not both). """ def __init__( self, headers=None, # type: Optional[Dict[str, Any]] items=None, # type: Optional[List[Item]] ): # type: (...) -> None if headers is not None: headers = dict(headers) self.headers = headers or {} if items is None: items = [] else: items = list(items) self.items = items @property def description(self): # type: (...) -> str return "envelope with %s items (%s)" % ( len(self.items), ", ".join(x.data_category for x in self.items), ) def add_event( self, event # type: Event ): # type: (...) -> None self.add_item(Item(payload=PayloadRef(json=event), type="event")) def add_transaction( self, transaction # type: Event ): # type: (...) -> None self.add_item(Item(payload=PayloadRef(json=transaction), type="transaction")) def add_profile( self, profile # type: Any ): # type: (...) -> None self.add_item(Item(payload=PayloadRef(json=profile), type="profile")) def add_profile_chunk( self, profile_chunk # type: Any ): # type: (...) -> None self.add_item( Item(payload=PayloadRef(json=profile_chunk), type="profile_chunk") ) def add_checkin( self, checkin # type: Any ): # type: (...) -> None self.add_item(Item(payload=PayloadRef(json=checkin), type="check_in")) def add_session( self, session # type: Union[Session, Any] ): # type: (...) -> None if isinstance(session, Session): session = session.to_json() self.add_item(Item(payload=PayloadRef(json=session), type="session")) def add_sessions( self, sessions # type: Any ): # type: (...) -> None self.add_item(Item(payload=PayloadRef(json=sessions), type="sessions")) def add_item( self, item # type: Item ): # type: (...) -> None self.items.append(item) def get_event(self): # type: (...) -> Optional[Event] for items in self.items: event = items.get_event() if event is not None: return event return None def get_transaction_event(self): # type: (...) -> Optional[Event] for item in self.items: event = item.get_transaction_event() if event is not None: return event return None def __iter__(self): # type: (...) -> Iterator[Item] return iter(self.items) def serialize_into( self, f # type: Any ): # type: (...) -> None f.write(json_dumps(self.headers)) f.write(b"\n") for item in self.items: item.serialize_into(f) def serialize(self): # type: (...) -> bytes out = io.BytesIO() self.serialize_into(out) return out.getvalue() @classmethod def deserialize_from( cls, f # type: Any ): # type: (...) -> Envelope headers = parse_json(f.readline()) items = [] while 1: item = Item.deserialize_from(f) if item is None: break items.append(item) return cls(headers=headers, items=items) @classmethod def deserialize( cls, bytes # type: bytes ): # type: (...) -> Envelope return cls.deserialize_from(io.BytesIO(bytes)) def __repr__(self): # type: (...) -> str return "<Envelope headers=%r items=%r>" % (self.headers, self.items)
class PayloadRef: def __init__( self, bytes=None, # type: Optional[bytes] path=None, # type: Optional[Union[bytes, str]] json=None, # type: Optional[Any] ): # type: (...) -> None self.json = json self.bytes = bytes self.path = path def get_bytes(self): # type: (...) -> bytes if self.bytes is None: if self.path is not None: with capture_internal_exceptions(): with open(self.path, "rb") as f: self.bytes = f.read() elif self.json is not None: self.bytes = json_dumps(self.json) return self.bytes or b"" @property def inferred_content_type(self): # type: (...) -> str if self.json is not None: return "application/json" elif self.path is not None: path = self.path if isinstance(path, bytes): path = path.decode("utf-8", "replace") ty = mimetypes.guess_type(path)[0] if ty: return ty return "application/octet-stream" def __repr__(self): # type: (...) -> str return "<Payload %r>" % (self.inferred_content_type,)
[docs] class Item: def __init__( self, payload, # type: Union[bytes, str, PayloadRef] headers=None, # type: Optional[Dict[str, Any]] type=None, # type: Optional[str] content_type=None, # type: Optional[str] filename=None, # type: Optional[str] ): if headers is not None: headers = dict(headers) elif headers is None: headers = {} self.headers = headers if isinstance(payload, bytes): payload = PayloadRef(bytes=payload) elif isinstance(payload, str): payload = PayloadRef(bytes=payload.encode("utf-8")) else: payload = payload if filename is not None: headers["filename"] = filename if type is not None: headers["type"] = type if content_type is not None: headers["content_type"] = content_type elif "content_type" not in headers: headers["content_type"] = payload.inferred_content_type self.payload = payload def __repr__(self): # type: (...) -> str return "<Item headers=%r payload=%r data_category=%r>" % ( self.headers, self.payload, self.data_category, ) @property def type(self): # type: (...) -> Optional[str] return self.headers.get("type") @property def data_category(self): # type: (...) -> EventDataCategory ty = self.headers.get("type") if ty == "session" or ty == "sessions": return "session" elif ty == "attachment": return "attachment" elif ty == "transaction": return "transaction" elif ty == "event": return "error" elif ty == "client_report": return "internal" elif ty == "profile": return "profile" elif ty == "profile_chunk": return "profile_chunk" elif ty == "statsd": return "metric_bucket" elif ty == "check_in": return "monitor" else: return "default" def get_bytes(self): # type: (...) -> bytes return self.payload.get_bytes()
[docs] def get_event(self): # type: (...) -> Optional[Event] """ Returns an error event if there is one. """ if self.type == "event" and self.payload.json is not None: return self.payload.json return None
def get_transaction_event(self): # type: (...) -> Optional[Event] if self.type == "transaction" and self.payload.json is not None: return self.payload.json return None def serialize_into( self, f # type: Any ): # type: (...) -> None headers = dict(self.headers) bytes = self.get_bytes() headers["length"] = len(bytes) f.write(json_dumps(headers)) f.write(b"\n") f.write(bytes) f.write(b"\n") def serialize(self): # type: (...) -> bytes out = io.BytesIO() self.serialize_into(out) return out.getvalue() @classmethod def deserialize_from( cls, f # type: Any ): # type: (...) -> Optional[Item] line = f.readline().rstrip() if not line: return None headers = parse_json(line) length = headers.get("length") if length is not None: payload = f.read(length) f.readline() else: # if no length was specified we need to read up to the end of line # and remove it (if it is present, i.e. not the very last char in an eof terminated envelope) payload = f.readline().rstrip(b"\n") if headers.get("type") in ("event", "transaction", "metric_buckets"): rv = cls(headers=headers, payload=PayloadRef(json=parse_json(payload))) else: rv = cls(headers=headers, payload=payload) return rv @classmethod def deserialize( cls, bytes # type: bytes ): # type: (...) -> Optional[Item] return cls.deserialize_from(io.BytesIO(bytes))