from __future__ import annotations
from abc import ABC, abstractmethod
import io
import os
import gzip
import socket
import ssl
import time
import asyncio
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from urllib.request import getproxies
try:
import brotli # type: ignore
except ImportError:
brotli = None
try:
import httpcore
except ImportError:
httpcore = None # type: ignore
try:
import h2 # noqa: F401
HTTP2_ENABLED = httpcore is not None
except ImportError:
HTTP2_ENABLED = False
try:
import anyio # noqa: F401
ASYNC_TRANSPORT_ENABLED = httpcore is not None
except ImportError:
ASYNC_TRANSPORT_ENABLED = False
import urllib3
import certifi
from sentry_sdk.consts import EndpointType
from sentry_sdk.utils import (
Dsn,
logger,
capture_internal_exceptions,
mark_sentry_task_internal,
)
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import (
List,
Dict,
Any,
Callable,
DefaultDict,
Iterable,
Mapping,
Optional,
Tuple,
Type,
Union,
Self,
)
from urllib3.poolmanager import PoolManager
from urllib3.poolmanager import ProxyManager
from sentry_sdk._types import EventDataCategory
# Default socket options used to turn on TCP keep-alive for transport
# connections. Each entry is a ``(level, option, value)`` triple.
KEEP_ALIVE_SOCKET_OPTIONS = []
for option in (
    (socket.SOL_SOCKET, "SO_KEEPALIVE", 1),
    (socket.SOL_TCP, "TCP_KEEPIDLE", 45),
    (socket.SOL_TCP, "TCP_KEEPINTVL", 10),
    (socket.SOL_TCP, "TCP_KEEPCNT", 6),
):
    # The constant is looked up by name because a specific option might not
    # be available on specific systems, e.g. TCP_KEEPIDLE doesn't exist on
    # macOS. Missing options are simply left out of the list.
    if hasattr(socket, option[1]):
        KEEP_ALIVE_SOCKET_OPTIONS.append(
            (option[0], getattr(socket, option[1]), option[2])
        )
class Transport(ABC):
    """Baseclass for all transports.

    A transport is used to send an event to sentry.
    """

    parsed_dsn: Optional[Dsn] = None

    def __init__(self: Self, options: Optional[Dict[str, Any]] = None) -> None:
        """Store ``options`` and parse the DSN they contain, if any.

        ``parsed_dsn`` is set to ``None`` when no options are given, when the
        options have no ``"dsn"`` entry, or when that entry is empty/``None``.
        """
        self.options = options
        # `.get` keeps option dicts without a "dsn" key from raising KeyError.
        dsn = options.get("dsn") if options else None
        self.parsed_dsn = Dsn(dsn) if dsn else None

    @abstractmethod
    def capture_envelope(self: Self, envelope: Envelope) -> None:
        """
        Send an envelope to Sentry.

        Envelopes are a data container format that can hold any type of data
        submitted to Sentry. We use it to send all event data (including errors,
        transactions, crons check-ins, etc.) to Sentry.
        """
        pass

    def flush(
        self: Self,
        timeout: float,
        callback: Optional[Any] = None,
    ) -> None:
        """
        Wait `timeout` seconds for the current events to be sent out.

        The default implementation is a no-op, since this method may only be relevant to some transports.
        Subclasses should override this method if necessary.
        """
        return None

    def kill(self: Self) -> None:
        """
        Forcefully kills the transport.

        The default implementation is a no-op, since this method may only be relevant to some transports.
        Subclasses should override this method if necessary.
        """
        return None

    def record_lost_event(
        self: Self,
        reason: str,
        data_category: Optional[EventDataCategory] = None,
        item: Optional[Item] = None,
        *,
        quantity: int = 1,
    ) -> None:
        """This increments a counter for event loss by reason and
        data category by the given positive-int quantity (default 1).

        If an item is provided, the data category and quantity are
        extracted from the item, and the values passed for
        data_category and quantity are ignored.

        When recording a lost transaction via data_category="transaction",
        the calling code should also record the lost spans via this method.
        When recording lost spans, `quantity` should be set to the number
        of contained spans, plus one for the transaction itself. When
        passing an Item containing a transaction via the `item` parameter,
        this method automatically records the lost spans.

        The default implementation is a no-op.
        """
        return None

    def is_healthy(self: Self) -> bool:
        """Report whether the transport is healthy; the base class always returns True."""
        return True
def _parse_rate_limits(
header: str, now: Optional[datetime] = None
) -> Iterable[Tuple[Optional[str], datetime]]:
if now is None:
now = datetime.now(timezone.utc)
for limit in header.split(","):
try:
parameters = limit.strip().split(":")
retry_after_val, categories = parameters[:2]
retry_after = now + timedelta(seconds=int(retry_after_val))
for category in categories and categories.split(";") or (None,):
yield category, retry_after
except (LookupError, ValueError):
continue
class HttpTransportCore(Transport):
    """Shared base class for sync and async transports.

    Contains the logic that does not depend on the HTTP client in use:
    rate-limit bookkeeping, lost-event (client report) accounting, envelope
    filtering, serialization and compression. Subclasses provide the worker
    (`_create_worker`), the connection pool (`_get_pool_options`,
    `_make_pool`) and the request call (`_request`).
    """

    TIMEOUT = 30  # seconds

    def __init__(self: Self, options: Dict[str, Any]) -> None:
        """Initialise worker, auth, pool, rate-limit state and compression.

        Compression is configured from ``options["_experiments"]``:
        ``transport_compression_level`` (falling back to
        ``transport_zlib_compression_level``) and
        ``transport_compression_algo``. Without an explicit algorithm, brotli
        is chosen only when the module is importable and no level was given;
        otherwise gzip is used.
        """
        from sentry_sdk.consts import VERSION

        Transport.__init__(self, options)
        assert self.parsed_dsn is not None
        self.options: Dict[str, Any] = options
        self._worker = self._create_worker(options)
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # Maps a data category (or None for "everything") to the time until
        # which sending is disabled.
        self._disabled_until: Dict[Optional[str], datetime] = {}
        # We only use this Retry() class for the `parse_retry_after` method it exposes
        self._retry = urllib3.util.Retry()
        self._discarded_events: DefaultDict[Tuple[EventDataCategory, str], int] = (
            defaultdict(int)
        )
        self._last_client_report_sent = time.time()

        self._pool = self._make_pool()

        experiments = options.get("_experiments", {})
        compression_level = experiments.get(
            "transport_compression_level",
            experiments.get("transport_zlib_compression_level"),
        )
        compression_algo = experiments.get(
            "transport_compression_algo",
            (
                "gzip"
                # if only compression level is set, assume gzip for backwards compatibility
                # if we don't have brotli available, fallback to gzip
                if compression_level is not None or brotli is None
                else "br"
            ),
        )

        if compression_algo == "br" and brotli is None:
            logger.warning(
                "You asked for brotli compression without the Brotli module, falling back to gzip -9"
            )
            compression_algo = "gzip"
            compression_level = None

        if compression_algo not in ("br", "gzip"):
            logger.warning(
                "Unknown compression algo %s, disabling compression", compression_algo
            )
            self._compression_level = 0
            self._compression_algo = None
        else:
            self._compression_algo = compression_algo

            if compression_level is not None:
                self._compression_level = compression_level
            elif self._compression_algo == "gzip":
                self._compression_level = 9
            elif self._compression_algo == "br":
                self._compression_level = 4

    def _create_worker(self, options: dict[str, Any]) -> Worker:
        """Create the worker that runs send jobs; must be provided by subclasses."""
        raise NotImplementedError()

    def record_lost_event(
        self: Self,
        reason: str,
        data_category: Optional[EventDataCategory] = None,
        item: Optional[Item] = None,
        *,
        quantity: int = 1,
    ) -> None:
        """Count a lost event under ``(data_category, reason)``.

        Does nothing when ``options["send_client_reports"]`` is falsy. When
        ``item`` is given, its data category is used and the quantity is 1,
        except for attachments, which are counted by byte length (at least 1).
        A lost transaction item additionally records its spans (plus one for
        the transaction itself) under the ``"span"`` category.

        :raises TypeError: if neither ``item`` nor ``data_category`` is given.
        """
        if not self.options["send_client_reports"]:
            return

        if item is not None:
            data_category = item.data_category
            quantity = 1  # If an item is provided, we always count it as 1 (except for attachments, handled below).

            if data_category == "transaction":
                # Also record the lost spans
                event = item.get_transaction_event() or {}

                # +1 for the transaction itself
                span_count = len(event.get("spans") or []) + 1
                self.record_lost_event(reason, "span", quantity=span_count)

            elif data_category == "attachment":
                # quantity of 0 is actually 1 as we do not want to count
                # empty attachments as actually empty.
                quantity = len(item.get_bytes()) or 1

        elif data_category is None:
            raise TypeError("data category not provided")

        self._discarded_events[data_category, reason] += quantity

    def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]:
        """Return the value of ``header`` from ``response.headers`` or None."""
        return response.headers.get(header)

    def _update_rate_limits(
        self: Self, response: Union[urllib3.BaseHTTPResponse, httpcore.Response]
    ) -> None:
        """Update ``self._disabled_until`` from the rate-limit headers of ``response``."""
        # new sentries with more rate limit insights.  We honor this header
        # no matter of the status code to update our internal rate limits.
        header = self._get_header_value(response, "x-sentry-rate-limits")
        if header:
            logger.warning("Rate-limited via x-sentry-rate-limits")
            self._disabled_until.update(_parse_rate_limits(header))

        # old sentries only communicate global rate limit hits via the
        # retry-after header on 429.  This header can also be emitted on new
        # sentries if a proxy in front wants to globally slow things down.
        elif response.status == 429:
            logger.warning("Rate-limited via 429")
            retry_after_value = self._get_header_value(response, "Retry-After")
            # Missing or zero/unparsable-to-truthy values fall back to 60.
            retry_after = (
                self._retry.parse_retry_after(retry_after_value)
                if retry_after_value is not None
                else None
            ) or 60
            self._disabled_until[None] = datetime.now(timezone.utc) + timedelta(
                seconds=retry_after
            )

    def _handle_request_error(
        self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
    ) -> None:
        """Report a failed request.

        Calls `on_dropped_event(loss_reason)` and records every item of
        ``envelope`` as lost with reason ``"network_error"``; without an
        envelope a single ``"error"`` loss is recorded instead.
        """
        self.on_dropped_event(loss_reason)

        if envelope is None:
            self.record_lost_event("network_error", data_category="error")
        else:
            for item in envelope.items:
                self.record_lost_event("network_error", item=item)

    def _handle_response(
        self: Self,
        response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
        envelope: Optional[Envelope],
    ) -> None:
        """Apply rate limits from ``response`` and account for non-2xx statuses."""
        self._update_rate_limits(response)

        if response.status == 429:
            # if we hit a 429.  Something was rate limited but we already
            # acted on this in `self._update_rate_limits`.  Note that we
            # do not want to record event loss here as we will have recorded
            # an outcome in relay already.
            self.on_dropped_event("status_429")

        elif response.status >= 300 or response.status < 200:
            logger.error(
                "Unexpected status code: %s (body: %s)",
                response.status,
                getattr(response, "data", getattr(response, "content", None)),
            )
            self._handle_request_error(
                envelope=envelope, loss_reason="status_{}".format(response.status)
            )

    def _update_headers(
        self: Self,
        headers: Dict[str, str],
    ) -> None:
        """Add the ``User-Agent`` and ``X-Sentry-Auth`` headers to ``headers`` in place."""
        headers.update(
            {
                "User-Agent": str(self._auth.client),
                "X-Sentry-Auth": str(self._auth.to_header()),
            }
        )

    def on_dropped_event(self: Self, _reason: str) -> None:
        """Hook called whenever an event is dropped; a no-op here."""
        return None

    def _fetch_pending_client_report(
        self: Self, force: bool = False, interval: int = 60
    ) -> Optional[Item]:
        """Return a ``client_report`` item for the counters collected so far.

        Returns None when client reports are disabled, when fewer than
        ``interval`` seconds have passed since the last report (unless
        ``force`` is set), or when nothing was discarded. Whenever the
        interval check passes, the counters and the timestamp are reset.
        """
        if not self.options["send_client_reports"]:
            return None

        if not (force or self._last_client_report_sent < time.time() - interval):
            return None

        discarded_events = self._discarded_events
        self._discarded_events = defaultdict(int)
        self._last_client_report_sent = time.time()

        if not discarded_events:
            return None

        return Item(
            PayloadRef(
                json={
                    "timestamp": time.time(),
                    "discarded_events": [
                        {"reason": reason, "category": category, "quantity": quantity}
                        for (
                            (category, reason),
                            quantity,
                        ) in discarded_events.items()
                    ],
                }
            ),
            type="client_report",
        )

    def _check_disabled(self: Self, category: EventDataCategory) -> bool:
        """Return True if ``category`` or the global (None) bucket is currently rate limited."""

        def _disabled(bucket: Optional[EventDataCategory]) -> bool:
            ts = self._disabled_until.get(bucket)
            return ts is not None and ts > datetime.now(timezone.utc)

        return _disabled(category) or _disabled(None)

    def _is_rate_limited(self: Self) -> bool:
        """Return True if any recorded rate limit is still in the future."""
        return any(
            ts > datetime.now(timezone.utc) for ts in self._disabled_until.values()
        )

    def _is_worker_full(self: Self) -> bool:
        """Return whether the worker reports itself as full."""
        return self._worker.full()

    def is_healthy(self: Self) -> bool:
        """Healthy means the worker is not full and no rate limit is active."""
        return not (self._is_worker_full() or self._is_rate_limited())

    def _prepare_envelope(
        self: Self, envelope: Envelope
    ) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:
        """Filter, extend and serialize ``envelope`` for sending.

        :returns: ``(envelope_copy, body, headers)`` or None when every item
            was dropped because its category is rate limited.
        """
        # remove all items from the envelope which are over quota
        new_items = []
        for item in envelope.items:
            if self._check_disabled(item.data_category):
                if item.data_category in ("transaction", "error", "default"):
                    self.on_dropped_event("self_rate_limits")
                self.record_lost_event("ratelimit_backoff", item=item)
            else:
                new_items.append(item)

        # Since we're modifying the envelope here make a copy so that others
        # that hold references do not see their envelope modified.
        envelope = Envelope(headers=envelope.headers, items=new_items)

        if not envelope.items:
            return None

        # since we're already in the business of sending out an envelope here
        # check if we have one pending for the stats session envelopes so we
        # can attach it to this enveloped scheduled for sending.  This will
        # currently typically attach the client report to the most recent
        # session update.
        client_report_item = self._fetch_pending_client_report(interval=30)
        if client_report_item is not None:
            envelope.items.append(client_report_item)

        content_encoding, body = self._serialize_envelope(envelope)

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending envelope [%s] project:%s host:%s",
            envelope.description,
            self.parsed_dsn.project_id,
            self.parsed_dsn.host,
        )

        headers = {
            "Content-Type": "application/x-sentry-envelope",
        }
        if content_encoding:
            headers["Content-Encoding"] = content_encoding

        return envelope, body, headers

    def _serialize_envelope(
        self: Self, envelope: Envelope
    ) -> tuple[Optional[str], io.BytesIO]:
        """Serialize ``envelope`` into a buffer, compressing it if configured.

        :returns: ``(content_encoding, body)`` where ``content_encoding`` is
            None for an uncompressed body, otherwise the algorithm name.
        """
        content_encoding = None
        body = io.BytesIO()
        if self._compression_level == 0 or self._compression_algo is None:
            envelope.serialize_into(body)
        else:
            content_encoding = self._compression_algo
            if self._compression_algo == "br" and brotli is not None:
                body.write(
                    brotli.compress(
                        envelope.serialize(), quality=self._compression_level
                    )
                )
            else:  # assume gzip as we sanitize the algo value in init
                with gzip.GzipFile(
                    fileobj=body, mode="w", compresslevel=self._compression_level
                ) as f:
                    envelope.serialize_into(f)

        return content_encoding, body

    def _get_pool_options(self: Self) -> Dict[str, Any]:
        """Build the keyword arguments for the connection pool; subclass hook."""
        raise NotImplementedError()

    def _in_no_proxy(self: Self, parsed_dsn: Dsn) -> bool:
        """Return True if the DSN host is covered by the ``no`` proxy setting.

        The setting (as returned by ``getproxies()``) is a comma-separated
        list; an entry matches when the DSN's host or netloc ends with it.
        """
        no_proxy = getproxies().get("no")
        if not no_proxy:
            return False
        for host in no_proxy.split(","):
            host = host.strip()
            if not host:
                # An empty entry (trailing comma, "a,,b") must not count as a
                # match: every string ends with "".
                continue
            if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
                return True
        return False

    def _make_pool(
        self: Self,
    ) -> Union[
        PoolManager,
        ProxyManager,
        httpcore.SOCKSProxy,
        httpcore.HTTPProxy,
        httpcore.ConnectionPool,
        httpcore.AsyncSOCKSProxy,
        httpcore.AsyncHTTPProxy,
        httpcore.AsyncConnectionPool,
    ]:
        """Create the connection pool used for requests; subclass hook."""
        raise NotImplementedError()

    def _request(
        self: Self,
        method: str,
        endpoint_type: EndpointType,
        body: Any,
        headers: Mapping[str, str],
    ) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
        """Perform a single HTTP request; subclass hook."""
        raise NotImplementedError()

    def kill(self: Self) -> None:
        """Kill the worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()
class BaseHttpTransport(HttpTransportCore):
    """The base HTTP transport."""

    def _send_envelope(self: Self, envelope: Envelope) -> None:
        """Prepare ``envelope`` and POST it; does nothing if preparation yields None."""
        prepared = self._prepare_envelope(envelope)
        if prepared is None:
            return None

        outgoing_envelope, payload, request_headers = prepared
        self._send_request(
            payload.getvalue(),
            headers=request_headers,
            endpoint_type=EndpointType.ENVELOPE,
            envelope=outgoing_envelope,
        )
        return None

    def _send_request(
        self: Self,
        body: bytes,
        headers: Dict[str, str],
        endpoint_type: EndpointType,
        envelope: Optional[Envelope],
    ) -> None:
        """POST ``body`` and process the response.

        A failing request is reported through `_handle_request_error` and the
        exception is re-raised. The response is always closed after handling.
        """
        self._update_headers(headers)

        try:
            response = self._request("POST", endpoint_type, body, headers)
        except Exception:
            self._handle_request_error(envelope=envelope, loss_reason="network")
            raise

        try:
            self._handle_response(response=response, envelope=envelope)
        finally:
            response.close()

    def _create_worker(self: Self, options: dict[str, Any]) -> Worker:
        """Create a `BackgroundWorker` sized by ``options["transport_queue_size"]``."""
        queue_size = options["transport_queue_size"]
        return BackgroundWorker(queue_size=queue_size)

    def _flush_client_reports(self: Self, force: bool = False) -> None:
        """Capture the pending client report, if there is one, as its own envelope."""
        report_item = self._fetch_pending_client_report(force=force, interval=60)
        if report_item is None:
            return
        self.capture_envelope(Envelope(items=[report_item]))

    def capture_envelope(self: Self, envelope: Envelope) -> None:
        """Submit ``envelope`` to the worker for sending.

        If the worker does not accept the job, the drop is reported and every
        item is recorded as lost with reason ``"queue_overflow"``.
        """

        def _job() -> None:
            with capture_internal_exceptions():
                self._send_envelope(envelope)
                self._flush_client_reports()

        accepted = self._worker.submit(_job)
        if accepted:
            return

        self.on_dropped_event("full_queue")
        for lost_item in envelope.items:
            self.record_lost_event("queue_overflow", item=lost_item)

    def flush(
        self: Self,
        timeout: float,
        callback: Optional[Callable[[int, float], None]] = None,
    ) -> None:
        """Force out pending client reports and flush the worker.

        Nothing beyond the debug log happens unless ``timeout`` is positive.
        """
        logger.debug("Flushing HTTP transport")

        if not timeout > 0:
            return

        def _forced_report_flush() -> None:
            return self._flush_client_reports(force=True)

        self._worker.submit(_forced_report_flush)
        self._worker.flush(timeout, callback)
class HttpTransport(BaseHttpTransport):
    """HTTP transport that sends requests through a urllib3 pool manager."""

    if TYPE_CHECKING:
        _pool: Union[PoolManager, ProxyManager]

    def _get_pool_options(self: Self) -> Dict[str, Any]:
        """Build the keyword arguments passed to the urllib3 pool/proxy manager.

        Covers the number of pools, certificate verification, timeout, socket
        options (user supplied, optionally extended with keep-alive defaults)
        and the CA bundle / client certificate locations, which fall back to
        environment variables.
        """
        num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
        options = {
            "num_pools": 2 if num_pools is None else int(num_pools),
            "cert_reqs": "CERT_REQUIRED",
            "timeout": urllib3.Timeout(total=self.TIMEOUT),
        }

        socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None

        if self.options["socket_options"] is not None:
            # Work on a copy: the keep-alive defaults appended below must not
            # leak into the list object the caller passed to the SDK.
            socket_options = list(self.options["socket_options"])

        if self.options["keep_alive"]:
            if socket_options is None:
                socket_options = []

            # Only add defaults for (level, option) pairs the user did not set.
            used_options = {(o[0], o[1]) for o in socket_options}
            for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
                if (default_option[0], default_option[1]) not in used_options:
                    socket_options.append(default_option)

        if socket_options is not None:
            options["socket_options"] = socket_options

        options["ca_certs"] = (
            self.options["ca_certs"]  # User-provided bundle from the SDK init
            or os.environ.get("SSL_CERT_FILE")
            or os.environ.get("REQUESTS_CA_BUNDLE")
            or certifi.where()
        )

        options["cert_file"] = self.options["cert_file"] or os.environ.get(
            "CLIENT_CERT_FILE"
        )
        options["key_file"] = self.options["key_file"] or os.environ.get(
            "CLIENT_KEY_FILE"
        )

        return options

    def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]:
        """Create the urllib3 manager, honouring proxy configuration.

        An explicitly configured proxy wins over the environment; an empty
        string for ``https_proxy``/``http_proxy`` disables that proxy kind.
        Environment proxies are skipped when the DSN host is in the no-proxy
        list. A SOCKS proxy without SOCKS support installed falls back to a
        direct connection.

        :raises ValueError: if no DSN was parsed.
        """
        if self.parsed_dsn is None:
            raise ValueError("Cannot create HTTP-based transport without valid DSN")

        proxy = None
        no_proxy = self._in_no_proxy(self.parsed_dsn)

        # try HTTPS first
        https_proxy = self.options["https_proxy"]
        if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or (not no_proxy and getproxies().get("https"))

        # maybe fallback to HTTP proxy
        http_proxy = self.options["http_proxy"]
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or (not no_proxy and getproxies().get("http"))

        opts = self._get_pool_options()

        if not proxy:
            return urllib3.PoolManager(**opts)

        # Proxy-only settings live in a separate dict so that the direct
        # connection fallback below is built from the plain pool options.
        proxy_opts = dict(opts)
        proxy_headers = self.options["proxy_headers"]
        if proxy_headers:
            proxy_opts["proxy_headers"] = proxy_headers

        if proxy.startswith("socks"):
            try:
                # Check if PySocks dependency is available
                from urllib3.contrib.socks import SOCKSProxyManager
            except ImportError:
                logger.warning(
                    "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
                    proxy,
                )
                return urllib3.PoolManager(**opts)

            return SOCKSProxyManager(proxy, **proxy_opts)

        return urllib3.ProxyManager(proxy, **proxy_opts)

    def _request(
        self: Self,
        method: str,
        endpoint_type: EndpointType,
        body: Any,
        headers: Mapping[str, str],
    ) -> urllib3.BaseHTTPResponse:
        """Send one request to the API URL for ``endpoint_type`` through the pool."""
        return self._pool.request(
            method,
            self._auth.get_api_url(endpoint_type),
            body=body,
            headers=headers,
        )
if not ASYNC_TRANSPORT_ENABLED:
    # Sorry, no AsyncHttpTransport for you
    AsyncHttpTransport = HttpTransport

else:

    class AsyncHttpTransport(HttpTransportCore):  # type: ignore
        """Asyncio transport built on an httpcore async connection pool.

        Must be constructed while an event loop is running; that loop is
        stored and used to schedule all sending work.
        """

        def __init__(self: Self, options: Dict[str, Any]) -> None:
            """Initialise the shared transport state and remember the running loop.

            :raises RuntimeError: from ``asyncio.get_running_loop()`` when no
                event loop is running.
            """
            super().__init__(options)
            # Requires event loop at init time
            self.loop = asyncio.get_running_loop()

        def _create_worker(self: Self, options: dict[str, Any]) -> Worker:
            """Create an `AsyncWorker` sized by ``options["transport_queue_size"]``."""
            return AsyncWorker(queue_size=options["transport_queue_size"])

        def _get_header_value(
            self: Self, response: Any, header: str
        ) -> Optional[str]:
            """Return the first value of ``header`` (case-insensitive) or None.

            ``response.headers`` is iterated as ``(key, value)`` byte pairs.
            """
            # Lower-case the requested name as well: callers pass mixed-case
            # names such as "Retry-After", which otherwise never match.
            wanted = header.lower()
            return next(
                (
                    val.decode("ascii")
                    for key, val in response.headers
                    if key.decode("ascii").lower() == wanted
                ),
                None,
            )

        async def _send_envelope(self: Self, envelope: Envelope) -> None:
            """Prepare ``envelope`` and POST it; does nothing if preparation yields None."""
            _prepared_envelope = self._prepare_envelope(envelope)
            if _prepared_envelope is not None:
                envelope, body, headers = _prepared_envelope
                await self._send_request(
                    body.getvalue(),
                    headers=headers,
                    endpoint_type=EndpointType.ENVELOPE,
                    envelope=envelope,
                )
            return None

        async def _send_request(
            self: Self,
            body: bytes,
            headers: Dict[str, str],
            endpoint_type: EndpointType,
            envelope: Optional[Envelope],
        ) -> None:
            """POST ``body`` and process the response.

            A failing request is reported through `_handle_request_error` and
            re-raised. The response is always closed after handling.
            """
            self._update_headers(headers)
            try:
                response = await self._request(
                    "POST",
                    endpoint_type,
                    body,
                    headers,
                )
            except Exception:
                self._handle_request_error(envelope=envelope, loss_reason="network")
                raise
            try:
                self._handle_response(response=response, envelope=envelope)
            finally:
                await response.aclose()

        async def _request(  # type: ignore[override]
            self: Self,
            method: str,
            endpoint_type: EndpointType,
            body: Any,
            headers: Mapping[str, str],
        ) -> httpcore.Response:
            """Send one request through the pool, using `TIMEOUT` for every phase."""
            return await self._pool.request(
                method,
                self._auth.get_api_url(endpoint_type),
                content=body,
                headers=headers,  # type: ignore
                extensions={
                    "timeout": {
                        "pool": self.TIMEOUT,
                        "connect": self.TIMEOUT,
                        "write": self.TIMEOUT,
                        "read": self.TIMEOUT,
                    }
                },
            )

        async def _flush_client_reports(self: Self, force: bool = False) -> None:
            """Capture the pending client report, if there is one, as its own envelope."""
            client_report = self._fetch_pending_client_report(force=force, interval=60)
            if client_report is not None:
                self.capture_envelope(Envelope(items=[client_report]))

        def _capture_envelope(self: Self, envelope: Envelope) -> None:
            """Submit the send job for ``envelope`` to the worker.

            If the worker does not accept the job, every item is recorded as
            lost with reason ``"queue_overflow"``.
            """

            async def send_envelope_wrapper() -> None:
                with capture_internal_exceptions():
                    await self._send_envelope(envelope)
                    await self._flush_client_reports()

            if not self._worker.submit(send_envelope_wrapper):
                self.on_dropped_event("full_queue")
                for item in envelope.items:
                    self.record_lost_event("queue_overflow", item=item)

        def capture_envelope(self: Self, envelope: Envelope) -> None:
            """Synchronous entry point for sending ``envelope``.

            Hands the envelope to the stored loop via ``call_soon_threadsafe``.
            If that loop is not running, the envelope's items are recorded as
            lost with reason ``"internal_sdk_error"``.
            """
            # Synchronous entry point
            if self.loop and self.loop.is_running():
                self.loop.call_soon_threadsafe(self._capture_envelope, envelope)
            else:
                # The event loop is no longer running
                logger.warning("Async Transport is not running in an event loop.")
                self.on_dropped_event("internal_sdk_error")
                for item in envelope.items:
                    self.record_lost_event("internal_sdk_error", item=item)

        def flush(  # type: ignore[override]
            self: Self,
            timeout: float,
            callback: Optional[Callable[[int, float], None]] = None,
        ) -> Optional[asyncio.Task[None]]:
            """Force out pending client reports and flush the worker.

            :returns: whatever the worker's ``flush`` returns when ``timeout``
                is positive, otherwise None.
            """
            logger.debug("Flushing HTTP transport")

            if timeout > 0:
                self._worker.submit(lambda: self._flush_client_reports(force=True))
                return self._worker.flush(timeout, callback)  # type: ignore[func-returns-value]
            return None

        def _get_pool_options(self: Self) -> Dict[str, Any]:
            """Build the keyword arguments for the httpcore async pool.

            Includes retries, socket options (user supplied plus keep-alive
            defaults) and an SSL context loaded with the configured CA bundle
            and optional client certificate.
            """
            options: Dict[str, Any] = {
                "http2": False,  # no HTTP2 for now
                "retries": 3,
            }

            # Copy the user's list so the defaults appended below do not
            # mutate the object stored in the SDK options.
            configured = self.options["socket_options"]
            socket_options = list(configured) if configured is not None else []

            used_options = {(o[0], o[1]) for o in socket_options}
            for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
                if (default_option[0], default_option[1]) not in used_options:
                    socket_options.append(default_option)

            options["socket_options"] = socket_options

            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(
                self.options["ca_certs"]  # User-provided bundle from the SDK init
                or os.environ.get("SSL_CERT_FILE")
                or os.environ.get("REQUESTS_CA_BUNDLE")
                or certifi.where()
            )
            cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
            key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
            if cert_file is not None:
                ssl_context.load_cert_chain(cert_file, key_file)

            options["ssl_context"] = ssl_context
            return options

        def _make_pool(
            self: Self,
        ) -> Union[
            httpcore.AsyncSOCKSProxy,
            httpcore.AsyncHTTPProxy,
            httpcore.AsyncConnectionPool,
        ]:
            """Create the httpcore async pool, honouring proxy configuration.

            An explicitly configured proxy wins over the environment; an empty
            string disables that proxy kind. When a SOCKS proxy is requested
            but constructing it raises ``RuntimeError``, a direct connection
            pool is used instead.

            :raises ValueError: if no DSN was parsed.
            """
            if self.parsed_dsn is None:
                raise ValueError("Cannot create HTTP-based transport without valid DSN")
            proxy = None
            no_proxy = self._in_no_proxy(self.parsed_dsn)

            # try HTTPS first
            https_proxy = self.options["https_proxy"]
            if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
                proxy = https_proxy or (not no_proxy and getproxies().get("https"))

            # maybe fallback to HTTP proxy
            http_proxy = self.options["http_proxy"]
            if not proxy and (http_proxy != ""):
                proxy = http_proxy or (not no_proxy and getproxies().get("http"))

            opts = self._get_pool_options()

            if proxy:
                # Proxy-specific tweaks go into a copy so the direct-pool
                # fallback at the end still receives the untouched options
                # (with socket_options, without proxy_headers).
                proxy_opts = dict(opts)
                proxy_headers = self.options["proxy_headers"]
                if proxy_headers:
                    proxy_opts["proxy_headers"] = proxy_headers

                if proxy.startswith("socks"):
                    try:
                        if "socket_options" in proxy_opts:
                            socket_options = proxy_opts.pop("socket_options")
                            if socket_options:
                                logger.warning(
                                    "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
                                )
                        return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **proxy_opts)
                    except RuntimeError:
                        logger.warning(
                            "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
                            proxy,
                        )
                else:
                    return httpcore.AsyncHTTPProxy(proxy_url=proxy, **proxy_opts)

            return httpcore.AsyncConnectionPool(**opts)

        def kill(self: Self) -> Optional[asyncio.Task[None]]:  # type: ignore
            """Kill the worker and schedule closing of the pool.

            :returns: the task closing the pool, or None if creating the task
                raised ``RuntimeError``.
            """
            logger.debug("Killing HTTP transport")
            self._worker.kill()
            try:
                # Return the pool cleanup task so caller can await it if needed
                with mark_sentry_task_internal():
                    return self.loop.create_task(self._pool.aclose())  # type: ignore
            except RuntimeError:
                logger.warning("Event loop not running, aborting kill.")
                return None
if not HTTP2_ENABLED:
    # Sorry, no Http2Transport for you
    class Http2Transport(HttpTransport):
        """Stand-in used when HTTP/2 support is unavailable; behaves like `HttpTransport`."""

        def __init__(self: Self, options: Dict[str, Any]) -> None:
            """Initialise as a plain `HttpTransport` and warn about the fallback."""
            super().__init__(options)
            logger.warning(
                "You tried to use HTTP2Transport but don't have httpcore[http2] installed. Falling back to HTTPTransport."
            )

else:

    class Http2Transport(BaseHttpTransport):  # type: ignore
        """The HTTP2 transport based on httpcore."""

        TIMEOUT = 15

        if TYPE_CHECKING:
            _pool: Union[
                httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool
            ]

        def _get_header_value(
            self: Self, response: Any, header: str
        ) -> Optional[str]:
            """Return the first value of ``header`` (case-insensitive) or None.

            ``response.headers`` is iterated as ``(key, value)`` byte pairs.
            """
            # Lower-case the requested name as well: callers pass mixed-case
            # names such as "Retry-After", which otherwise never match.
            wanted = header.lower()
            return next(
                (
                    val.decode("ascii")
                    for key, val in response.headers
                    if key.decode("ascii").lower() == wanted
                ),
                None,
            )

        def _request(
            self: Self,
            method: str,
            endpoint_type: EndpointType,
            body: Any,
            headers: Mapping[str, str],
        ) -> httpcore.Response:
            """Send one request through the pool, using `TIMEOUT` for every phase."""
            response = self._pool.request(
                method,
                self._auth.get_api_url(endpoint_type),
                content=body,
                headers=headers,  # type: ignore
                extensions={
                    "timeout": {
                        "pool": self.TIMEOUT,
                        "connect": self.TIMEOUT,
                        "write": self.TIMEOUT,
                        "read": self.TIMEOUT,
                    }
                },
            )
            return response

        def _get_pool_options(self: Self) -> Dict[str, Any]:
            """Build the keyword arguments for the httpcore pool.

            HTTP/2 is requested only for ``https`` DSNs. Also includes
            retries, socket options (user supplied plus keep-alive defaults)
            and an SSL context loaded with the configured CA bundle and
            optional client certificate.
            """
            options: Dict[str, Any] = {
                "http2": self.parsed_dsn is not None
                and self.parsed_dsn.scheme == "https",
                "retries": 3,
            }

            # Copy the user's list so the defaults appended below do not
            # mutate the object stored in the SDK options.
            configured = self.options["socket_options"]
            socket_options = list(configured) if configured is not None else []

            used_options = {(o[0], o[1]) for o in socket_options}
            for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
                if (default_option[0], default_option[1]) not in used_options:
                    socket_options.append(default_option)

            options["socket_options"] = socket_options

            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(
                self.options["ca_certs"]  # User-provided bundle from the SDK init
                or os.environ.get("SSL_CERT_FILE")
                or os.environ.get("REQUESTS_CA_BUNDLE")
                or certifi.where()
            )
            cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
            key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
            if cert_file is not None:
                ssl_context.load_cert_chain(cert_file, key_file)

            options["ssl_context"] = ssl_context

            return options

        def _make_pool(
            self: Self,
        ) -> Union[httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]:
            """Create the httpcore pool, honouring proxy configuration.

            An explicitly configured proxy wins over the environment; an empty
            string disables that proxy kind. When a SOCKS proxy is requested
            but constructing it raises ``RuntimeError``, a direct connection
            pool is used instead.

            :raises ValueError: if no DSN was parsed.
            """
            if self.parsed_dsn is None:
                raise ValueError("Cannot create HTTP-based transport without valid DSN")

            proxy = None
            no_proxy = self._in_no_proxy(self.parsed_dsn)

            # try HTTPS first
            https_proxy = self.options["https_proxy"]
            if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
                proxy = https_proxy or (not no_proxy and getproxies().get("https"))

            # maybe fallback to HTTP proxy
            http_proxy = self.options["http_proxy"]
            if not proxy and (http_proxy != ""):
                proxy = http_proxy or (not no_proxy and getproxies().get("http"))

            opts = self._get_pool_options()

            if proxy:
                # Proxy-specific tweaks go into a copy so the direct-pool
                # fallback at the end still receives the untouched options
                # (with socket_options, without proxy_headers).
                proxy_opts = dict(opts)
                proxy_headers = self.options["proxy_headers"]
                if proxy_headers:
                    proxy_opts["proxy_headers"] = proxy_headers

                if proxy.startswith("socks"):
                    try:
                        if "socket_options" in proxy_opts:
                            socket_options = proxy_opts.pop("socket_options")
                            if socket_options:
                                logger.warning(
                                    "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
                                )
                        return httpcore.SOCKSProxy(proxy_url=proxy, **proxy_opts)
                    except RuntimeError:
                        logger.warning(
                            "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
                            proxy,
                        )
                else:
                    return httpcore.HTTPProxy(proxy_url=proxy, **proxy_opts)

            return httpcore.ConnectionPool(**opts)
def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
    """Select and build the transport described by ``options``.

    A `Transport` instance passed as ``options["transport"]`` is returned
    unchanged. A `Transport` subclass passed there replaces the default class.
    Otherwise the class is chosen from the HTTP/2 and async settings. A class
    is only instantiated when ``options["dsn"]`` is truthy; otherwise None is
    returned.
    """
    ref_transport = options["transport"]

    # We default to using HTTP2 transport if the user also has the required h2
    # library installed (through the subclass check). The reason is h2 not being
    # available on py3.7 which we still support.
    if options.get("http2") is False:
        use_http2_transport = False
    else:
        use_http2_transport = not issubclass(Http2Transport, HttpTransport)

    experiments = options.get("_experiments", {})
    use_async_transport = experiments.get("transport_async", False)

    configured_integrations = options.get("integrations") or []
    async_integration = any(
        candidate.__class__.__name__ == "AsyncioIntegration"
        for candidate in configured_integrations
    )

    # By default, we use the http transport class
    transport_cls: Type[Transport] = HttpTransport
    if use_http2_transport:
        transport_cls = Http2Transport

    if not use_async_transport:
        pass
    elif not ASYNC_TRANSPORT_ENABLED:
        logger.warning(
            "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
        )
    else:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running, fall back to sync transport
            logger.warning("No event loop running, falling back to sync transport.")
        else:
            if async_integration:
                transport_cls = AsyncHttpTransport
            else:
                logger.warning(
                    "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport."
                )

    if isinstance(ref_transport, Transport):
        return ref_transport

    if isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
        transport_cls = ref_transport

    # if a transport class is given only instantiate it if the dsn is not
    # empty or None
    if not options["dsn"]:
        return None
    return transport_cls(options)