Top

sentry_sdk module

The Sentry SDK is the new-style SDK for sentry.io. It implements the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later.

The user documentation can be found on docs.sentry.io.

Quickstart

The only thing needed to get going is to call sentry_sdk.init(). When not passed any arguments the default options are used and the DSN is picked up from the SENTRY_DSN environment variable. Otherwise the DSN can be passed with the dsn keyword or as the first argument.

import sentry_sdk
sentry_sdk.init()

This initializes the default integrations which will automatically pick up any uncaught exceptions. Additionally you can report arbitrary other exceptions:

try:
    my_failing_function()
except Exception as e:
    sentry_sdk.capture_exception(e)
"""
The Sentry SDK is the new-style SDK for [sentry.io](https://sentry.io/).  It implements
the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later.

The user documentation can be found on [docs.sentry.io](https://docs.sentry.io/).

## Quickstart

The only thing needed to get going is to call `sentry_sdk.init()`.  When not passed any
arguments the default options are used and the DSN is picked up from the `SENTRY_DSN`
environment variable.  Otherwise the DSN can be passed with the `dsn` keyword
or first argument.

    import sentry_sdk
    sentry_sdk.init()

This initializes the default integrations which will automatically pick up any
uncaught exceptions.  Additionally you can report arbitrary other exceptions:

    try:
        my_failing_function()
    except Exception as e:
        sentry_sdk.capture_exception(e)
"""
# Re-export everything from `sentry_sdk.api`, together with its `__all__`
# list, so the names are available directly on this module.
from sentry_sdk.api import *  # noqa
from sentry_sdk.api import __all__  # noqa
from sentry_sdk.consts import VERSION  # noqa

# modules we consider public
# NOTE: this appends to the very list object imported from `sentry_sdk.api`.
__all__.append("integrations")

# Initialize the debug support after everything is loaded
from sentry_sdk.debug import init_debug_support

init_debug_support()
# Remove the helper from this module's namespace once it has run.
del init_debug_support

Functions

def add_breadcrumb(

*args, **kwargs)

Alias for Hub.add_breadcrumb

Adds a breadcrumb. The breadcrumbs are a dictionary with the data as the sentry v7/v8 protocol expects. hint is an optional value that can be used by before_breadcrumb to customize the breadcrumbs that are emitted.

@hubmethod
def add_breadcrumb(*args, **kwargs):
    # Module-level shortcut: hands every argument unchanged to
    # `add_breadcrumb` on `Hub.current` and returns its result.  When
    # `Hub.current` is `None` nothing is called and `None` is returned.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.add_breadcrumb(*args, **kwargs)

def capture_event(

event, hint=None)

Alias for Hub.capture_event

Captures an event. The return value is the ID of the event.

The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.

@hubmethod
def capture_event(event, hint=None):
    # Module-level shortcut: forwards `event` and `hint` to `capture_event`
    # on `Hub.current` and returns its result.  When `Hub.current` is `None`
    # nothing is called and `None` is returned.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_event(event, hint)

def capture_exception(

error=None)

Alias for Hub.capture_exception

Captures an exception.

The argument passed can be None in which case the last exception will be reported, otherwise an exception object or an exc_info tuple.

@hubmethod
def capture_exception(error=None):
    # Module-level shortcut: forwards `error` to `capture_exception` on
    # `Hub.current` and returns its result.  When `Hub.current` is `None`
    # nothing is called and `None` is returned.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_exception(error)

def capture_message(

message, level=None)

Alias for Hub.capture_message

Captures a message. The message is just a string. If no level is provided the default level is info.

@hubmethod
def capture_message(message, level=None):
    # Module-level shortcut: forwards `message` and `level` to
    # `capture_message` on `Hub.current` and returns its result.  When
    # `Hub.current` is `None` nothing is called and `None` is returned.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_message(message, level)

def configure_scope(

callback=None)

Alias for Hub.configure_scope

Reconfigures the scope.

@hubmethod
def configure_scope(callback=None):
    # Module-level shortcut for `configure_scope` on `Hub.current`.
    active_hub = Hub.current
    if active_hub is not None:
        return active_hub.configure_scope(callback)

    if callback is not None:
        # No hub available: the callback is never invoked.
        return None

    # No hub and no callback: hand out a context manager that yields a
    # fresh, unattached `Scope` so `with configure_scope() as scope:` works.
    @contextmanager
    def _detached_scope():
        yield Scope()

    return _detached_scope()

def init(

*args, **kwargs)

Initializes the SDK and optionally integrations.

This takes the same arguments as the client constructor.

def init(*args, **kwargs):
    """Create a client, bind it to the current hub and return a guard.

    All positional and keyword arguments are handed to `Client` unchanged.
    The new client is bound via `Hub.current.bind_client` and a weak
    reference to it is stored in the module-level `_initial_client`.

    Returns the `_InitGuard` wrapping the new client.
    """
    global _initial_client
    new_client = Client(*args, **kwargs)
    Hub.current.bind_client(new_client)
    guard = _InitGuard(new_client)
    if new_client is not None:
        # Only a weak reference is kept at module level.
        _initial_client = weakref.ref(new_client)
    return guard

def last_event_id(

)

Alias for Hub.last_event_id

Returns the last event ID.

@hubmethod
def last_event_id():
    # Module-level shortcut: returns `last_event_id()` of `Hub.current`, or
    # `None` when `Hub.current` is `None`.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.last_event_id()

def push_scope(

callback=None)

Alias for Hub.push_scope

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

@hubmethod
def push_scope(callback=None):
    # Module-level shortcut for `push_scope` on `Hub.current`.
    active_hub = Hub.current
    if active_hub is not None:
        return active_hub.push_scope(callback)

    if callback is not None:
        # No hub available: the callback is never invoked.
        return None

    # No hub and no callback: hand out a context manager that yields a
    # fresh, unattached `Scope` so `with push_scope() as scope:` works.
    @contextmanager
    def _detached_scope():
        yield Scope()

    return _detached_scope()

Classes

class Client

The client is internally responsible for capturing the events and forwarding them to sentry through the configured transport. It takes the client options as keyword arguments and optionally the DSN as first argument.

class Client(object):
    """The client is internally responsible for capturing the events and
    forwarding them to sentry through the configured transport.  It takes
    the client options as keyword arguments and optionally the DSN as first
    argument.

    A client can be used as a context manager; leaving the ``with`` block
    calls `close()`.
    """

    def __init__(self, *args, **kwargs):
        """Builds the options, the transport and the integrations.

        All arguments are passed to `get_options`.  Raises `ValueError` if
        the `request_bodies` option is not one of the accepted values.
        """
        # `_client_init_debug` is switched to this client's `debug` option
        # while the client is being set up and always restored afterwards.
        old_debug = _client_init_debug.get(False)
        try:
            self.options = options = get_options(*args, **kwargs)
            _client_init_debug.set(options["debug"])
            self.transport = make_transport(options)

            request_bodies = ("always", "never", "small", "medium")
            if options["request_bodies"] not in request_bodies:
                raise ValueError(
                    "Invalid value for request_bodies. Must be one of {}".format(
                        request_bodies
                    )
                )

            self.integrations = setup_integrations(
                options["integrations"], with_defaults=options["default_integrations"]
            )
        finally:
            _client_init_debug.set(old_debug)

    @property
    def dsn(self):
        """Returns the configured DSN as string."""
        return self.options["dsn"]

    def _prepare_event(self, event, hint, scope):
        """Fills in defaults, applies the scope and `before_send`.

        Returns the event ready to be handed to the transport, or `None`
        when the scope or the `before_send` option dropped it.
        """
        if event.get("timestamp") is None:
            event["timestamp"] = datetime.utcnow()

        if scope is not None:
            event = scope.apply_to_event(event, hint)
            if event is None:
                return

        # Only attach the current stacktrace when the event does not carry
        # any stack information of its own.
        if (
            self.options["attach_stacktrace"]
            and "exception" not in event
            and "stacktrace" not in event
            and "threads" not in event
        ):
            with capture_internal_exceptions():
                event["threads"] = [
                    {
                        "stacktrace": current_stacktrace(),
                        "crashed": False,
                        "current": True,
                    }
                ]

        # Values already present on the event win over the client options.
        for key in "release", "environment", "server_name", "dist":
            if event.get(key) is None and self.options[key] is not None:
                event[key] = text_type(self.options[key]).strip()
        if event.get("sdk") is None:
            sdk_info = dict(SDK_INFO)
            sdk_info["integrations"] = sorted(self.integrations.keys())
            event["sdk"] = sdk_info

        if event.get("platform") is None:
            event["platform"] = "python"

        event = handle_in_app(
            event, self.options["in_app_exclude"], self.options["in_app_include"]
        )

        before_send = self.options["before_send"]
        if before_send is not None:
            # Pre-bind the result: if `before_send` raises and the context
            # manager suppresses the exception, `new_event` would otherwise
            # be unbound on the next line.  Such an event is treated as
            # dropped.
            new_event = None
            with capture_internal_exceptions():
                new_event = before_send(event, hint)
            if new_event is None:
                logger.info("before send dropped event (%s)", event)
            event = new_event

        # Postprocess the event in the very end so that annotated types do
        # generally not surface in before_send
        if event is not None:
            strip_event_mut(event)
            event = flatten_metadata(event)
            event = convert_types(event)

        return event

    def _is_ignored_error(self, event, hint=None):
        """Returns `True` if the exception in `hint["exc_info"]` matches
        one of the entries of the `ignore_errors` option.

        Without a hint, or without `exc_info` in it, nothing is ignored.
        """
        # `hint` defaults to `None`, so it must not be dereferenced blindly.
        if hint is None:
            return False
        exc_info = hint.get("exc_info")
        if exc_info is None:
            return False

        type_name = get_type_name(exc_info[0])
        full_name = "%s.%s" % (exc_info[0].__module__, type_name)

        for errcls in self.options["ignore_errors"]:
            # String types are matched against the type name in the
            # exception only
            if isinstance(errcls, string_types):
                if errcls == full_name or errcls == type_name:
                    return True
            else:
                if issubclass(exc_info[0], errcls):
                    return True

        return False

    def _should_capture(self, event, hint, scope=None):
        """Returns `False` if the event is sampled out via the
        `sample_rate` option or refers to an ignored error."""
        if (
            self.options["sample_rate"] < 1.0
            and random.random() >= self.options["sample_rate"]
        ):
            return False

        if self._is_ignored_error(event, hint):
            return False

        return True

    def capture_event(self, event, hint=None, scope=None):
        """Captures an event.

        This takes the ready made event and an optional hint and scope.  The
        hint is internally used to further customize the representation of the
        error.  When provided it's a dictionary of optional information such
        as exception info.

        If the transport is not set nothing happens, otherwise the return
        value of this function will be the ID of the captured event.  `None`
        is also returned when the event is sampled out, ignored or dropped
        while being prepared.
        """
        if self.transport is None:
            return
        if hint is None:
            hint = {}
        # Reuse an ID supplied by the caller, otherwise generate one.
        rv = event.get("event_id")
        if rv is None:
            event["event_id"] = rv = uuid.uuid4().hex
        if not self._should_capture(event, hint, scope):
            return
        event = self._prepare_event(event, hint, scope)
        if event is None:
            return
        self.transport.capture_event(event)
        return rv

    def close(self, timeout=None, shutdown_callback=None):
        """Closes the client which shuts down the transport in an
        orderly manner.

        The `shutdown_callback` is invoked with two arguments: the number of
        pending events and the configured shutdown timeout.  For instance the
        default atexit integration will use this to render out a message on
        stderr.

        When `timeout` is `None` the `shutdown_timeout` option is used.
        Calling this on a client without transport does nothing.
        """
        if self.transport is not None:
            if timeout is None:
                timeout = self.options["shutdown_timeout"]
            self.transport.shutdown(timeout=timeout, callback=shutdown_callback)
            self.transport = None

    def __enter__(self):
        """Returns the client itself."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Closes the client when the ``with`` block is left."""
        self.close()

Ancestors (in MRO)

Static methods

def __init__(

self, *args, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, *args, **kwargs):
    """Build the options, the transport and the integrations.

    All arguments are passed to `get_options`.  Raises `ValueError` if the
    `request_bodies` option is not one of the accepted values.
    """
    # The debug flag is switched to this client's `debug` option during
    # setup and restored in every case.
    previous_debug = _client_init_debug.get(False)
    try:
        opts = get_options(*args, **kwargs)
        self.options = opts
        _client_init_debug.set(opts["debug"])
        self.transport = make_transport(opts)

        allowed_bodies = ("always", "never", "small", "medium")
        if opts["request_bodies"] not in allowed_bodies:
            message = "Invalid value for request_bodies. Must be one of {}".format(
                allowed_bodies
            )
            raise ValueError(message)

        self.integrations = setup_integrations(
            opts["integrations"], with_defaults=opts["default_integrations"]
        )
    finally:
        _client_init_debug.set(previous_debug)

def capture_event(

self, event, hint=None, scope=None)

Captures an event.

This takes the ready-made event and an optional hint and scope. The hint is used internally to further customize the representation of the error. When provided, it is a dictionary of optional information such as exception info.

If the transport is not set nothing happens, otherwise the return value of this function will be the ID of the captured event.

def capture_event(self, event, hint=None, scope=None):
    """Send a ready-made event through the transport.

    `hint` is an optional dictionary of extra information (such as
    exception info) and defaults to an empty dict; `scope` is passed on
    to the sampling and preparation steps.

    Returns the event ID, reusing `event["event_id"]` when present and
    generating one otherwise.  Returns `None` when there is no transport,
    when `_should_capture` rejects the event, or when `_prepare_event`
    drops it.
    """
    if self.transport is None:
        return

    hint = {} if hint is None else hint

    event_id = event.get("event_id")
    if event_id is None:
        event_id = uuid.uuid4().hex
        event["event_id"] = event_id

    if not self._should_capture(event, hint, scope):
        return

    prepared = self._prepare_event(event, hint, scope)
    if prepared is None:
        return

    self.transport.capture_event(prepared)
    return event_id

def close(

self, timeout=None, shutdown_callback=None)

Closes the client which shuts down the transport in an orderly manner.

The shutdown_callback is invoked with two arguments: the number of pending events and the configured shutdown timeout. For instance the default atexit integration will use this to render out a message on stderr.

def close(self, timeout=None, shutdown_callback=None):
    """Shut the transport down and detach it from the client.

    `timeout` falls back to the `shutdown_timeout` option when `None`.
    `shutdown_callback` is forwarded to the transport as `callback`; it
    is invoked with the number of pending events and the configured
    shutdown timeout.  Does nothing if the client has no transport.
    """
    transport = self.transport
    if transport is None:
        return

    if timeout is None:
        timeout = self.options["shutdown_timeout"]
    transport.shutdown(timeout=timeout, callback=shutdown_callback)
    self.transport = None

Instance variables

var dsn

Returns the configured DSN as string.

class HttpTransport

The default HTTP transport.

class HttpTransport(Transport):
    """The default HTTP transport.

    Events are handed to a `BackgroundWorker` and sent from there as
    gzip-compressed JSON via HTTP POST.
    """

    def __init__(self, options):
        """Sets up the worker, the auth object and the connection pool
        from the given client options."""
        Transport.__init__(self, options)
        self._worker = BackgroundWorker()
        self._auth = self.parsed_dsn.to_auth("sentry-python/%s" % VERSION)
        self._pool = _make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )
        # While set, events are dropped until this point in time has passed.
        self._disabled_until = None
        self._retry = urllib3.util.Retry()
        self.options = options

        # Imported at call time rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from sentry_sdk import Hub

        self.hub_cls = Hub

    def _send_event(self, event):
        """Serializes the event and posts it to the store endpoint.

        Events are silently dropped while the transport is disabled after
        a 429 response.  Raises `ValueError` for any other status code
        outside the 2xx range.
        """
        if self._disabled_until is not None:
            if datetime.utcnow() < self._disabled_until:
                return
            self._disabled_until = None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json.dumps(event).encode("utf-8"))

        # Pass the values as logger arguments so the message is only
        # formatted when debug logging is actually enabled.
        logger.debug(
            "Sending %s event [%s] to %s project:%s",
            event.get("level") or "error",
            event["event_id"],
            self.parsed_dsn.host,
            self.parsed_dsn.project_id,
        )
        response = self._pool.request(
            "POST",
            str(self._auth.store_api_url),
            body=body.getvalue(),
            headers={
                "X-Sentry-Auth": str(self._auth.to_header()),
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
            },
        )

        try:
            if response.status == 429:
                # Pause sending for the retry-after period reported for
                # the response, falling back to 60 seconds.
                self._disabled_until = datetime.utcnow() + timedelta(
                    seconds=self._retry.get_retry_after(response) or 60
                )
                return

            elif response.status >= 300 or response.status < 200:
                raise ValueError("Unexpected status code: %s" % response.status)
        finally:
            response.close()

        self._disabled_until = None

    def capture_event(self, event):
        """Queues the event on the background worker.

        The hub that is current at call time is re-activated around the
        actual send, which runs inside `capture_internal_exceptions()`.
        """
        hub = self.hub_cls.current

        def send_event_wrapper():
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)

        self._worker.submit(send_event_wrapper)

    def shutdown(self, timeout, callback=None):
        """Shuts the worker down, waiting up to `timeout`.

        A `timeout` of zero or less kills the worker right away; otherwise
        `timeout` and `callback` are passed on to the worker's shutdown.
        """
        logger.debug("Shutting down HTTP transport orderly")
        if timeout <= 0:
            self._worker.kill()
        else:
            self._worker.shutdown(timeout, callback)

    def kill(self):
        """Kills the background worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()

    def copy(self):
        """Returns a new transport of the same type built from the same
        options.  The copy shares this transport's connection pool."""
        transport = type(self)(self.options)
        transport._pool = self._pool
        return transport

Ancestors (in MRO)

Static methods

def __init__(

self, options)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, options):
    """Set up the worker, the auth object and the connection pool from
    the given client options."""
    Transport.__init__(self, options)
    self._worker = BackgroundWorker()

    user_agent = "sentry-python/%s" % VERSION
    self._auth = self.parsed_dsn.to_auth(user_agent)

    dsn = self.parsed_dsn
    pool_settings = {
        "http_proxy": options["http_proxy"],
        "https_proxy": options["https_proxy"],
        "ca_certs": options["ca_certs"],
    }
    self._pool = _make_pool(dsn, **pool_settings)

    self._disabled_until = None
    self._retry = urllib3.util.Retry()
    self.options = options

    # The hub class is imported at call time rather than at module level.
    from sentry_sdk import Hub as hub_class

    self.hub_cls = hub_class

def capture_event(

self, event)

This gets invoked with the event dictionary when an event should be sent to sentry.

def capture_event(self, event):
    """Queue `event` for sending on the background worker.

    The hub that is current when this method is called is captured and
    re-activated around the actual send, which runs inside
    `capture_internal_exceptions()`.
    """
    active_hub = self.hub_cls.current

    def _deliver():
        with active_hub, capture_internal_exceptions():
            self._send_event(event)

    self._worker.submit(_deliver)

def copy(

self)

Copy the transport.

The returned transport should behave completely independent from the previous one. It still may share HTTP connection pools, but not share any state such as internal queues.

def copy(self):
    """Return a new transport of the same type built from the same options.

    The clone is constructed via `type(self)(self.options)` and then
    given this transport's `_pool`, so both share one connection pool.
    """
    clone = type(self)(self.options)
    clone._pool = self._pool
    return clone

def kill(

self)

Forcefully kills the transport.

def kill(self):
    """Kill the background worker after logging a debug message."""
    logger.debug("Killing HTTP transport")
    worker = self._worker
    worker.kill()

def shutdown(

self, timeout, callback=None)

Initiates a controlled shutdown that should flush out pending events. The callback must be invoked with the number of pending events and the timeout if shutting down would take some period of time (e.g. is not instant).

def shutdown(self, timeout, callback=None):
    """Shut the background worker down.

    A `timeout` of zero or less kills the worker right away; otherwise
    `timeout` and `callback` are passed on to the worker's `shutdown`.
    """
    logger.debug("Shutting down HTTP transport orderly")
    worker = self._worker
    if timeout <= 0:
        worker.kill()
        return
    worker.shutdown(timeout, callback)

Instance variables

var hub_cls

var options

Inheritance: Transport.options

class Hub

The hub wraps the concurrency management of the SDK. Each thread has its own hub but the hub might transfer with the flow of execution if context vars are available.

If the hub is used with a with statement it's temporarily activated.

class Hub(with_metaclass(HubMeta)):
    """The hub wraps the concurrency management of the SDK.  Each thread has
    its own hub but the hub might transfer with the flow of execution if
    context vars are available.

    If the hub is used with a with statement it's temporarily activated.
    """

    def __init__(self, client_or_hub=None, scope=None):
        """Creates a hub from a client or by cloning another hub.

        When another hub is passed, its topmost client is reused and,
        unless `scope` is given, its topmost scope is copied.  Without a
        scope from either source a fresh `Scope` is created.
        """
        if isinstance(client_or_hub, Hub):
            hub = client_or_hub
            client, other_scope = hub._stack[-1]
            if scope is None:
                scope = copy.copy(other_scope)
        else:
            client = client_or_hub
        if scope is None:
            scope = Scope()
        # Stack of `(client, scope)` layers; the last entry is the active one.
        self._stack = [(client, scope)]
        self._last_event_id = None
        # Hubs that were current before each `__enter__`, restored on exit.
        self._old_hubs = []

    def __enter__(self):
        """Makes this hub the current one, remembering the previous hub."""
        self._old_hubs.append(Hub.current)
        _local.set(self)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Restores the hub that was current before `__enter__`."""
        old = self._old_hubs.pop()
        _local.set(old)

    def run(self, callback):
        """Runs a callback in the context of the hub.  Alternatively the
        with statement can be used on the hub directly.
        """
        with self:
            return callback()

    def get_integration(self, name_or_class):
        """Returns the integration for this hub by name or class.  If there
        is no client bound or the client does not have that integration
        then `None` is returned.

        If the return value is not `None` the hub is guaranteed to have a
        client attached.
        """
        if not isinstance(name_or_class, string_types):
            name_or_class = name_or_class.identifier
        client = self._stack[-1][0]
        if client is not None:
            rv = client.integrations.get(name_or_class)
            if rv is not None:
                return rv

        # `_initial_client` is called to obtain the client it refers to.
        initial_client = _initial_client
        if initial_client is not None:
            initial_client = initial_client()

        # Warn when the integration is enabled on that client but not on
        # the client bound to this hub.
        if (
            initial_client is not None
            and initial_client is not client
            and initial_client.integrations.get(name_or_class) is not None
        ):
            warning = (
                "Integration %r attempted to run but it was only "
                "enabled on init() but not the client that "
                "was bound to the current flow.  Earlier versions of "
                "the SDK would consider these integrations enabled but "
                "this is no longer the case." % (name_or_class,)
            )
            warn(Warning(warning), stacklevel=3)
            logger.warning(warning)

    @property
    def client(self):
        """Returns the current client on the hub."""
        return self._stack[-1][0]

    def last_event_id(self):
        """Returns the last event ID."""
        return self._last_event_id

    def bind_client(self, new):
        """Binds a new client to the hub."""
        top = self._stack[-1]
        self._stack[-1] = (new, top[1])

    def capture_event(self, event, hint=None):
        """Captures an event.  The return value is the ID of the event.

        The event is a dictionary following the Sentry v7/v8 protocol
        specification.  Optionally an event hint dict can be passed that
        is used by processors to extract additional information from it.
        Typically the event hint object would contain exception information.
        """
        client, scope = self._stack[-1]
        if client is not None:
            rv = client.capture_event(event, hint, scope)
            # Only remember IDs of events the client did not drop.
            if rv is not None:
                self._last_event_id = rv
            return rv

    def capture_message(self, message, level=None):
        """Captures a message.  The message is just a string.  If no level
        is provided the default level is `info`.
        """
        if self.client is None:
            return
        if level is None:
            level = "info"
        return self.capture_event({"message": message, "level": level})

    def capture_exception(self, error=None):
        """Captures an exception.

        The argument passed can be `None` in which case the last exception
        will be reported, otherwise an exception object or an `exc_info`
        tuple.
        """
        client = self.client
        if client is None:
            return
        if error is None:
            exc_info = sys.exc_info()
        else:
            exc_info = exc_info_from_error(error)

        event, hint = event_from_exception(exc_info, client_options=client.options)
        try:
            return self.capture_event(event, hint=hint)
        except Exception:
            # Failures while capturing are logged, not propagated.
            self._capture_internal_exception(sys.exc_info())

    def _capture_internal_exception(self, exc_info):
        """Capture an exception that is likely caused by a bug in the SDK
        itself."""
        logger.error("Internal error in sentry_sdk", exc_info=exc_info)

    def add_breadcrumb(self, crumb=None, hint=None, **kwargs):
        """Adds a breadcrumb.  The breadcrumbs are a dictionary with the
        data as the sentry v7/v8 protocol expects.  `hint` is an optional
        value that can be used by `before_breadcrumb` to customize the
        breadcrumbs that are emitted.
        """
        client, scope = self._stack[-1]
        if client is None:
            logger.info("Dropped breadcrumb because no client bound")
            return

        # Work on a copy so the caller's dictionary is left untouched.
        crumb = dict(crumb or ())
        crumb.update(kwargs)
        if not crumb:
            return

        hint = dict(hint or ())

        if crumb.get("timestamp") is None:
            crumb["timestamp"] = datetime.utcnow()
        if crumb.get("type") is None:
            crumb["type"] = "default"

        original_crumb = crumb
        if client.options["before_breadcrumb"] is not None:
            crumb = client.options["before_breadcrumb"](crumb, hint)

        if crumb is not None:
            scope._breadcrumbs.append(crumb)
        else:
            logger.info("before breadcrumb dropped breadcrumb (%s)", original_crumb)
        # Discard the oldest breadcrumbs beyond the configured maximum.
        while len(scope._breadcrumbs) > client.options["max_breadcrumbs"]:
            scope._breadcrumbs.popleft()

    def push_scope(self, callback=None):
        """Pushes a new layer on the scope stack. Returns a context manager
        that should be used to pop the scope again.  Alternatively a callback
        can be provided that is executed in the context of the scope.
        """
        if callback is not None:
            with self.push_scope() as scope:
                callback(scope)
            return

        client, scope = self._stack[-1]
        new_layer = (client, copy.copy(scope))
        self._stack.append(new_layer)

        return _ScopeManager(self)

    scope = push_scope

    def pop_scope_unsafe(self):
        """Pops a scope layer from the stack. Try to use the context manager
        `push_scope()` instead."""
        # Validate before mutating: popping the last remaining layer would
        # leave the stack empty and break every later `self._stack[-1]`.
        assert len(self._stack) > 1, "stack must have at least one layer"
        return self._stack.pop()

    def configure_scope(self, callback=None):
        """Reconfigures the scope.

        With a callback, the callback is invoked with the current scope if
        a client is bound and nothing is returned.  Without one, a context
        manager is returned that yields the current scope, or a throwaway
        `Scope` when no client is bound.
        """
        client, scope = self._stack[-1]
        if callback is not None:
            if client is not None:
                callback(scope)
            return

        @contextmanager
        def inner():
            if client is not None:
                yield scope
            else:
                yield Scope()

        return inner()

Ancestors (in MRO)

  • Hub
  • builtins.object

Static methods

def __init__(

self, client_or_hub=None, scope=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, client_or_hub=None, scope=None):
    """Create a hub from a client or by cloning another hub.

    When another hub is passed, its topmost client is reused and, unless
    `scope` is given, its topmost scope is copied.  Without a scope from
    either source a fresh `Scope` is created.
    """
    client = client_or_hub
    if isinstance(client_or_hub, Hub):
        client, parent_scope = client_or_hub._stack[-1]
        if scope is None:
            scope = copy.copy(parent_scope)

    if scope is None:
        scope = Scope()

    # A single (client, scope) layer to start with.
    self._stack = [(client, scope)]
    self._last_event_id = None
    self._old_hubs = []

def add_breadcrumb(

self, crumb=None, hint=None, **kwargs)

Adds a breadcrumb. The breadcrumbs are a dictionary with the data as the sentry v7/v8 protocol expects. hint is an optional value that can be used by before_breadcrumb to customize the breadcrumbs that are emitted.

def add_breadcrumb(self, crumb=None, hint=None, **kwargs):
    """Record a breadcrumb on the current scope.

    `crumb` and the keyword arguments are merged into one dictionary;
    `timestamp` and `type` are filled in when missing.  `hint` is handed
    to the `before_breadcrumb` option, which may replace the breadcrumb
    or drop it by returning `None`.  The oldest breadcrumbs are discarded
    once `max_breadcrumbs` is exceeded.  Nothing is recorded without a
    bound client or for an empty breadcrumb.
    """
    client, scope = self._stack[-1]
    if client is None:
        logger.info("Dropped breadcrumb because no client bound")
        return

    # Build a private copy so the caller's dictionary is never modified.
    breadcrumb = dict(crumb or (), **kwargs)
    if not breadcrumb:
        return

    hint = dict(hint or ())

    if breadcrumb.get("timestamp") is None:
        breadcrumb["timestamp"] = datetime.utcnow()
    if breadcrumb.get("type") is None:
        breadcrumb["type"] = "default"

    options = client.options
    before_breadcrumb = options["before_breadcrumb"]
    if before_breadcrumb is None:
        result = breadcrumb
    else:
        result = before_breadcrumb(breadcrumb, hint)

    if result is None:
        logger.info("before breadcrumb dropped breadcrumb (%s)", breadcrumb)
    else:
        scope._breadcrumbs.append(result)

    limit = options["max_breadcrumbs"]
    while len(scope._breadcrumbs) > limit:
        scope._breadcrumbs.popleft()

def bind_client(

self, new)

Binds a new client to the hub.

def bind_client(self, new):
    """Replace the client of the topmost stack layer, keeping its scope."""
    current_scope = self._stack[-1][1]
    self._stack[-1] = (new, current_scope)

def capture_event(

self, event, hint=None)

Captures an event. The return value is the ID of the event.

The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.

def capture_event(self, event, hint=None):
    """Pass an event to the bound client and return the event ID.

    The event, the optional hint dictionary and the current scope are
    handed to the client's `capture_event`.  A non-`None` result is
    remembered as the last event ID.  Returns `None` when no client is
    bound.
    """
    client, scope = self._stack[-1]
    if client is None:
        return None

    event_id = client.capture_event(event, hint, scope)
    if event_id is not None:
        self._last_event_id = event_id
    return event_id

def capture_exception(

self, error=None)

Captures an exception.

The argument passed can be None in which case the last exception will be reported, otherwise an exception object or an exc_info tuple.

def capture_exception(self, error=None):
    """Turn an exception into an event and capture it.

    With `error=None` the exception currently being handled
    (`sys.exc_info()`) is used; otherwise `error` is converted with
    `exc_info_from_error`.  Returns the result of `capture_event`, or
    `None` when no client is bound or capturing itself failed.
    """
    client = self.client
    if client is None:
        return

    exc_info = sys.exc_info() if error is None else exc_info_from_error(error)
    event, hint = event_from_exception(exc_info, client_options=client.options)

    try:
        return self.capture_event(event, hint=hint)
    except Exception:
        # Failures while capturing are reported internally, not raised.
        self._capture_internal_exception(sys.exc_info())

def capture_message(

self, message, level=None)

Captures a message. The message is just a string. If no level is provided the default level is info.

def capture_message(self, message, level=None):
    """Capture a plain message as an event.

    Builds an event with the `message` and `level` keys (`level`
    defaults to `"info"`) and returns the result of `capture_event`.
    Returns `None` when no client is bound.
    """
    if self.client is None:
        return

    payload = {
        "message": message,
        "level": "info" if level is None else level,
    }
    return self.capture_event(payload)

def configure_scope(

self, callback=None)

Reconfigures the scope.

def configure_scope(self, callback=None):
    """Give access to the current scope for modification.

    With a callback, the callback is invoked with the current scope if a
    client is bound, and `None` is returned.  Without one, a context
    manager is returned that yields the current scope, or a throwaway
    `Scope` when no client is bound.
    """
    client, scope = self._stack[-1]
    has_client = client is not None

    if callback is None:

        @contextmanager
        def _scope_context():
            yield scope if has_client else Scope()

        return _scope_context()

    if has_client:
        callback(scope)
    return None

def get_integration(

self, name_or_class)

Returns the integration for this hub by name or class. If there is no client bound or the client does not have that integration then None is returned.

If the return value is not None the hub is guaranteed to have a client attached.

def get_integration(self, name_or_class):
    """Look up an integration on this hub by identifier or by class.

    Anything that is not a string is expected to expose the identifier
    as its `identifier` attribute.  `None` is returned when no client is
    bound or the bound client does not carry that integration.

    A return value other than `None` guarantees that the hub has a
    client attached.
    """
    if not isinstance(name_or_class, string_types):
        name_or_class = name_or_class.identifier

    client = self._stack[-1][0]
    if client is not None:
        integration = client.integrations.get(name_or_class)
        if integration is not None:
            return integration

    # NOTE(review): `_initial_client` is called to obtain the client, so it
    # is presumably a weak reference to the client created by init();
    # confirm against its definition.
    initial_client = _initial_client
    if initial_client is not None:
        initial_client = initial_client()

    if initial_client is None or initial_client is client:
        return None
    if initial_client.integrations.get(name_or_class) is None:
        return None

    # The integration exists on the initial client only, not on the client
    # bound here: report that through both the warnings machinery and the
    # logger, then fall through to returning None.
    warning = (
        "Integration %r attempted to run but it was only "
        "enabled on init() but not the client that "
        "was bound to the current flow.  Earlier versions of "
        "the SDK would consider these integrations enabled but "
        "this is no longer the case." % (name_or_class,)
    )
    warn(Warning(warning), stacklevel=3)
    logger.warning(warning)

def last_event_id(

self)

Returns the last event ID.

def last_event_id(self):
    """Returns the last event ID.

    This is the value held in `_last_event_id`, which `capture_event`
    updates whenever the client hands back a result that is not `None`.
    """
    return self._last_event_id

def pop_scope_unsafe(

self)

Pops a scope layer from the stack. Try to use the context manager push_scope() instead.

def pop_scope_unsafe(self):
    """Pops a scope layer from the stack. Try to use the context manager
    `push_scope()` instead.

    Returns the layer that was removed.  The bottom layer is never
    removed: when popping would leave the stack empty an `AssertionError`
    is raised and the stack is left untouched.
    """
    # Validate before mutating, so a rejected pop cannot leave the hub with
    # an empty stack (other methods index `self._stack[-1]`).  Raising
    # explicitly keeps the check active under `python -O` as well.
    if len(self._stack) <= 1:
        raise AssertionError("stack must have at least one layer")
    return self._stack.pop()

def push_scope(

self, callback=None)

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

def push_scope(self, callback=None):
    """Add a new layer on top of the scope stack.

    Without a `callback` the new layer reuses the current client together
    with a copy of the current scope, and a context manager is returned
    that should be used to pop the layer again.  With a `callback` the
    layer is pushed, the callback is run with the new scope inside that
    context manager, and `None` is returned.
    """
    if callback is None:
        client, scope = self._stack[-1]
        self._stack.append((client, copy.copy(scope)))
        return _ScopeManager(self)

    with self.push_scope() as scope:
        callback(scope)

def run(

self, callback)

Runs a callback in the context of the hub. Alternatively the with statement can be used on the hub directly.

def run(self, callback):
    """Runs a callback in the context of the hub.  Alternatively the
    with statement can be used on the hub directly.

    The hub is entered as a context manager, `callback` is called without
    arguments, and its return value is passed back to the caller.
    """
    with self:
        return callback()

def scope(

self, callback=None)

Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.

def push_scope(self, callback=None):
    """Add a new layer on top of the scope stack.

    Without a `callback` the new layer reuses the current client together
    with a copy of the current scope, and a context manager is returned
    that should be used to pop the layer again.  With a `callback` the
    layer is pushed, the callback is run with the new scope inside that
    context manager, and `None` is returned.
    """
    if callback is None:
        client, scope = self._stack[-1]
        self._stack.append((client, copy.copy(scope)))
        return _ScopeManager(self)

    with self.push_scope() as scope:
        callback(scope)

Instance variables

var client

Returns the current client on the hub.

class Scope

The scope holds extra information that should be sent with all events that belong to it.

class Scope(object):
    """The scope holds extra information that should be sent with all
    events that belong to it.

    It stores a level, fingerprint, transaction, user, tags, contexts,
    extras and breadcrumbs, plus scope local event and error processors.
    `apply_to_event` merges all of that into an event.
    """

    __slots__ = (
        "_level",
        "_name",
        "_fingerprint",
        "_transaction",
        "_user",
        "_tags",
        "_contexts",
        "_extras",
        "_breadcrumbs",
        "_event_processors",
        "_error_processors",
    )

    def __init__(self):
        """Create an empty scope with no processors and no name."""
        # The processor lists and the name are set up here and not in
        # `clear()`, so clearing a scope leaves them untouched.
        self._event_processors = []
        self._error_processors = []

        self._name = None
        self.clear()

    # NOTE(review): `_attr_setter` is defined elsewhere in this file; it
    # presumably exposes each method below as a settable attribute.
    @_attr_setter
    def level(self, value):
        """When set this overrides the level."""
        self._level = value

    @_attr_setter
    def fingerprint(self, value):
        """When set this overrides the default fingerprint."""
        self._fingerprint = value

    @_attr_setter
    def transaction(self, value):
        """When set this forces a specific transaction name to be set."""
        self._transaction = value

    @_attr_setter
    def user(self, value):
        """When set a specific user is bound to the scope."""
        self._user = value

    def set_tag(self, key, value):
        """Sets a tag for a key to a specific value."""
        self._tags[key] = value

    def remove_tag(self, key):
        """Removes a specific tag.  A missing key is ignored."""
        self._tags.pop(key, None)

    def set_context(self, key, value):
        """Binds a context at a certain key to a specific value."""
        self._contexts[key] = value

    def remove_context(self, key):
        """Removes a context.  A missing key is ignored."""
        self._contexts.pop(key, None)

    def set_extra(self, key, value):
        """Sets an extra key to a specific value."""
        self._extras[key] = value

    def remove_extra(self, key):
        """Removes a specific extra key.  A missing key is ignored."""
        self._extras.pop(key, None)

    def clear(self):
        """Clears the entire scope.

        Level, fingerprint, transaction, user, tags, contexts, extras and
        breadcrumbs are reset.  Registered processors and the scope name
        are not touched.
        """
        self._level = None
        self._fingerprint = None
        self._transaction = None
        self._user = None

        self._tags = {}
        self._contexts = {}
        self._extras = {}

        self._breadcrumbs = deque()

    def add_event_processor(self, func):
        """Register a scope local event processor on the scope.

        This function behaves like `before_send`.  The processor is
        called as `func(event, hint)` and must return the event to keep;
        returning `None` drops the event.
        """
        self._event_processors.append(func)

    def add_error_processor(self, func, cls=None):
        """Register a scope local error processor on the scope.

        The error processor works similarly to an event processor but is
        invoked with the original exception info triple as second argument.

        When `cls` is given, `func` only runs for events whose exception
        value (`exc_info[1]`) is an instance of `cls`; other events pass
        through unchanged.
        """
        if cls is not None:
            real_func = func

            # Rebinds `func` to a filtering wrapper around the original
            # callable, which stays reachable as `real_func`.
            def func(event, exc_info):
                try:
                    is_inst = isinstance(exc_info[1], cls)
                except Exception:
                    # Any failure of the check counts as "no match".
                    is_inst = False
                if is_inst:
                    return real_func(event, exc_info)
                return event

        self._error_processors.append(func)

    def apply_to_event(self, event, hint=None):
        """Applies the information contained on the scope to the given event.

        Returns the resulting event, or `None` when one of the processors
        dropped it by returning `None`.
        """

        def _drop(event, cause, ty):
            # Only logs; it returns nothing, so `return _drop(...)` below
            # makes `apply_to_event` return None.
            logger.info("%s (%s) dropped event (%s)", ty, cause, event)

        # A level set on the scope always wins over the event's own level.
        if self._level is not None:
            event["level"] = self._level

        # Scope breadcrumbs are appended after any already on the event.
        event.setdefault("breadcrumbs", []).extend(self._breadcrumbs)
        # User, transaction and fingerprint only fill in values the event
        # does not already carry.
        if event.get("user") is None and self._user is not None:
            event["user"] = self._user

        if event.get("transaction") is None and self._transaction is not None:
            event["transaction"] = self._transaction

        if event.get("fingerprint") is None and self._fingerprint is not None:
            event["fingerprint"] = self._fingerprint

        # For extras, tags and contexts the scope values overwrite
        # same-named keys that are already on the event.
        if self._extras:
            event.setdefault("extra", {}).update(self._extras)

        if self._tags:
            event.setdefault("tags", {}).update(self._tags)

        if self._contexts:
            event.setdefault("contexts", {}).update(self._contexts)

        # Error processors only run when the hint carries exception info.
        exc_info = hint.get("exc_info") if hint is not None else None
        if exc_info is not None:
            for processor in self._error_processors:
                new_event = processor(event, exc_info)
                if new_event is None:
                    return _drop(event, processor, "error processor")
                event = new_event

        # Global event processors run before the scope local ones.
        for processor in chain(global_event_processors, self._event_processors):
            # Pre-assigned so the current event is kept if the processor
            # call does not complete (presumably because
            # `capture_internal_exceptions()` suppresses the error).
            new_event = event
            with capture_internal_exceptions():
                new_event = processor(event, hint)
            if new_event is None:
                return _drop(event, processor, "event processor")
            event = new_event

        return event

    def __copy__(self):
        """Return a copy whose containers are independent of this scope.

        The tag, context and extra dicts, the breadcrumbs and both
        processor lists are copied one level deep; the values stored in
        them are shared.
        """
        # Bypasses `__init__`, so every slot is populated by hand below.
        rv = object.__new__(self.__class__)

        rv._level = self._level
        rv._name = self._name
        rv._fingerprint = self._fingerprint
        rv._transaction = self._transaction
        rv._user = self._user

        rv._tags = dict(self._tags)
        rv._contexts = dict(self._contexts)
        rv._extras = dict(self._extras)

        rv._breadcrumbs = copy(self._breadcrumbs)
        rv._event_processors = list(self._event_processors)
        rv._error_processors = list(self._error_processors)

        return rv

    def __repr__(self):
        """Return `<ClassName id=0x... name=...>` for debugging."""
        return "<%s id=%s name=%s>" % (
            self.__class__.__name__,
            hex(id(self)),
            self._name,
        )

Ancestors (in MRO)

Static methods

def __init__(

self)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self):
    """Create an empty scope with no processors and no name."""
    # The processor lists and the name are set here and not in `clear()`.
    self._event_processors = []
    self._error_processors = []
    self._name = None
    self.clear()

def add_error_processor(

self, func, cls=None)

Register a scope-local error processor on the scope.

The error processor works similarly to an event processor but is invoked with the original exception info triple as the second argument.

def add_error_processor(self, func, cls=None):
    """Register a scope local error processor on the scope.

    The error processor works similarly to an event processor but is
    invoked with the original exception info triple as second argument.

    When `cls` is given, `func` only runs for events whose exception
    value (`exc_info[1]`) is an instance of `cls`; all other events are
    passed through unchanged.
    """
    processor = func
    if cls is not None:

        def processor(event, exc_info):
            try:
                matches = isinstance(exc_info[1], cls)
            except Exception:
                # Any failure of the check (e.g. malformed `exc_info`)
                # counts as "no match".
                matches = False
            return func(event, exc_info) if matches else event

    self._error_processors.append(processor)

def add_event_processor(

self, func)

Register a scope-local event processor on the scope.

This function behaves like before_send.

def add_event_processor(self, func):
    """Register a scope local event processor on the scope.

    This function behaves like `before_send`.  The callable is appended
    to the scope's list of event processors.
    """
    self._event_processors.append(func)

def apply_to_event(

self, event, hint=None)

Applies the information contained on the scope to the given event.

def apply_to_event(self, event, hint=None):
    """Applies the information contained on the scope to the given event.

    Returns the resulting event, or `None` when one of the processors
    dropped it by returning `None`.
    """
    def _drop(event, cause, ty):
        # Only logs; it returns nothing, so `return _drop(...)` below makes
        # `apply_to_event` return None.
        logger.info("%s (%s) dropped event (%s)", ty, cause, event)
    # A level set on the scope always wins over the event's own level.
    if self._level is not None:
        event["level"] = self._level
    # Scope breadcrumbs are appended after any already on the event.
    event.setdefault("breadcrumbs", []).extend(self._breadcrumbs)
    # User, transaction and fingerprint only fill in values the event does
    # not already carry.
    if event.get("user") is None and self._user is not None:
        event["user"] = self._user
    if event.get("transaction") is None and self._transaction is not None:
        event["transaction"] = self._transaction
    if event.get("fingerprint") is None and self._fingerprint is not None:
        event["fingerprint"] = self._fingerprint
    # For extras, tags and contexts the scope values overwrite same-named
    # keys that are already on the event.
    if self._extras:
        event.setdefault("extra", {}).update(self._extras)
    if self._tags:
        event.setdefault("tags", {}).update(self._tags)
    if self._contexts:
        event.setdefault("contexts", {}).update(self._contexts)
    # Error processors only run when the hint carries exception info.
    exc_info = hint.get("exc_info") if hint is not None else None
    if exc_info is not None:
        for processor in self._error_processors:
            new_event = processor(event, exc_info)
            if new_event is None:
                return _drop(event, processor, "error processor")
            event = new_event
    # Global event processors run before the scope local ones.
    for processor in chain(global_event_processors, self._event_processors):
        # Pre-assigned so the current event is kept if the processor call
        # does not complete (presumably because
        # `capture_internal_exceptions()` suppresses the error).
        new_event = event
        with capture_internal_exceptions():
            new_event = processor(event, hint)
        if new_event is None:
            return _drop(event, processor, "event processor")
        event = new_event
    return event

def clear(

self)

Clears the entire scope.

def clear(self):
    """Reset the data stored on the scope.

    Level, fingerprint, transaction and user go back to `None`; tags,
    contexts and extras become fresh empty dicts and the breadcrumbs a
    fresh empty deque.
    """
    self._level = self._fingerprint = self._transaction = self._user = None
    self._tags, self._contexts, self._extras = {}, {}, {}
    self._breadcrumbs = deque()

def remove_context(

self, key)

Removes a context.

def remove_context(self, key):
    """Removes a context.  A key that is not set is silently ignored."""
    self._contexts.pop(key, None)

def remove_extra(

self, key)

Removes a specific extra key.

def remove_extra(self, key):
    """Removes a specific extra key.  A key that is not set is silently
    ignored.
    """
    self._extras.pop(key, None)

def remove_tag(

self, key)

Removes a specific tag.

def remove_tag(self, key):
    """Removes a specific tag.  A key that is not set is silently ignored."""
    self._tags.pop(key, None)

def set_context(

self, key, value)

Binds a context at a certain key to a specific value.

def set_context(self, key, value):
    """Binds a context at a certain key to a specific value.  A context
    already stored under `key` is replaced.
    """
    self._contexts[key] = value

def set_extra(

self, key, value)

Sets an extra key to a specific value.

def set_extra(self, key, value):
    """Sets an extra key to a specific value.  A value already stored
    under `key` is replaced.
    """
    self._extras[key] = value

def set_tag(

self, key, value)

Sets a tag for a key to a specific value.

def set_tag(self, key, value):
    """Sets a tag for a key to a specific value.  A tag already stored
    under `key` is replaced.
    """
    self._tags[key] = value

Instance variables

var fingerprint

When set this overrides the default fingerprint.

var level

When set this overrides the level.

var transaction

When set this forces a specific transaction name to be set.

var user

When set a specific user is bound to the scope.

class Transport

Baseclass for all transports.

A transport is used to send an event to sentry.

class Transport(object):
    """Common base class of every transport.

    A transport is the component that sends an event to sentry.
    """

    def __init__(self, options=None):
        """Keep `options` and parse the DSN stored under its `dsn` key.

        `parsed_dsn` is `None` when no options are given or when the
        configured DSN is empty.
        """
        self.options = options
        dsn = options["dsn"] if options else None
        self.parsed_dsn = Dsn(dsn) if dsn else None

    def capture_event(self, event):
        """Called with the event dictionary whenever an event should be
        sent to sentry.  Subclasses have to override this; the base class
        raises `NotImplementedError`.
        """
        raise NotImplementedError()

    def shutdown(self, timeout, callback=None):
        """Start a controlled shutdown that should flush out pending
        events.  The callback must be invoked with the number of pending
        events and the timeout if shutting down would take some period
        of time (eg: not instant).

        The base implementation uses neither argument and just kills the
        transport.
        """
        self.kill()

    def kill(self):
        """Forcefully stop the transport.  Does nothing in the base class."""

    def copy(self):
        """Copy the transport.

        The transport that is returned should behave completely
        independent from the previous one.  It may still share HTTP
        connection pools, but no state such as internal queues.

        The base implementation hands back the very same instance.
        """
        return self

    def __del__(self):
        # Best effort clean-up on garbage collection: errors raised by
        # `kill()` are deliberately discarded here.
        try:
            self.kill()
        except Exception:
            pass

Ancestors (in MRO)

Static methods

def __init__(

self, options=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, options=None):
    """Keep `options` and parse the DSN stored under its `dsn` key.

    `parsed_dsn` is `None` when no options are given or when the
    configured DSN is empty.
    """
    self.options = options
    # The `dsn` key is only looked up when `options` is truthy.
    if options and options["dsn"]:
        self.parsed_dsn = Dsn(options["dsn"])
    else:
        self.parsed_dsn = None

def capture_event(

self, event)

This gets invoked with the event dictionary when an event should be sent to sentry.

def capture_event(self, event):
    """This gets invoked with the event dictionary when an event should
    be sent to sentry.

    The base class does not implement it and always raises
    `NotImplementedError`.
    """
    raise NotImplementedError()

def copy(

self)

Copy the transport.

The returned transport should behave completely independent from the previous one. It still may share HTTP connection pools, but not share any state such as internal queues.

def copy(self):
    """Copy the transport.
    The returned transport should behave completely independent from the
    previous one.  It still may share HTTP connection pools, but not share
    any state such as internal queues.

    The base implementation returns the very same instance.
    """
    return self

def kill(

self)

Forcefully kills the transport.

def kill(self):
    """Forcefully kills the transport.  Does nothing in the base class."""
    pass

def shutdown(

self, timeout, callback=None)

Initiates a controlled shutdown that should flush out pending events. The callback must be invoked with the number of pending events and the timeout if the shutting down would take some period of time (eg: not instant).

def shutdown(self, timeout, callback=None):
    """Initiates a controlled shutdown that should flush out pending
    events.  The callback must be invoked with the number of pending
    events and the timeout if the shutting down would take some period
    of time (eg: not instant).

    The base implementation uses neither `timeout` nor `callback`; it
    just calls `kill()`.
    """
    self.kill()

Instance variables

var options

Sub-modules