← Back
Editing: stdlib.py
import os
import platform
import subprocess
import sys
from http.client import HTTPConnection, HTTPResponse
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration
from sentry_sdk.scope import add_global_event_processor
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import (
    EnvironHeaders,
    add_http_request_source,
    has_span_streaming_enabled,
    should_propagate_trace,
)
from sentry_sdk.utils import (
    SENSITIVE_DATA_SUBSTITUTE,
    capture_internal_exceptions,
    ensure_integration_enabled,
    is_sentry_url,
    logger,
    parse_url,
    safe_repr,
)

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, List, Optional, Union

    from sentry_sdk._types import Event, Hint


# Description of the running interpreter; attached to events as
# ``contexts["runtime"]`` by the event processor registered in
# ``StdlibIntegration.setup_once``.
_RUNTIME_CONTEXT: "dict[str, object]" = {
    "name": platform.python_implementation(),
    "version": "%s.%s.%s" % (sys.version_info[:3]),
    "build": sys.version,
}


class StdlibIntegration(Integration):
    """Integration that patches ``http.client`` and ``subprocess``.

    Outgoing HTTP requests and ``subprocess.Popen`` calls are wrapped in
    spans, and events receive a ``runtime`` context describing the Python
    interpreter.
    """

    identifier = "stdlib"

    @staticmethod
    def setup_once() -> None:
        """Install the monkeypatches and register the runtime event processor."""
        _install_httplib()
        _install_subprocess()

        @add_global_event_processor
        def add_python_runtime_context(
            event: "Event", hint: "Hint"
        ) -> "Optional[Event]":
            # Only touch the event when the current client has this
            # integration, and never overwrite an existing "runtime" context.
            if sentry_sdk.get_client().get_integration(StdlibIntegration) is not None:
                contexts = event.setdefault("contexts", {})
                if isinstance(contexts, dict) and "runtime" not in contexts:
                    contexts["runtime"] = _RUNTIME_CONTEXT

            return event


def _complete_span(span: "Union[Span, StreamedSpan]") -> None:
    """End ``span`` and attach HTTP request source information to it.

    Streamed spans get the request source added before ``end()``; classic
    spans are finished first and annotated afterwards. Failures while adding
    the request source are handled by ``capture_internal_exceptions`` and do
    not prevent the span from being ended.
    """
    if isinstance(span, StreamedSpan):
        with capture_internal_exceptions():
            add_http_request_source(span)
        span.end()
    else:
        span.finish()
        with capture_internal_exceptions():
            add_http_request_source(span)


def _install_httplib() -> None:
    """Patch ``HTTPConnection`` and ``HTTPResponse`` to trace HTTP requests.

    A span is started in ``putrequest``, handed from the connection to the
    response in ``getresponse`` when there is a body to read, and completed
    once the body has been consumed (``read``) or the response is closed
    (``close``).
    """
    real_putrequest = HTTPConnection.putrequest
    real_getresponse = HTTPConnection.getresponse
    real_read = HTTPResponse.read
    real_close = HTTPResponse.close

    def putrequest(
        self: "HTTPConnection", method: str, url: str, *args: "Any", **kwargs: "Any"
    ) -> "Any":
        """Start a span for the request, then delegate to the real ``putrequest``."""
        default_port = self.default_port

        # proxies go through set_tunnel
        tunnel_host = getattr(self, "_tunnel_host", None)
        if tunnel_host:
            host = tunnel_host
            port = getattr(self, "_tunnel_port", default_port)
        else:
            host = self.host
            port = self.port

        client = sentry_sdk.get_client()
        if client.get_integration(StdlibIntegration) is None or is_sentry_url(
            client, host
        ):
            return real_putrequest(self, method, url, *args, **kwargs)

        # Build an absolute URL when only a path was given. The scheme is
        # derived from the connection's default port.
        real_url = url
        if real_url is None or not real_url.startswith(("http://", "https://")):
            real_url = "%s://%s%s%s" % (
                "https" if default_port == 443 else "http",
                host,
                ":%s" % port if port != default_port else "",
                url,
            )

        parsed_url = None
        with capture_internal_exceptions():
            parsed_url = parse_url(real_url, sanitize=False)

        span_streaming = has_span_streaming_enabled(client.options)
        span: "Union[Span, StreamedSpan]"

        if span_streaming:
            span = sentry_sdk.traces.start_span(
                name="%s %s"
                % (method, parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE),
                attributes={
                    "sentry.origin": "auto.http.stdlib.httplib",
                    "sentry.op": OP.HTTP_CLIENT,
                    SPANDATA.HTTP_REQUEST_METHOD: method,
                },
            )
            if parsed_url is not None:
                span.set_attribute(SPANDATA.URL_FULL, parsed_url.url)
                span.set_attribute(SPANDATA.URL_QUERY, parsed_url.query)
                span.set_attribute(SPANDATA.URL_FRAGMENT, parsed_url.fragment)

            set_on_span = span.set_attribute
        else:
            span = sentry_sdk.start_span(
                op=OP.HTTP_CLIENT,
                name="%s %s"
                % (method, parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE),
                origin="auto.http.stdlib.httplib",
            )
            span.set_data(SPANDATA.HTTP_METHOD, method)
            if parsed_url is not None:
                span.set_data("url", parsed_url.url)
                span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query)
                span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment)

            set_on_span = span.set_data

        # for proxies, these point to the proxy host/port
        if tunnel_host:
            set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, self.host)
            set_on_span(SPANDATA.NETWORK_PEER_PORT, self.port)

        rv = real_putrequest(self, method, url, *args, **kwargs)

        if should_propagate_trace(client, real_url):
            for (
                key,
                value,
            ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers(
                span=span
            ):
                logger.debug(
                    "[Tracing] Adding `{key}` header {value} to outgoing request to {real_url}.".format(
                        key=key, value=value, real_url=real_url
                    )
                )
                self.putheader(key, value)

        # Stash the span on the connection so getresponse() can pick it up.
        self._sentrysdk_span = span  # type: ignore[attr-defined]

        return rv

    def getresponse(self: "HTTPConnection", *args: "Any", **kwargs: "Any") -> "Any":
        """Record the response status on the span started in ``putrequest``."""
        span = getattr(self, "_sentrysdk_span", None)

        if span is None:
            return real_getresponse(self, *args, **kwargs)

        # Detach the span from the connection now that this call owns it.
        # Otherwise a reused connection whose next putrequest() takes the
        # untraced early-return path would pick up this already-completed
        # span again here and mutate/complete it a second time.
        self._sentrysdk_span = None  # type: ignore[attr-defined]

        try:
            rv = real_getresponse(self, *args, **kwargs)
        except BaseException:
            _complete_span(span)
            raise

        if isinstance(span, StreamedSpan):
            status_code = int(rv.status)
            span.status = "error" if status_code >= 400 else "ok"
            span.set_attribute("http.response.status_code", status_code)
        else:
            span.set_http_status(int(rv.status))
            span.set_data("reason", rv.reason)

        # getresponse doesn't include actually reading the response body. This
        # is done in read(). So if the metadata/headers suggest there's a body to
        # read, don't finish the span just yet, but save it for ending it later.
        has_body = rv.chunked or (rv.length is not None and rv.length > 0)
        if has_body:
            rv._sentrysdk_span = span  # type: ignore[attr-defined]
        else:
            _complete_span(span)

        return rv

    def read(self: "HTTPResponse", *args: "Any", **kwargs: "Any") -> "Any":
        """Read from the response and complete the span once the body is consumed."""
        try:
            return real_read(self, *args, **kwargs)
        finally:
            span = getattr(self, "_sentrysdk_span", None)
            # read() might be called multiple times to consume a single body,
            # so we can't just end the span when read() is done. Instead,
            # try to figure out whether the response body has been fully read.
            if span and (self.fp is None or self.closed):
                self._sentrysdk_span = None  # type: ignore[attr-defined]
                _complete_span(span)

    def close(self: "HTTPResponse") -> None:
        """Close the response and complete any span still attached to it."""
        # We patch close() as a best effort fallback in case the span is not
        # ended yet in getresponse() or read().
        try:
            real_close(self)
        finally:
            span = getattr(self, "_sentrysdk_span", None)
            if span is not None:
                self._sentrysdk_span = None  # type: ignore[attr-defined]
                _complete_span(span)

    HTTPConnection.putrequest = putrequest  # type: ignore[method-assign]
    HTTPConnection.getresponse = getresponse  # type: ignore[method-assign]
    HTTPResponse.read = read  # type: ignore[method-assign]
    HTTPResponse.close = close  # type: ignore[assignment,method-assign]


def _init_argument(
    args: "List[Any]",
    kwargs: "Dict[Any, Any]",
    name: str,
    position: int,
    setdefault_callback: "Optional[Callable[[Any], Any]]" = None,
) -> "Any":
    """
    given (*args, **kwargs) of a function call, retrieve (and optionally set a
    default for) an argument by either name or position.

    This is useful for wrapping functions with complex type signatures and
    extracting a few arguments without needing to redefine that function's
    entire type signature.

    :param args: Positional arguments as a mutable list; updated in place
        when the argument is found by position and a non-``None`` value
        results.
    :param kwargs: Keyword arguments; updated in place when the argument is
        found by name, or when it is absent and the callback yields a
        non-``None`` default.
    :param name: Keyword name of the argument.
    :param position: Positional index of the argument.
    :param setdefault_callback: Optional callable that receives the current
        value (``None`` when the argument is absent) and returns the value
        to use instead.
    :returns: The resulting argument value, or ``None`` when the argument is
        absent and no callback was given.
    """
    if name in kwargs:
        rv = kwargs[name]
        if setdefault_callback is not None:
            rv = setdefault_callback(rv)
        if rv is not None:
            kwargs[name] = rv
    elif position < len(args):
        rv = args[position]
        if setdefault_callback is not None:
            rv = setdefault_callback(rv)
        if rv is not None:
            args[position] = rv
    else:
        rv = setdefault_callback and setdefault_callback(None)
        if rv is not None:
            kwargs[name] = rv

    return rv


def _install_subprocess() -> None:
    """Patch ``subprocess.Popen`` so process creation and waiting emit spans.

    ``__init__`` additionally injects trace propagation headers into the
    child's environment as ``SUBPROCESS_``-prefixed variables.
    """
    old_popen_init = subprocess.Popen.__init__

    # NOTE(review): ensure_integration_enabled presumably falls back to the
    # original callable when the integration is not active -- confirm.
    @ensure_integration_enabled(StdlibIntegration, old_popen_init)
    def sentry_patched_popen_init(
        self: "subprocess.Popen[Any]", *a: "Any", **kw: "Any"
    ) -> None:
        # Convert from tuple to list to be able to set values.
        a = list(a)

        # Positions 0, 9 and 10 are `args`, `cwd` and `env` in the
        # subprocess.Popen constructor signature.
        args = _init_argument(a, kw, "args", 0) or []
        cwd = _init_argument(a, kw, "cwd", 9)

        # if args is not a list or tuple (and e.g. some iterator instead),
        # let's not use it at all. There are too many things that can go wrong
        # when trying to collect an iterator into a list and setting that list
        # into `a` again.
        #
        # Also invocations where `args` is not a sequence are not actually
        # legal. They just happen to work under CPython.
        description = None

        if isinstance(args, (list, tuple)) and len(args) < 100:
            with capture_internal_exceptions():
                description = " ".join(map(str, args))

        if description is None:
            description = safe_repr(args)

        env = None

        span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
        span: "Union[Span, StreamedSpan]"
        if span_streaming:
            span = sentry_sdk.traces.start_span(
                name=description,
                attributes={
                    "sentry.op": OP.SUBPROCESS,
                    "sentry.origin": "auto.subprocess.stdlib.subprocess",
                },
            )
        else:
            span = sentry_sdk.start_span(
                op=OP.SUBPROCESS,
                name=description,
                origin="auto.subprocess.stdlib.subprocess",
            )

        with span:
            for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers(
                span=span
            ):
                # Copy the environment lazily, only once there is at least
                # one header to inject; falls back to os.environ when the
                # caller passed no env.
                if env is None:
                    env = _init_argument(
                        a,
                        kw,
                        "env",
                        10,
                        lambda x: dict(x if x is not None else os.environ),
                    )
                env["SUBPROCESS_" + k.upper().replace("-", "_")] = v

            if cwd and isinstance(span, Span):
                span.set_data("subprocess.cwd", cwd)

            rv = old_popen_init(self, *a, **kw)

            if isinstance(span, StreamedSpan):
                span.set_attribute(SPANDATA.PROCESS_PID, self.pid)
            else:
                span.set_tag("subprocess.pid", self.pid)
            return rv

    subprocess.Popen.__init__ = sentry_patched_popen_init  # type: ignore

    old_popen_wait = subprocess.Popen.wait

    @ensure_integration_enabled(StdlibIntegration, old_popen_wait)
    def sentry_patched_popen_wait(
        self: "subprocess.Popen[Any]", *a: "Any", **kw: "Any"
    ) -> "Any":
        """Run the original ``Popen.wait`` inside a span tagged with the pid."""
        span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
        if span_streaming:
            with sentry_sdk.traces.start_span(
                name=OP.SUBPROCESS_WAIT,
                attributes={
                    "sentry.op": OP.SUBPROCESS_WAIT,
                    "sentry.origin": "auto.subprocess.stdlib.subprocess",
                },
            ) as span:
                span.set_attribute(SPANDATA.PROCESS_PID, self.pid)
                return old_popen_wait(self, *a, **kw)
        else:
            with sentry_sdk.start_span(
                op=OP.SUBPROCESS_WAIT,
                origin="auto.subprocess.stdlib.subprocess",
            ) as span:
                span.set_tag("subprocess.pid", self.pid)
                return old_popen_wait(self, *a, **kw)

    subprocess.Popen.wait = sentry_patched_popen_wait  # type: ignore

    old_popen_communicate = subprocess.Popen.communicate

    @ensure_integration_enabled(StdlibIntegration, old_popen_communicate)
    def sentry_patched_popen_communicate(
        self: "subprocess.Popen[Any]", *a: "Any", **kw: "Any"
    ) -> "Any":
        """Run the original ``Popen.communicate`` inside a span tagged with the pid."""
        span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
        if span_streaming:
            with sentry_sdk.traces.start_span(
                name=OP.SUBPROCESS_COMMUNICATE,
                attributes={
                    "sentry.op": OP.SUBPROCESS_COMMUNICATE,
                    "sentry.origin": "auto.subprocess.stdlib.subprocess",
                },
            ) as span:
                span.set_attribute(SPANDATA.PROCESS_PID, self.pid)
                return old_popen_communicate(self, *a, **kw)
        else:
            with sentry_sdk.start_span(
                op=OP.SUBPROCESS_COMMUNICATE,
                origin="auto.subprocess.stdlib.subprocess",
            ) as span:
                span.set_tag("subprocess.pid", self.pid)
                return old_popen_communicate(self, *a, **kw)

    subprocess.Popen.communicate = sentry_patched_popen_communicate  # type: ignore


def get_subprocess_traceparent_headers() -> "EnvironHeaders":
    """Return the trace headers passed to this process by a parent.

    Wraps ``os.environ`` in ``EnvironHeaders`` with the ``SUBPROCESS_``
    prefix, the same prefix the patched ``Popen.__init__`` uses when writing
    propagation headers into a child's environment.
    """
    return EnvironHeaders(os.environ, prefix="SUBPROCESS_")
Save File
Cancel