Top

sentry_sdk module

The Sentry SDK is the new-style SDK for sentry.io. It implements the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later.

The user documentation can be found on docs.sentry.io.

Quickstart

The only thing needed to get going is to call sentry_sdk.init(). When no arguments are passed, the default options are used and the DSN is picked up from the SENTRY_DSN environment variable. Otherwise the DSN can be passed with the dsn keyword or as the first argument.

import sentry_sdk
sentry_sdk.init()

This initializes the default integrations which will automatically pick up any uncaught exceptions. Additionally you can report arbitrary other exceptions:

try:
    my_failing_function()
except Exception as e:
    sentry_sdk.capture_exception(e)
"""
The Sentry SDK is the new-style SDK for [sentry.io](https://sentry.io/).  It implements
the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later.

The user documentation can be found on [docs.sentry.io](https://docs.sentry.io/).

## Quickstart

The only thing needed to get going is to call `sentry_sdk.init()`.  When not passed any
arguments the default options are used and the DSN is picked up from the `SENTRY_DSN`
environment variable.  Otherwise the DSN can be passed with the `dsn` keyword
or first argument.

    import sentry_sdk
    sentry_sdk.init()

This initializes the default integrations which will automatically pick up any
uncaught exceptions.  Additionally you can report arbitrary other exceptions:

    try:
        my_failing_function()
    except Exception as e:
        sentry_sdk.capture_exception(e)
"""
from sentry_sdk.hub import Hub, init
from sentry_sdk.scope import Scope
from sentry_sdk.transport import Transport, HttpTransport
from sentry_sdk.client import Client

from sentry_sdk.api import *  # noqa
from sentry_sdk.api import __all__ as api_all

from sentry_sdk.consts import VERSION  # noqa

# Public names of the package: everything exported by sentry_sdk.api plus the
# core classes and helpers imported above.
# NOTE(review): "integrations" is listed here but is not imported in this
# module -- presumably it refers to the sentry_sdk.integrations subpackage;
# confirm.
__all__ = api_all + [  # noqa
    "Hub",
    "Scope",
    "Client",
    "Transport",
    "HttpTransport",
    "init",
    "integrations",
]

# Initialize the debug support after everything is loaded
from sentry_sdk.debug import init_debug_support

init_debug_support()
# Remove the helper again so it does not linger in the package namespace.
del init_debug_support

Functions

def add_breadcrumb(

crumb=None, hint=None, **kwargs)

Alias for Hub.add_breadcrumb

Adds a breadcrumb. The breadcrumbs are a dictionary with the data as the sentry v7/v8 protocol expects. hint is an optional value that can be used by before_breadcrumb to customize the breadcrumbs that are emitted.

@hubmethod
def add_breadcrumb(crumb=None, hint=None, **kwargs):
    # type: (Dict[str, Any], Dict[str, Any], **Any) -> None
    # Forward to `Hub.current`; without a hub the call is a silent no-op.
    active_hub = Hub.current
    if active_hub is None:
        return None
    return active_hub.add_breadcrumb(crumb, hint, **kwargs)

def capture_event(

event, hint=None)

Alias for Hub.capture_event

Captures an event. The return value is the ID of the event.

The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.

@hubmethod
def capture_event(event, hint=None):
    # type: (Dict[str, Any], Dict[str, Any]) -> Optional[str]
    # Delegate to `Hub.current`; the result is None when no hub is available.
    active_hub = Hub.current
    return None if active_hub is None else active_hub.capture_event(event, hint)

def capture_exception(

error=None)

Alias for Hub.capture_exception

Captures an exception.

The argument passed can be None in which case the last exception will be reported, otherwise an exception object or an exc_info tuple.

@hubmethod
def capture_exception(error=None):
    # type: (Optional[BaseException]) -> Optional[str]
    # Delegate to `Hub.current`; the result is None when no hub is available.
    active_hub = Hub.current
    if active_hub is None:
        return None
    return active_hub.capture_exception(error)

def capture_message(

message, level=None)

Alias for Hub.capture_message

Captures a message. The message is just a string. If no level is provided the default level is info.

@hubmethod
def capture_message(message, level=None):
    # type: (str, Optional[Any]) -> Optional[str]
    # Delegate to `Hub.current`; the result is None when no hub is available.
    active_hub = Hub.current
    if active_hub is None:
        return None
    return active_hub.capture_message(message, level)

def configure_scope(

callback=None)

Alias for Hub.configure_scope

Reconfigures the scope.

@hubmethod  # noqa
def configure_scope(callback=None):
    active_hub = Hub.current
    if active_hub is not None:
        return active_hub.configure_scope(callback)

    if callback is not None:
        # A callback was supplied but there is no hub to run it against.
        return None

    # No hub and no callback: hand back a context manager that yields a
    # fresh Scope so `with configure_scope() as scope:` keeps working.
    @contextmanager
    def inner():
        yield Scope()

    return inner()

def flush(

timeout=None, callback=None)

Alias for Hub.flush

Alias for self.client.flush

@hubmethod
def flush(timeout=None, callback=None):
    # Forward to `Hub.current`; without a hub there is nothing to flush.
    active_hub = Hub.current
    if active_hub is None:
        return None
    return active_hub.flush(timeout=timeout, callback=callback)

def init(

*args, **kwargs)

Initializes the SDK and optionally integrations.

This takes the same arguments as the client constructor.

def init(*args, **kwargs):
    """Create a client, bind it to the current hub and return a guard.

    All positional and keyword arguments are handed to the `Client`
    constructor unchanged.  A weak reference to the created client is kept
    in the module-level `_initial_client`.
    """
    global _initial_client
    new_client = Client(*args, **kwargs)
    Hub.current.bind_client(new_client)
    guard = _InitGuard(new_client)
    if new_client is not None:
        # Weak reference only: the module must not keep the client alive.
        _initial_client = weakref.ref(new_client)
    return guard

def last_event_id(

)

Alias for Hub.last_event_id

Returns the last event ID.

@hubmethod
def last_event_id():
    # type: () -> Optional[str]
    # Delegate to `Hub.current`; the result is None when no hub is available.
    active_hub = Hub.current
    return None if active_hub is None else active_hub.last_event_id()

def push_scope(

callback=None)

Alias for Hub.push_scope

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

@hubmethod  # noqa
def push_scope(callback=None):
    active_hub = Hub.current
    if active_hub is not None:
        return active_hub.push_scope(callback)

    if callback is not None:
        # A callback was supplied but there is no hub to run it against.
        return None

    # No hub and no callback: hand back a context manager that yields a
    # fresh Scope so `with push_scope() as scope:` keeps working.
    @contextmanager
    def inner():
        yield Scope()

    return inner()

Classes

class Client

The client is internally responsible for capturing the events and forwarding them to sentry through the configured transport. It takes the client options as keyword arguments and optionally the DSN as first argument.

class Client(object):
    """The client is internally responsible for capturing the events and
    forwarding them to sentry through the configured transport.  It takes
    the client options as keyword arguments and optionally the DSN as first
    argument.
    """

    def __init__(self, *args, **kwargs):
        # type: (*str, **ClientOptions) -> None
        """Build the options, the transport and the integrations.

        Raises `ValueError` when the `request_bodies` option is not one of
        the accepted values.
        """
        # Remember the previous debug flag so it can be restored once
        # initialization is over, whether it succeeded or not.
        old_debug = _client_init_debug.get(False)
        try:
            self.options = options = get_options(*args, **kwargs)
            # Apply this client's `debug` option for the rest of the setup.
            _client_init_debug.set(options["debug"])
            self.transport = make_transport(options)

            request_bodies = ("always", "never", "small", "medium")
            if options["request_bodies"] not in request_bodies:
                raise ValueError(
                    "Invalid value for request_bodies. Must be one of {}".format(
                        request_bodies
                    )
                )

            self.integrations = setup_integrations(
                options["integrations"], with_defaults=options["default_integrations"]
            )
        finally:
            _client_init_debug.set(old_debug)

    @property
    def dsn(self):
        """Returns the configured DSN as string."""
        return self.options["dsn"]

    def _prepare_event(
        self,
        event,  # type: Dict[str, Any]
        hint,  # type: Optional[Dict[str, Any]]
        scope,  # type: Optional[Scope]
    ):
        # type: (...) -> Optional[Dict[str, Any]]
        """Fill in missing event fields and run the processing pipeline.

        Returns the processed event, or `None` when the scope or the
        `before_send` option dropped it.
        """
        if event.get("timestamp") is None:
            event["timestamp"] = datetime.utcnow()

        # The scope may enrich the event or drop it by returning None.
        if scope is not None:
            event = scope.apply_to_event(event, hint)
            if event is None:
                return

        # Attach the current stacktrace only when the event carries no
        # stack information of its own.
        if (
            self.options["attach_stacktrace"]
            and "exception" not in event
            and "stacktrace" not in event
            and "threads" not in event
        ):
            with capture_internal_exceptions():
                event["threads"] = {
                    "values": [
                        {
                            "stacktrace": current_stacktrace(
                                self.options["with_locals"]
                            ),
                            "crashed": False,
                            "current": True,
                        }
                    ]
                }

        # Client options only act as defaults: values already present on the
        # event are left alone.
        for key in "release", "environment", "server_name", "dist":
            if event.get(key) is None and self.options[key] is not None:  # type: ignore
                event[key] = text_type(self.options[key]).strip()  # type: ignore
        if event.get("sdk") is None:
            # Copy SDK_INFO so the shared constant is not mutated.
            sdk_info = dict(SDK_INFO)
            sdk_info["integrations"] = sorted(self.integrations.keys())
            event["sdk"] = sdk_info

        if event.get("platform") is None:
            event["platform"] = "python"

        event = handle_in_app(
            event, self.options["in_app_exclude"], self.options["in_app_include"]
        )

        # Postprocess the event here so that annotated types do
        # generally not surface in before_send
        if event is not None:
            event = Serializer().serialize_event(event)

        before_send = self.options["before_send"]
        if before_send is not None:
            # new_event stays None if before_send raises inside the guard
            # below, which drops the event.
            new_event = None
            with capture_internal_exceptions():
                new_event = before_send(event, hint)
            if new_event is None:
                logger.info("before send dropped event (%s)", event)
            event = new_event  # type: ignore

        return event

    def _is_ignored_error(self, event, hint):
        # type: (Dict[str, Any], Dict[str, Any]) -> bool
        """Return True when the hint's exception matches `ignore_errors`."""
        exc_info = hint.get("exc_info")
        if exc_info is None:
            return False

        type_name = get_type_name(exc_info[0])
        full_name = "%s.%s" % (exc_info[0].__module__, type_name)

        for errcls in self.options["ignore_errors"]:
            # Strings are compared against both the bare and the
            # module-qualified type name; anything else is treated as a
            # class and checked with issubclass().
            if isinstance(errcls, string_types):
                if errcls == full_name or errcls == type_name:
                    return True
            else:
                if issubclass(exc_info[0], errcls):
                    return True

        return False

    def _should_capture(
        self,
        event,  # type: Dict[str, Any]
        hint,  # type: Dict[str, Any]
        scope=None,  # type: Scope
    ):
        # type: (...) -> bool
        """Decide whether the event should be processed and sent at all."""
        if scope is not None and not scope._should_capture:
            return False

        # Random sampling: with a sample rate below 1.0 an event is kept only
        # when the random draw falls below the rate.
        if (
            self.options["sample_rate"] < 1.0
            and random.random() >= self.options["sample_rate"]
        ):
            return False

        if self._is_ignored_error(event, hint):
            return False

        return True

    def capture_event(self, event, hint=None, scope=None):
        # type: (Dict[str, Any], Any, Scope) -> Optional[str]
        """Captures an event.

        This takes the ready made event and an optional hint and scope.  The
        hint is internally used to further customize the representation of the
        error.  When provided it's a dictionary of optional information such
        as exception info.

        If the transport is not set nothing happens, otherwise the return
        value of this function will be the ID of the captured event.
        """
        if self.transport is None:
            return None
        if hint is None:
            hint = {}
        # Reuse an ID already present on the event, otherwise generate one
        # and store it on the event.
        rv = event.get("event_id")
        if rv is None:
            event["event_id"] = rv = uuid.uuid4().hex
        if not self._should_capture(event, hint, scope):
            return None
        event = self._prepare_event(event, hint, scope)  # type: ignore
        if event is None:
            return None
        self.transport.capture_event(event)
        return rv

    def close(self, timeout=None, callback=None):
        """
        Close the client and shut down the transport. Arguments have the same
        semantics as `self.flush()`.
        """
        if self.transport is not None:
            self.flush(timeout=timeout, callback=callback)
            self.transport.kill()
            # With the transport cleared, later capture_event/flush/close
            # calls return without doing anything.
            self.transport = None

    def flush(self, timeout=None, callback=None):
        """
        Wait `timeout` seconds for the current events to be sent. If no
        `timeout` is provided, the `shutdown_timeout` option value is used.

        The `callback` is invoked with two arguments: the number of pending
        events and the configured timeout.  For instance the default atexit
        integration will use this to render out a message on stderr.
        """
        if self.transport is not None:
            if timeout is None:
                timeout = self.options["shutdown_timeout"]
            self.transport.flush(timeout=timeout, callback=callback)

    def __enter__(self):
        """Return the client itself so it can be used in a `with` block."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Close the client when the `with` block is left."""
        self.close()

Ancestors (in MRO)

Instance variables

var dsn

Returns the configured DSN as string.

Methods

def __init__(

self, *args, **kwargs)

def __init__(self, *args, **kwargs):
    # type: (*str, **ClientOptions) -> None
    """Resolve the options, then create the transport and integrations.

    Raises `ValueError` when the `request_bodies` option is not one of the
    accepted values.  The previous debug flag is restored on the way out,
    whether setup succeeded or not.
    """
    previous_debug = _client_init_debug.get(False)
    try:
        opts = get_options(*args, **kwargs)
        self.options = opts
        _client_init_debug.set(opts["debug"])
        self.transport = make_transport(opts)
        allowed_bodies = ("always", "never", "small", "medium")
        if opts["request_bodies"] not in allowed_bodies:
            message = "Invalid value for request_bodies. Must be one of {}".format(
                allowed_bodies
            )
            raise ValueError(message)
        self.integrations = setup_integrations(
            opts["integrations"], with_defaults=opts["default_integrations"]
        )
    finally:
        _client_init_debug.set(previous_debug)

def capture_event(

self, event, hint=None, scope=None)

Captures an event.

This takes the ready made event and an optional hint and scope. The hint is internally used to further customize the representation of the error. When provided it's a dictionary of optional information such as exception info.

If the transport is not set nothing happens, otherwise the return value of this function will be the ID of the captured event.

def capture_event(self, event, hint=None, scope=None):
    # type: (Dict[str, Any], Any, Scope) -> Optional[str]
    """Process an event and pass it to the transport.

    `hint` is an optional dictionary of extra information (such as
    exception info) and `scope` an optional scope; both are passed on to
    the capture check and to event preparation.

    Returns the event ID, or `None` when there is no transport or the
    event was rejected or dropped along the way.
    """
    if self.transport is None:
        return None
    hint = {} if hint is None else hint
    # Keep an ID the caller already assigned, otherwise mint a new one and
    # store it on the event.
    event_id = event.get("event_id")
    if event_id is None:
        event_id = uuid.uuid4().hex
        event["event_id"] = event_id
    if not self._should_capture(event, hint, scope):
        return None
    prepared = self._prepare_event(event, hint, scope)  # type: ignore
    if prepared is None:
        return None
    self.transport.capture_event(prepared)
    return event_id

def close(

self, timeout=None, callback=None)

Close the client and shut down the transport. Arguments have the same semantics as self.flush().

def close(self, timeout=None, callback=None):
    """
    Flush pending events, kill the transport and detach it from the client.
    `timeout` and `callback` are passed on to `self.flush()`.  Does nothing
    when the transport is already gone.
    """
    if self.transport is None:
        return
    self.flush(timeout=timeout, callback=callback)
    self.transport.kill()
    self.transport = None

def flush(

self, timeout=None, callback=None)

Wait timeout seconds for the current events to be sent. If no timeout is provided, the shutdown_timeout option value is used.

The callback is invoked with two arguments: the number of pending events and the configured timeout. For instance the default atexit integration will use this to render out a message on stderr.

def flush(self, timeout=None, callback=None):
    """
    Ask the transport to wait up to `timeout` seconds for pending events.
    When `timeout` is `None` the `shutdown_timeout` option is used instead.
    `callback` is passed through to the transport unchanged.  Does nothing
    when no transport is set.
    """
    transport = self.transport
    if transport is None:
        return
    if timeout is None:
        timeout = self.options["shutdown_timeout"]
    transport.flush(timeout=timeout, callback=callback)

class HttpTransport

The default HTTP transport.

class HttpTransport(Transport):
    """The default HTTP transport."""

    def __init__(self, options):
        # type: (ClientOptions) -> None
        """Set up the background worker, the auth data and the urllib3 pool."""
        Transport.__init__(self, options)
        self._worker = BackgroundWorker()
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # While set, `_send_event` drops events until this point in time
        # (see the 429 handling there).
        self._disabled_until = None  # type: Optional[datetime]
        # Used in `_send_event` to read the retry delay from a response.
        self._retry = urllib3.util.Retry()
        self.options = options

        self._pool = self._make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )

        # Imported at call time rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from sentry_sdk import Hub

        self.hub_cls = Hub

    def _send_event(self, event):
        # type: (Dict[str, Any]) -> None
        """Gzip the JSON-encoded event and POST it to the store endpoint.

        Events are silently dropped while the transport is paused after a
        429 response.  `json.dumps` is called with `allow_nan=False`, so
        NaN/Infinity values in the event raise `ValueError`.
        """
        if self._disabled_until is not None:
            if datetime.utcnow() < self._disabled_until:
                return
            self._disabled_until = None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json.dumps(event, allow_nan=False).encode("utf-8"))

        # Pass the values as logging arguments so the message is only
        # formatted when debug logging is actually enabled.
        logger.debug(
            "Sending %s event [%s] to %s project:%s",
            event.get("level") or "error",
            event["event_id"],
            self.parsed_dsn.host,
            self.parsed_dsn.project_id,
        )
        response = self._pool.request(
            "POST",
            str(self._auth.store_api_url),
            body=body.getvalue(),
            headers={
                "X-Sentry-Auth": str(self._auth.to_header()),
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
            },
        )

        try:
            if response.status == 429:
                # Pause sending for the delay the response asks for,
                # falling back to 60 seconds.
                self._disabled_until = datetime.utcnow() + timedelta(
                    seconds=self._retry.get_retry_after(response) or 60
                )
                return

            elif response.status >= 300 or response.status < 200:
                logger.error(
                    "Unexpected status code: %s (body: %s)",
                    response.status,
                    response.data,
                )
        finally:
            response.close()

        # Any response other than 429 clears a previous pause.
        self._disabled_until = None

    def _get_pool_options(self, ca_certs):
        # type: (Optional[Any]) -> Dict[str, Any]
        """Return the keyword arguments used to build the urllib3 pool.

        Certificate verification is always required; `certifi`'s bundle is
        used when no `ca_certs` value is given.
        """
        return {
            "num_pools": 2,
            "cert_reqs": "CERT_REQUIRED",
            "ca_certs": ca_certs or certifi.where(),
        }

    def _make_pool(
        self,
        parsed_dsn,  # type: Dsn
        http_proxy,  # type: Optional[str]
        https_proxy,  # type: Optional[str]
        ca_certs,  # type: Optional[Any]
    ):
        # type: (...) -> Union[PoolManager, ProxyManager]
        """Create a `ProxyManager` when a proxy applies, else a `PoolManager`.

        An explicit proxy option wins over the value from `getproxies()`.
        An empty string for a proxy option skips that proxy lookup.
        """
        proxy = None

        # try HTTPS first
        if parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or getproxies().get("https")

        # maybe fallback to HTTP proxy
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or getproxies().get("http")

        opts = self._get_pool_options(ca_certs)

        if proxy:
            return urllib3.ProxyManager(proxy, **opts)
        else:
            return urllib3.PoolManager(**opts)

    def capture_event(self, event):
        # type: (Dict[str, Any]) -> None
        """Submit the event to the worker, which sends it via `_send_event`."""
        # Capture the hub that is current now; the submitted callable
        # re-enters it when it runs.
        hub = self.hub_cls.current

        def send_event_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)

        self._worker.submit(send_event_wrapper)

    def flush(self, timeout, callback=None):
        # type: (float, Optional[Any]) -> None
        """Flush the worker with the given timeout and callback.

        A timeout that is not greater than zero skips the flush.
        """
        logger.debug("Flushing HTTP transport")
        if timeout > 0:
            self._worker.flush(timeout, callback)

    def kill(self):
        # type: () -> None
        """Kill the background worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()

Ancestors (in MRO)

Class variables

var parsed_dsn

Inheritance: Transport.parsed_dsn

Instance variables

var hub_cls

var options

Inheritance: Transport.options

Methods

def __init__(

self, options)

Inheritance: Transport.__init__

def __init__(self, options):
    # type: (ClientOptions) -> None
    """Set up the background worker, the auth data and the urllib3 pool."""
    Transport.__init__(self, options)
    self._worker = BackgroundWorker()
    client_tag = "sentry.python/%s" % VERSION
    self._auth = self.parsed_dsn.to_auth(client_tag)
    self._disabled_until = None  # type: Optional[datetime]
    self._retry = urllib3.util.Retry()
    self.options = options
    # Collect the pool-related options first, then build the pool from them.
    pool_settings = {
        "http_proxy": options["http_proxy"],
        "https_proxy": options["https_proxy"],
        "ca_certs": options["ca_certs"],
    }
    self._pool = self._make_pool(self.parsed_dsn, **pool_settings)
    # Imported at call time rather than at module level.
    from sentry_sdk import Hub
    self.hub_cls = Hub

def capture_event(

self, event)

Inheritance: Transport.capture_event

This gets invoked with the event dictionary when an event should be sent to sentry.

def capture_event(self, event):
    # type: (Dict[str, Any]) -> None
    """Submit the event to the worker, which sends it via `_send_event`.

    The hub that is current at submission time is re-entered around the
    actual send.
    """
    submitting_hub = self.hub_cls.current

    def send_event_wrapper():
        # type: () -> None
        with submitting_hub, capture_internal_exceptions():
            self._send_event(event)

    self._worker.submit(send_event_wrapper)

def flush(

self, timeout, callback=None)

Inheritance: Transport.flush

Wait timeout seconds for the current events to be sent out.

def flush(self, timeout, callback=None):
    # type: (float, Optional[Any]) -> None
    """Flush the worker with the given timeout and callback.

    A timeout that is not greater than zero skips the flush.
    """
    logger.debug("Flushing HTTP transport")
    if not timeout > 0:
        return
    self._worker.flush(timeout, callback)

def kill(

self)

Inheritance: Transport.kill

Forcefully kills the transport.

def kill(self):
    # type: () -> None
    """Log the shutdown and kill the background worker."""
    logger.debug("Killing HTTP transport")
    worker = self._worker
    worker.kill()

class Hub

The hub wraps the concurrency management of the SDK. Each thread has its own hub but the hub might transfer with the flow of execution if context vars are available.

If the hub is used with a with statement it's temporarily activated.

class Hub(with_metaclass(HubMeta)):  # type: ignore
    """The hub wraps the concurrency management of the SDK.  Each thread has
    its own hub but the hub might transfer with the flow of execution if
    context vars are available.

    If the hub is used with a with statement it's temporarily activated.
    """

    # Stack of (client, scope) layers; the last entry is the active one.
    _stack = None  # type: List[Tuple[Optional[Client], Scope]]

    def __init__(self, client_or_hub=None, scope=None):
        # type: (Union[Hub, Client], Optional[Any]) -> None
        """Create a hub from a client, or from the top layer of another hub.

        When another hub is given, its current client is reused and its
        current scope is copied unless an explicit `scope` is passed.
        """
        if isinstance(client_or_hub, Hub):
            hub = client_or_hub
            client, other_scope = hub._stack[-1]
            if scope is None:
                scope = copy.copy(other_scope)
        else:
            client = client_or_hub
        if scope is None:
            scope = Scope()

        self._stack = [(client, scope)]
        self._last_event_id = None  # type: Optional[str]
        # Hubs that were current when `__enter__` was called, restored in
        # reverse order by `__exit__`.
        self._old_hubs = []  # type: List[Hub]

    def __enter__(self):
        # type: () -> Hub
        """Make this hub the current one, remembering the previous hub."""
        self._old_hubs.append(Hub.current)
        _local.set(self)
        return self

    def __exit__(
        self,
        exc_type,  # type: Optional[type]
        exc_value,  # type: Optional[BaseException]
        tb,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Restore the hub that was current before `__enter__`."""
        old = self._old_hubs.pop()
        _local.set(old)

    def run(self, callback):
        """Runs a callback in the context of the hub.  Alternatively the
        with statement can be used on the hub directly.
        """
        with self:
            return callback()

    def get_integration(self, name_or_class):
        # type: (Union[str, Integration]) -> Any
        """Returns the integration for this hub by name or class.  If there
        is no client bound or the client does not have that integration
        then `None` is returned.

        If the return value is not `None` the hub is guaranteed to have a
        client attached.

        Raises `ValueError` when a class without an identifier is passed.
        """
        if isinstance(name_or_class, str):
            integration_name = name_or_class
        elif name_or_class.identifier is not None:
            integration_name = name_or_class.identifier
        else:
            raise ValueError("Integration has no name")

        client = self._stack[-1][0]
        if client is not None:
            rv = client.integrations.get(integration_name)
            if rv is not None:
                return rv

        # `_initial_client` holds a weak reference (or None); calling it
        # yields the client or None if it is gone.
        initial_client = _initial_client
        if initial_client is not None:
            initial_client = initial_client()

        # Look the integration up by its resolved name, the same key used
        # for the bound client above.  Using the raw argument here would
        # never match when a class was passed, so the warning would be lost.
        if (
            initial_client is not None
            and initial_client is not client
            and initial_client.integrations.get(integration_name) is not None
        ):
            warning = (
                "Integration %r attempted to run but it was only "
                "enabled on init() but not the client that "
                "was bound to the current flow.  Earlier versions of "
                "the SDK would consider these integrations enabled but "
                "this is no longer the case." % (name_or_class,)
            )
            warn(Warning(warning), stacklevel=3)
            logger.warning(warning)

        return None

    @property
    def client(self):
        # type: () -> Optional[Client]
        """Returns the current client on the hub."""
        return self._stack[-1][0]

    def last_event_id(self):
        # type: () -> Optional[str]
        """Returns the last event ID."""
        return self._last_event_id

    def bind_client(self, new):
        """Binds a new client to the hub."""
        # Only the client of the top layer is replaced; its scope is kept.
        top = self._stack[-1]
        self._stack[-1] = (new, top[1])

    def capture_event(self, event, hint=None):
        # type: (Dict[str, Any], Dict[str, Any]) -> Optional[str]
        """Captures an event.  The return value is the ID of the event.

        The event is a dictionary following the Sentry v7/v8 protocol
        specification.  Optionally an event hint dict can be passed that
        is used by processors to extract additional information from it.
        Typically the event hint object would contain exception information.
        """
        client, scope = self._stack[-1]
        if client is not None:
            rv = client.capture_event(event, hint, scope)
            # Only remember IDs of events the client actually accepted.
            if rv is not None:
                self._last_event_id = rv
            return rv
        return None

    def capture_message(self, message, level=None):
        # type: (str, Optional[Any]) -> Optional[str]
        """Captures a message.  The message is just a string.  If no level
        is provided the default level is `info`.
        """
        if self.client is None:
            return None
        if level is None:
            level = "info"
        return self.capture_event({"message": message, "level": level})

    def capture_exception(self, error=None):
        # type: (Optional[BaseException]) -> Optional[str]
        """Captures an exception.

        The argument passed can be `None` in which case the last exception
        will be reported, otherwise an exception object or an `exc_info`
        tuple.
        """
        client = self.client
        if client is None:
            return None
        if error is None:
            exc_info = sys.exc_info()
        else:
            exc_info = exc_info_from_error(error)

        event, hint = event_from_exception(exc_info, client_options=client.options)
        try:
            return self.capture_event(event, hint=hint)
        except Exception:
            # A failure while reporting is logged instead of being raised
            # into the caller.
            self._capture_internal_exception(sys.exc_info())

        return None

    def _capture_internal_exception(self, exc_info):
        """Capture an exception that is likely caused by a bug in the SDK
        itself."""
        logger.error("Internal error in sentry_sdk", exc_info=exc_info)

    def add_breadcrumb(self, crumb=None, hint=None, **kwargs):
        # type: (Dict[str, Any], Dict[str, Any], **Any) -> None
        """Adds a breadcrumb.  The breadcrumbs are a dictionary with the
        data as the sentry v7/v8 protocol expects.  `hint` is an optional
        value that can be used by `before_breadcrumb` to customize the
        breadcrumbs that are emitted.
        """
        client, scope = self._stack[-1]
        if client is None:
            logger.info("Dropped breadcrumb because no client bound")
            return

        # Work on a copy so the caller's dict is not modified.
        crumb = dict(crumb or ())  # type: Dict[str, Any]
        crumb.update(kwargs)
        if not crumb:
            return

        hint = dict(hint or ())

        if crumb.get("timestamp") is None:
            crumb["timestamp"] = datetime.utcnow()
        if crumb.get("type") is None:
            crumb["type"] = "default"

        # Kept for the log message in case the hook drops the breadcrumb.
        original_crumb = crumb
        if client.options["before_breadcrumb"] is not None:
            crumb = client.options["before_breadcrumb"](crumb, hint)

        if crumb is not None:
            scope._breadcrumbs.append(crumb)
        else:
            logger.info("before breadcrumb dropped breadcrumb (%s)", original_crumb)

        # Discard the oldest breadcrumbs once the configured limit is passed.
        max_breadcrumbs = client.options["max_breadcrumbs"]  # type: int
        while len(scope._breadcrumbs) > max_breadcrumbs:
            scope._breadcrumbs.popleft()

    @overload  # noqa
    def push_scope(self):
        # type: () -> ContextManager[Scope]
        pass

    @overload  # noqa
    def push_scope(self, callback):
        # type: (Callable[[Scope], None]) -> None
        pass

    def push_scope(self, callback=None):  # noqa
        """Pushes a new layer on the scope stack. Returns a context manager
        that should be used to pop the scope again.  Alternatively a callback
        can be provided that is executed in the context of the scope.
        """

        if callback is not None:
            with self.push_scope() as scope:
                callback(scope)
            return None

        # The new layer keeps the current client and gets a copy of the
        # current scope.
        client, scope = self._stack[-1]
        new_layer = (client, copy.copy(scope))
        self._stack.append(new_layer)

        return _ScopeManager(self)

    # `scope` is a second name for `push_scope`.
    scope = push_scope

    def pop_scope_unsafe(self):
        """Pops a scope layer from the stack. Try to use the context manager
        `push_scope()` instead."""
        rv = self._stack.pop()
        assert self._stack, "stack must have at least one layer"
        return rv

    @overload  # noqa
    def configure_scope(self):
        # type: () -> ContextManager[Scope]
        pass

    @overload  # noqa
    def configure_scope(self, callback):
        # type: (Callable[[Scope], None]) -> None
        pass

    def configure_scope(self, callback=None):  # noqa
        """Reconfigures the scope."""

        client, scope = self._stack[-1]
        if callback is not None:
            # Without a client the callback is not invoked at all.
            if client is not None:
                callback(scope)

            return None

        @contextmanager
        def inner():
            # Without a client a fresh Scope is yielded instead of the
            # hub's current scope.
            if client is not None:
                yield scope
            else:
                yield Scope()

        return inner()

    def flush(self, timeout=None, callback=None):
        """Alias for self.client.flush"""
        client, scope = self._stack[-1]
        if client is not None:
            return client.flush(timeout=timeout, callback=callback)

    def iter_trace_propagation_headers(self):
        """Yield the header items of the current scope's span.

        Nothing is yielded when the scope has no span, when no client is
        bound, or when the client's `propagate_traces` option is falsy.
        """
        client, scope = self._stack[-1]
        if scope._span is None:
            return

        propagate_traces = client and client.options["propagate_traces"]
        if not propagate_traces:
            return

        for item in scope._span.iter_headers():
            yield item

Ancestors (in MRO)

  • Hub
  • __builtin__.object

Instance variables

var client

Returns the current client on the hub.

Methods

def __init__(

self, client_or_hub=None, scope=None)

def __init__(self, client_or_hub=None, scope=None):
    # type: (Union[Hub, Client], Optional[Any]) -> None
    """Create a hub from a client, or derive one from another hub.

    When another hub is passed, its top-of-stack client is reused and,
    unless `scope` is given, a copy of its top-of-stack scope is used.
    Otherwise the argument itself is taken as the client.  A fresh `Scope`
    is created when no scope is available.
    """
    if not isinstance(client_or_hub, Hub):
        client = client_or_hub
    else:
        client, inherited_scope = client_or_hub._stack[-1]
        if scope is None:
            scope = copy.copy(inherited_scope)

    if scope is None:
        scope = Scope()

    self._last_event_id = None  # type: Optional[str]
    self._old_hubs = []  # type: List[Hub]
    self._stack = [(client, scope)]

def add_breadcrumb(

self, crumb=None, hint=None, **kwargs)

Adds a breadcrumb. The breadcrumbs are a dictionary with the data as the sentry v7/v8 protocol expects. hint is an optional value that can be used by before_breadcrumb to customize the breadcrumbs that are emitted.

def add_breadcrumb(self, crumb=None, hint=None, **kwargs):
    # type: (Dict[str, Any], Dict[str, Any], **Any) -> None
    """Record a breadcrumb on the current scope.

    The breadcrumb is built from `crumb` (a dictionary following the
    sentry v7/v8 protocol) updated with any keyword arguments.  `hint` is
    an optional value handed to the `before_breadcrumb` option, which may
    rewrite the breadcrumb or drop it by returning `None`.  Missing
    `timestamp` and `type` entries are filled in, and the buffer is trimmed
    to the `max_breadcrumbs` option.
    """
    client, scope = self._stack[-1]
    if client is None:
        logger.info("Dropped breadcrumb because no client bound")
        return

    # Work on a private copy so the caller's dictionary is never mutated.
    crumb = dict(crumb or ())  # type: Dict[str, Any]
    crumb.update(kwargs)
    if not crumb:
        return

    hint = dict(hint or ())

    if crumb.get("timestamp") is None:
        crumb["timestamp"] = datetime.utcnow()
    if crumb.get("type") is None:
        crumb["type"] = "default"

    if client.options["before_breadcrumb"] is None:
        processed = crumb
    else:
        processed = client.options["before_breadcrumb"](crumb, hint)

    if processed is None:
        logger.info("before breadcrumb dropped breadcrumb (%s)", crumb)
    else:
        scope._breadcrumbs.append(processed)

    # Evict the oldest entries until the buffer fits the configured limit.
    limit = client.options["max_breadcrumbs"]  # type: int
    while len(scope._breadcrumbs) > limit:
        scope._breadcrumbs.popleft()

def bind_client(

self, new)

Binds a new client to the hub.

def bind_client(self, new):
    """Replace the client of the top stack layer with `new`.

    The scope of that layer is kept as it is.
    """
    current_scope = self._stack[-1][1]
    self._stack[-1] = (new, current_scope)

def capture_event(

self, event, hint=None)

Captures an event. The return value is the ID of the event.

The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.

def capture_event(self, event, hint=None):
    # type: (Dict[str, Any], Dict[str, Any]) -> Optional[str]
    """Send an event through the bound client and return its ID.

    `event` is a dictionary following the Sentry v7/v8 protocol
    specification.  The optional `hint` dictionary is passed along for
    processors to extract additional information from; typically it
    carries exception information.  Returns `None` when no client is
    bound.  A non-`None` ID is also remembered as the hub's last event ID.
    """
    client, scope = self._stack[-1]
    if client is None:
        return None

    event_id = client.capture_event(event, hint, scope)
    if event_id is not None:
        self._last_event_id = event_id
    return event_id

def capture_exception(

self, error=None)

Captures an exception.

The argument passed can be None in which case the last exception will be reported, otherwise an exception object or an exc_info tuple.

def capture_exception(self, error=None):
    # type: (Optional[BaseException]) -> Optional[str]
    """Report an exception and return the resulting event ID.

    `error` may be `None`, in which case the exception currently being
    handled is reported; otherwise it is an exception object or an
    `exc_info` tuple.  Returns `None` when no client is bound or when
    capturing the event itself raised.
    """
    client = self.client
    if client is None:
        return None

    exc_info = sys.exc_info() if error is None else exc_info_from_error(error)
    event, hint = event_from_exception(exc_info, client_options=client.options)

    try:
        return self.capture_event(event, hint=hint)
    except Exception:
        # Failures while reporting are handed to the internal handler
        # rather than raised to the caller.
        self._capture_internal_exception(sys.exc_info())
        return None

def capture_message(

self, message, level=None)

Captures a message. The message is just a string. If no level is provided the default level is info.

def capture_message(self, message, level=None):
    # type: (str, Optional[Any]) -> Optional[str]
    """Report a plain string message as an event.

    When `level` is not given it defaults to `info`.  Returns whatever
    `capture_event` returns, or `None` when no client is bound.
    """
    if self.client is None:
        return None

    event = {"message": message, "level": "info" if level is None else level}
    return self.capture_event(event)

def configure_scope(

self, callback=None)

Reconfigures the scope.

def configure_scope(self, callback=None):  # noqa
    """Reconfigures the scope.

    With a `callback`, the callback is invoked with the current scope
    (only when a client is bound) and `None` is returned.  Without one, a
    context manager is returned that yields the current scope, or a fresh
    throwaway `Scope` when no client is bound.
    """
    bound_client, current_scope = self._stack[-1]

    if callback is None:

        @contextmanager
        def _scope_context():
            # Without a client, changes go to a scope nobody else sees.
            yield current_scope if bound_client is not None else Scope()

        return _scope_context()

    if bound_client is not None:
        callback(current_scope)
    return None

def flush(

self, timeout=None, callback=None)

Alias for self.client.flush

def flush(self, timeout=None, callback=None):
    """Alias for self.client.flush

    Forwards `timeout` and `callback` to the client bound to the top of the
    stack and returns its result; returns `None` without doing anything
    when no client is bound.
    """
    bound_client, _ = self._stack[-1]
    if bound_client is None:
        return None
    return bound_client.flush(timeout=timeout, callback=callback)

def get_integration(

self, name_or_class)

Returns the integration for this hub by name or class. If there is no client bound or the client does not have that integration then None is returned.

If the return value is not None the hub is guaranteed to have a client attached.

def get_integration(self, name_or_class):
    # type: (Union[str, Integration]) -> Any
    """Returns the integration for this hub by name or class.  If there
    is no client bound or the client does not have that integration
    then `None` is returned.

    If the return value is not `None` the hub is guaranteed to have a
    client attached.

    Raises `ValueError` when a class is passed whose `identifier` is
    `None`.  When the integration is missing on the bound client but
    present on the initial client, a warning is emitted and `None` is
    still returned.
    """
    if isinstance(name_or_class, str):
        integration_name = name_or_class
    elif name_or_class.identifier is not None:
        integration_name = name_or_class.identifier
    else:
        raise ValueError("Integration has no name")

    client = self._stack[-1][0]
    if client is not None:
        rv = client.integrations.get(integration_name)
        if rv is not None:
            return rv

    # `_initial_client` is a module-level callable that yields the client
    # (presumably a weak reference to the client from `init()` -- confirm).
    initial_client = _initial_client
    if initial_client is not None:
        initial_client = initial_client()

    if (
        initial_client is not None
        and initial_client is not client
        # Look up by the resolved name, exactly as for the bound client
        # above.  Using `name_or_class` here meant the warning could never
        # fire when an integration *class* was passed.
        and initial_client.integrations.get(integration_name) is not None
    ):
        warning = (
            "Integration %r attempted to run but it was only "
            "enabled on init() but not the client that "
            "was bound to the current flow.  Earlier versions of "
            "the SDK would consider these integrations enabled but "
            "this is no longer the case." % (name_or_class,)
        )
        warn(Warning(warning), stacklevel=3)
        logger.warning(warning)

    return None

def iter_trace_propagation_headers(

self)

def iter_trace_propagation_headers(self):
    """Yield the headers of the current scope's span.

    Nothing is yielded when the scope has no span, when no client is
    bound, or when the client's `propagate_traces` option is falsy.
    """
    bound_client, current_scope = self._stack[-1]

    span = current_scope._span
    if span is None:
        return

    if not bound_client or not bound_client.options["propagate_traces"]:
        return

    for header in span.iter_headers():
        yield header

def last_event_id(

self)

Returns the last event ID.

def last_event_id(self):
    # type: () -> Optional[str]
    """Return the ID most recently stored on this hub, or `None` if no
    event ID has been recorded yet."""
    event_id = self._last_event_id
    return event_id

def pop_scope_unsafe(

self)

Pops a scope layer from the stack. Try to use the context manager push_scope() instead.

def pop_scope_unsafe(self):
    """Pops a scope layer from the stack. Try to use the context manager
    `push_scope()` instead.

    Returns the removed layer.  Raises ``AssertionError`` if popping would
    remove the last remaining layer; in that case the stack is left
    untouched.
    """
    # Check the invariant *before* mutating.  Popping first would leave the
    # hub with an empty stack (every later `self._stack[-1]` lookup would
    # fail), and a bare `assert` is stripped under `python -O`.
    if len(self._stack) <= 1:
        raise AssertionError("stack must have at least one layer")
    return self._stack.pop()

def push_scope(

self, callback=None)

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

def push_scope(self, callback=None):  # noqa
    """Push a new layer onto the scope stack.

    Without a `callback`, the new layer reuses the current client together
    with a copy of the current scope, and a context manager is returned
    that should be used to pop the layer again.  With a `callback`, the
    callback is run inside such a pushed scope and `None` is returned.
    """
    if callback is None:
        client, current_scope = self._stack[-1]
        self._stack.append((client, copy.copy(current_scope)))
        return _ScopeManager(self)

    with self.push_scope() as pushed_scope:
        callback(pushed_scope)
    return None

def run(

self, callback)

Runs a callback in the context of the hub. Alternatively the with statement can be used on the hub directly.

def run(self, callback):
    """Invoke `callback` with this hub entered as a context manager and
    return the callback's result.  Using the hub directly in a `with`
    statement is the equivalent alternative.
    """
    with self:
        result = callback()
        return result

def scope(

self, callback=None)

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

def push_scope(self, callback=None):  # noqa
    """Push a new layer onto the scope stack.

    Without a `callback`, the new layer reuses the current client together
    with a copy of the current scope, and a context manager is returned
    that should be used to pop the layer again.  With a `callback`, the
    callback is run inside such a pushed scope and `None` is returned.
    """
    if callback is None:
        client, current_scope = self._stack[-1]
        self._stack.append((client, copy.copy(current_scope)))
        return _ScopeManager(self)

    with self.push_scope() as pushed_scope:
        callback(pushed_scope)
    return None

class Scope

The scope holds extra information that should be sent with all events that belong to it.

class Scope(object):
    """The scope holds extra information that should be sent with all
    events that belong to it.

    A scope stores optional overrides (level, fingerprint, transaction,
    user), tags, contexts and extras, a breadcrumb buffer, the current
    span, and lists of event and error processors.  `apply_to_event()`
    merges all of this into an event dictionary.
    """

    # All per-instance state lives in these slots; `__copy__` assigns every
    # one of them explicitly.
    __slots__ = (
        "_level",
        "_name",
        "_fingerprint",
        "_transaction",
        "_user",
        "_tags",
        "_contexts",
        "_extras",
        "_breadcrumbs",
        "_event_processors",
        "_error_processors",
        "_should_capture",
        "_span",
    )

    def __init__(self):
        # The processor lists and `_name` are initialised here rather than
        # in `clear()`, so clearing a scope keeps its processors and name.
        self._event_processors = []  # type: List[Callable]
        self._error_processors = []  # type: List[Callable]

        self._name = None
        self.clear()

    # NOTE(review): `_attr_setter` is defined outside this class; presumably
    # it exposes each function below as a settable attribute -- confirm.
    @_attr_setter
    def level(self, value):
        """When set this overrides the level."""
        self._level = value

    @_attr_setter
    def fingerprint(self, value):
        """When set this overrides the default fingerprint."""
        self._fingerprint = value

    @_attr_setter
    def transaction(self, value):
        """When set this forces a specific transaction name to be set."""
        self._transaction = value

    @_attr_setter
    def user(self, value):
        """When set a specific user is bound to the scope."""
        self._user = value

    def set_span_context(self, span_context):
        """Sets the span context."""
        self._span = span_context

    def set_tag(self, key, value):
        """Sets a tag for a key to a specific value."""
        self._tags[key] = value

    def remove_tag(self, key):
        """Removes a specific tag.  A missing key is ignored."""
        self._tags.pop(key, None)

    def set_context(self, key, value):
        """Binds a context at a certain key to a specific value."""
        self._contexts[key] = value

    def remove_context(self, key):
        """Removes a context.  A missing key is ignored."""
        self._contexts.pop(key, None)

    def set_extra(self, key, value):
        """Sets an extra key to a specific value."""
        self._extras[key] = value

    def remove_extra(self, key):
        """Removes a specific extra key.  A missing key is ignored."""
        self._extras.pop(key, None)

    def clear(self):
        # type: () -> None
        """Clears the entire scope.

        Resets the overrides, tags, contexts, extras, breadcrumbs and span.
        Registered event/error processors and the scope name are kept.
        """
        self._level = None
        self._fingerprint = None
        self._transaction = None
        self._user = None

        self._tags = {}  # type: Dict[str, Any]
        self._contexts = {}  # type: Dict[str, Dict]
        self._extras = {}  # type: Dict[str, Any]

        self.clear_breadcrumbs()
        self._should_capture = True

        self._span = None

    def clear_breadcrumbs(self):
        # type: () -> None
        """Clears breadcrumb buffer."""
        # A new deque is bound instead of emptying the old one in place, so
        # other holders of the previous buffer are unaffected.
        self._breadcrumbs = deque()  # type: Deque[Dict]

    def add_event_processor(self, func):
        # type: (Callable) -> None
        """Register a scope local event processor on the scope.

        This function behaves like `before_send`.
        """
        self._event_processors.append(func)

    def add_error_processor(self, func, cls=None):
        # type: (Callable, Optional[type]) -> None
        """Register a scope local error processor on the scope.

        The error processor works similar to an event processor but is
        invoked with the original exception info triple as second argument.

        When `cls` is given, `func` is only invoked for exceptions that are
        instances of `cls`; other events pass through unchanged.
        """
        if cls is not None:
            real_func = func

            # Rebinds `func` to a filtering wrapper around the original.
            def func(event, exc_info):
                try:
                    is_inst = isinstance(exc_info[1], cls)
                except Exception:
                    # A malformed `exc_info` or `cls` counts as "no match".
                    is_inst = False
                if is_inst:
                    return real_func(event, exc_info)
                return event

        self._error_processors.append(func)

    # NOTE(review): `_disable_capture` is defined outside this class; it
    # appears to use `_should_capture` to skip re-entrant calls -- confirm.
    @_disable_capture
    def apply_to_event(self, event, hint=None):
        # type: (Dict[str, Any], Dict[str, Any]) -> Optional[Dict[str, Any]]
        """Applies the information contained on the scope to the given event.

        When set, the scope's level replaces the event's level.  User,
        transaction and fingerprint are only filled in when the event has
        no value for them.  Breadcrumbs are appended, and extras, tags and
        contexts are merged into the event with the scope's values winning
        on key collisions.  If a span is set, its trace and span IDs are
        stored as the `trace` context.

        Afterwards the error processors run (only when `hint` carries
        `exc_info`), followed by the global and then the scope-local event
        processors.  Returns the resulting event, or `None` as soon as any
        processor returns `None`.
        """

        def _drop(event, cause, ty):
            # type: (Dict[str, Any], Callable, str) -> Optional[Any]
            # Logs which processor dropped the event and yields `None`.
            logger.info("%s (%s) dropped event (%s)", ty, cause, event)
            return None

        if self._level is not None:
            event["level"] = self._level

        event.setdefault("breadcrumbs", []).extend(self._breadcrumbs)
        if event.get("user") is None and self._user is not None:
            event["user"] = self._user

        if event.get("transaction") is None and self._transaction is not None:
            event["transaction"] = self._transaction

        if event.get("fingerprint") is None and self._fingerprint is not None:
            event["fingerprint"] = self._fingerprint

        if self._extras:
            event.setdefault("extra", {}).update(self._extras)

        if self._tags:
            event.setdefault("tags", {}).update(self._tags)

        if self._contexts:
            event.setdefault("contexts", {}).update(self._contexts)

        # Set after the contexts merge, so the span always owns "trace".
        if self._span is not None:
            event.setdefault("contexts", {})["trace"] = {
                "trace_id": self._span.trace_id,
                "span_id": self._span.span_id,
            }

        exc_info = hint.get("exc_info") if hint is not None else None
        if exc_info is not None:
            for processor in self._error_processors:
                new_event = processor(event, exc_info)
                if new_event is None:
                    return _drop(event, processor, "error processor")
                event = new_event

        for processor in chain(global_event_processors, self._event_processors):
            # Pre-set to the current event: if `capture_internal_exceptions()`
            # suppresses an error raised by the processor (presumably its
            # purpose -- confirm), the event passes through unchanged.
            new_event = event
            with capture_internal_exceptions():
                new_event = processor(event, hint)
            if new_event is None:
                return _drop(event, processor, "event processor")
            event = new_event

        return event

    def __copy__(self):
        # type: () -> Scope
        # Bypasses `__init__` and fills every slot by hand.
        rv = object.__new__(self.__class__)

        rv._level = self._level
        rv._name = self._name
        rv._fingerprint = self._fingerprint
        rv._transaction = self._transaction
        rv._user = self._user

        # Containers are duplicated one level deep so that the copy can be
        # modified independently; the contained values are shared.
        rv._tags = dict(self._tags)
        rv._contexts = dict(self._contexts)
        rv._extras = dict(self._extras)

        rv._breadcrumbs = copy(self._breadcrumbs)
        rv._event_processors = list(self._event_processors)
        rv._error_processors = list(self._error_processors)

        rv._should_capture = self._should_capture
        rv._span = self._span

        return rv

    def __repr__(self):
        # Example shape: <Scope id=0x7f... name=None>
        return "<%s id=%s name=%s>" % (
            self.__class__.__name__,
            hex(id(self)),
            self._name,
        )

Ancestors (in MRO)

  • Scope
  • __builtin__.object

Instance variables

var fingerprint

When set this overrides the default fingerprint.

var level

When set this overrides the level.

var transaction

When set this forces a specific transaction name to be set.

var user

When set a specific user is bound to the scope.

Methods

def __init__(

self)

def __init__(self):
    """Create an empty scope with no processors registered."""
    self._name = None
    self._event_processors = []  # type: List[Callable]
    self._error_processors = []  # type: List[Callable]
    # `clear()` initialises all remaining attributes.
    self.clear()

def add_error_processor(

self, func, cls=None)

Register a scope-local error processor on the scope.

The error processor works similarly to an event processor but is invoked with the original exception info triple as its second argument.

def add_error_processor(self, func, cls=None):
    # type: (Callable, Optional[type]) -> None
    """Register a scope-local error processor on the scope.

    An error processor works like an event processor, except that it is
    called with the original exception info triple as second argument.
    When `cls` is given, the processor only runs for exceptions that are
    instances of `cls`; every other event is passed through unchanged.
    """
    if cls is not None:
        wrapped = func

        def func(event, exc_info):
            try:
                if not isinstance(exc_info[1], cls):
                    return event
            except Exception:
                # A malformed `exc_info` or `cls` counts as "no match".
                return event
            return wrapped(event, exc_info)

    self._error_processors.append(func)

def add_event_processor(

self, func)

Register a scope-local event processor on the scope.

This function behaves like before_send.

def add_event_processor(self, func):
    # type: (Callable) -> None
    """Register a scope-local event processor on the scope.

    The processor behaves like `before_send`.
    """
    processors = self._event_processors
    processors.append(func)

def apply_to_event(

self, *args, **kwargs)

Applies the information contained on the scope to the given event.

@wraps(fn)
def wrapper(self, *args, **kwargs):
    # type: (Any, *Dict[str, Any], **Any) -> Any
    # Runs `fn` only while capturing is enabled.  The flag is switched off
    # for the duration of the call and always switched back on afterwards,
    # so a nested call on the same object returns `None` immediately.
    if self._should_capture:
        try:
            self._should_capture = False
            return fn(self, *args, **kwargs)
        finally:
            self._should_capture = True
    return None

def clear(

self)

Clears the entire scope.

def clear(self):
    # type: () -> None
    """Reset the scope's overrides, tags, contexts, extras, breadcrumbs
    and span to their empty defaults and re-enable capturing."""
    self._level = self._fingerprint = self._transaction = self._user = None

    # Each mapping gets its own fresh dictionary.
    self._tags = dict()  # type: Dict[str, Any]
    self._contexts = dict()  # type: Dict[str, Dict]
    self._extras = dict()  # type: Dict[str, Any]

    self.clear_breadcrumbs()

    self._should_capture = True
    self._span = None

def clear_breadcrumbs(

self)

Clears breadcrumb buffer.

def clear_breadcrumbs(self):
    # type: () -> None
    """Replace the breadcrumb buffer with a new, empty one."""
    empty_buffer = deque()  # type: Deque[Dict]
    self._breadcrumbs = empty_buffer

def remove_context(

self, key)

Removes a context.

def remove_context(self, key):
    """Drop the context stored under `key`; a missing key is ignored."""
    contexts = self._contexts
    contexts.pop(key, None)

def remove_extra(

self, key)

Removes a specific extra key.

def remove_extra(self, key):
    """Drop the extra value stored under `key`; a missing key is ignored."""
    extras = self._extras
    extras.pop(key, None)

def remove_tag(

self, key)

Removes a specific tag.

def remove_tag(self, key):
    """Drop the tag stored under `key`; a missing key is ignored."""
    tags = self._tags
    tags.pop(key, None)

def set_context(

self, key, value)

Binds a context at a certain key to a specific value.

def set_context(self, key, value):
    """Store `value` as the context for `key`, replacing any previous one."""
    contexts = self._contexts
    contexts[key] = value

def set_extra(

self, key, value)

Sets an extra key to a specific value.

def set_extra(self, key, value):
    """Store `value` as the extra for `key`, replacing any previous one."""
    extras = self._extras
    extras[key] = value

def set_span_context(

self, span_context)

Sets the span context.

def set_span_context(self, span_context):
    """Make `span_context` the span of this scope."""
    new_span = span_context
    self._span = new_span

def set_tag(

self, key, value)

Sets a tag for a key to a specific value.

def set_tag(self, key, value):
    """Store `value` as the tag for `key`, replacing any previous one."""
    tags = self._tags
    tags[key] = value

class Transport

Base class for all transports.

A transport is used to send an event to sentry.

class Transport(object):
    """Base class for all transports.

    A transport is what sends an event to sentry.  Subclasses have to
    implement `capture_event`; `flush` and `kill` do nothing by default.
    """

    parsed_dsn = None  # type: Dsn

    def __init__(self, options=None):
        # type: (Optional[ClientOptions]) -> None
        """Keep `options` and parse its `dsn` entry when one is set."""
        self.options = options
        if not options or not options["dsn"]:
            self.parsed_dsn = None  # type: ignore
        else:
            self.parsed_dsn = Dsn(options["dsn"])

    def capture_event(self, event):
        """Called with the event dictionary whenever an event should be
        sent to sentry.  The base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError()

    def flush(self, timeout, callback=None):
        """Wait `timeout` seconds for the current events to be sent out.

        The base implementation does nothing.
        """
        return None

    def kill(self):
        # type: () -> None
        """Forcefully kills the transport.

        The base implementation does nothing.
        """
        return None

    def __del__(self):
        # type: () -> None
        # Best effort only: an error raised by `kill()` during finalisation
        # is deliberately ignored.
        try:
            self.kill()
        except Exception:
            pass

Ancestors (in MRO)

Class variables

var parsed_dsn

Instance variables

var options

Methods

def __init__(

self, options=None)

def __init__(self, options=None):
    # type: (Optional[ClientOptions]) -> None
    """Keep `options` and parse its `dsn` entry when one is set."""
    self.options = options
    if not options or not options["dsn"]:
        self.parsed_dsn = None  # type: ignore
    else:
        self.parsed_dsn = Dsn(options["dsn"])

def capture_event(

self, event)

This gets invoked with the event dictionary when an event should be sent to sentry.

def capture_event(self, event):
    """Called with the event dictionary whenever an event should be sent
    to sentry.  This implementation always raises `NotImplementedError`.
    """
    raise NotImplementedError()

def flush(

self, timeout, callback=None)

Wait timeout seconds for the current events to be sent out.

def flush(self, timeout, callback=None):
    """Wait `timeout` seconds for the current events to be sent out.

    This implementation does nothing and returns `None`.
    """
    return None

def kill(

self)

Forcefully kills the transport.

def kill(self):
    # type: () -> None
    """Forcefully kills the transport.

    This implementation does nothing and returns `None`.
    """
    return None

Sub-modules