← Back
Editing: tasks.py
from functools import wraps

import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import qualname_from_function

try:
    # django.tasks were added in Django 6.0
    from django.tasks.base import Task
except ImportError:
    Task = None

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any


def patch_tasks() -> None:
    """Replace ``Task.enqueue`` with a wrapper that runs it inside a Sentry span.

    Returns immediately, patching nothing, when ``django.tasks`` could not be
    imported (``Task`` is ``None``).
    """
    if Task is None:
        return

    original_enqueue = Task.enqueue

    @wraps(original_enqueue)
    def _sentry_enqueue(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        """Call the original ``enqueue``, wrapped in a queue-submit span.

        When the Django integration is not registered on the current client,
        the original ``enqueue`` is called directly without a span.
        """
        # Imported at call time rather than module level
        # (presumably to avoid a circular import -- confirm).
        from sentry_sdk.integrations.django import DjangoIntegration

        if sentry_sdk.get_client().get_integration(DjangoIntegration) is None:
            return original_enqueue(self, *args, **kwargs)

        # Fall back to a placeholder when no qualified name is available.
        span_name = qualname_from_function(self.func) or "<unknown Django task>"

        # Pick the span API according to the client's span-streaming option;
        # both variants carry the same op, name and origin.
        if has_span_streaming_enabled(sentry_sdk.get_client().options):
            span_cm = sentry_sdk.traces.start_span(
                name=span_name,
                attributes={
                    "sentry.op": OP.QUEUE_SUBMIT_DJANGO,
                    "sentry.origin": DjangoIntegration.origin,
                },
            )
        else:
            span_cm = sentry_sdk.start_span(
                op=OP.QUEUE_SUBMIT_DJANGO,
                name=span_name,
                origin=DjangoIntegration.origin,
            )

        with span_cm:
            return original_enqueue(self, *args, **kwargs)

    Task.enqueue = _sentry_enqueue
Save File
Cancel