← Back
Editing: aiomysql.py
# -*- coding: utf-8 -*- from __future__ import annotations from typing import Any, Awaitable, Callable, TypeVar import sentry_sdk from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import ( add_query_source, has_span_streaming_enabled, record_sql_queries, ) from sentry_sdk.utils import ( capture_internal_exceptions, parse_version, ) try: import aiomysql # type: ignore[import-not-found] from aiomysql.connection import Connection # type: ignore[import-not-found] from aiomysql.cursors import Cursor # type: ignore[import-not-found] except ImportError: raise DidNotEnable("aiomysql not installed.") class AioMySQLIntegration(Integration): identifier = "aiomysql" origin = f"auto.db.{identifier}" _record_params = False def __init__(self, *, record_params: bool = False): AioMySQLIntegration._record_params = record_params @staticmethod def setup_once() -> None: aiomysql_version = parse_version(aiomysql.__version__) _check_minimum_version(AioMySQLIntegration, aiomysql_version) Cursor.execute = _wrap_execute(Cursor.execute) Cursor.executemany = _wrap_executemany(Cursor.executemany) # Patch Connection._connect — this catches ALL connections: # - aiomysql.connect() # - aiomysql.create_pool() (pool.py does `from .connection import connect` # which ultimately calls Connection._connect) # - Reconnects Connection._connect = _wrap_connect(Connection._connect) T = TypeVar("T") def _normalize_query(query: str | bytes | bytearray) -> str: if isinstance(query, (bytes, bytearray)): query = query.decode("utf-8", errors="replace") return " ".join(query.split()) def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Cursor.execute to capture SQL queries.""" async def _inner(*args: Any, **kwargs: Any) -> T: if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: return await f(*args, **kwargs) cursor = args[0] # Skip if flagged by executemany (avoids double-recording). # Do NOT reset the flag here — it must stay True for the entire # duration of executemany, which may call execute multiple times # in a loop (non-INSERT fallback). Only _wrap_executemany's # finally block should clear it. if getattr(cursor, "_sentry_skip_next_execute", False): return await f(*args, **kwargs) query = args[1] if len(args) > 1 else kwargs.get("query", "") query_str = _normalize_query(query) params = args[2] if len(args) > 2 else kwargs.get("args") conn = _get_connection(cursor) integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) params_list = params if integration and integration._record_params else None param_style = "pyformat" if params_list else None with record_sql_queries( cursor=None, query=query_str, params_list=params_list, paramstyle=param_style, executemany=False, span_origin=AioMySQLIntegration.origin, ) as span: if conn: _set_db_data(span, conn) res = await f(*args, **kwargs) if isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) if not isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) return res return _inner def _wrap_executemany(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Cursor.executemany to capture SQL queries.""" async def _inner(*args: Any, **kwargs: Any) -> T: if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: return await f(*args, **kwargs) cursor = args[0] query = args[1] if len(args) > 1 else kwargs.get("query", "") query_str = _normalize_query(query) seq_of_params = args[2] if len(args) > 2 else kwargs.get("args") conn = _get_connection(cursor) integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) params_list = ( seq_of_params if integration and integration._record_params else None ) param_style = "pyformat" if params_list else None # Prevent double-recording: _do_execute_many calls self.execute internally cursor._sentry_skip_next_execute = True try: with record_sql_queries( cursor=None, query=query_str, params_list=params_list, paramstyle=param_style, executemany=True, span_origin=AioMySQLIntegration.origin, ) as span: if conn: _set_db_data(span, conn) res = await f(*args, **kwargs) if isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) if not isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) return res finally: cursor._sentry_skip_next_execute = False return _inner def _get_connection(cursor: Any) -> Any: """Get the underlying connection from a cursor.""" return getattr(cursor, "connection", None) def _wrap_connect(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Connection._connect to capture connection spans.""" async def _inner(self: "Connection") -> T: client = sentry_sdk.get_client() if client.get_integration(AioMySQLIntegration) is None: return await f(self) if has_span_streaming_enabled(client.options): breadcrumb_data = _get_connect_data(self, use_streaming_keys=True) span_attributes: dict[str, Any] = { "sentry.op": OP.DB, "sentry.origin": AioMySQLIntegration.origin, } | breadcrumb_data with sentry_sdk.traces.start_span( name="connect", attributes=span_attributes ) as span: with capture_internal_exceptions(): sentry_sdk.add_breadcrumb( message="connect", category="query", data=breadcrumb_data ) res = await f(self) else: connect_data = _get_connect_data(self) with sentry_sdk.start_span( op=OP.DB, name="connect", origin=AioMySQLIntegration.origin, ) as span: _set_db_data(span, self) with capture_internal_exceptions(): sentry_sdk.add_breadcrumb( message="connect", category="query", data=connect_data, ) res = await f(self) return res return _inner def _get_connect_data(conn: Any, *, use_streaming_keys: bool = False) -> dict[str, Any]: if use_streaming_keys: db_system = SPANDATA.DB_SYSTEM_NAME db_name = SPANDATA.DB_NAMESPACE else: db_system = SPANDATA.DB_SYSTEM db_name = SPANDATA.DB_NAME data: dict[str, Any] = { db_system: "mysql", SPANDATA.DB_DRIVER_NAME: "aiomysql", } host = getattr(conn, "host", None) if host is not None: data[SPANDATA.SERVER_ADDRESS] = host port = getattr(conn, "port", None) if port is not None: data[SPANDATA.SERVER_PORT] = port database = getattr(conn, "db", None) if database is not None: data[db_name] = database user = getattr(conn, "user", None) if user is not None: data[SPANDATA.DB_USER] = user return data def _set_db_data(span: Any, conn: Any) -> None: """Set database-related span data from connection object.""" if isinstance(span, StreamedSpan): set_value = span.set_attribute db_system = SPANDATA.DB_SYSTEM_NAME db_name = SPANDATA.DB_NAMESPACE else: # Remove this else block once we've completely migrated to streamed spans # The use of deprecated attributes here is to ensure backwards compatibility set_value = span.set_data db_system = SPANDATA.DB_SYSTEM db_name = SPANDATA.DB_NAME set_value(db_system, "mysql") set_value(SPANDATA.DB_DRIVER_NAME, "aiomysql") host = getattr(conn, "host", None) if host is not None: set_value(SPANDATA.SERVER_ADDRESS, host) port = getattr(conn, "port", None) if port is not None: set_value(SPANDATA.SERVER_PORT, port) database = getattr(conn, "db", None) if database is not None: set_value(db_name, database) user = getattr(conn, "user", None) if user is not None: set_value(SPANDATA.DB_USER, user)
Save File
Cancel