diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index 7a1ac19232..426430e35f 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -7,11 +7,13 @@ from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SegmentSource, SpanStatus, StreamedSpan from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, TransactionSource, ) +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, @@ -60,7 +62,7 @@ def patch_enqueue() -> None: @ensure_integration_enabled(HueyIntegration, old_enqueue) def _sentry_enqueue( - self: "Huey", item: "Union[Task, HueyGroup, HueyChord]" + self: "Huey", item: "Any" ) -> "Optional[Union[Result, ResultGroup]]": if HueyChord is not None and isinstance(item, HueyChord): span_name = "Huey Chord" @@ -69,16 +71,31 @@ def _sentry_enqueue( else: span_name = item.name - with sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_HUEY, - name=span_name, - origin=HueyIntegration.origin, - ): - if ( - not isinstance(item, PeriodicTask) - and not (HueyGroup is not None and isinstance(item, HueyGroup)) - and not (HueyChord is not None and isinstance(item, HueyChord)) - ): + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) + + span_ctx = None + if is_span_streaming_enabled: + span_ctx = sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_HUEY, + "sentry.origin": HueyIntegration.origin, + }, + ) + else: + span_ctx = sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_HUEY, + name=span_name, + origin=HueyIntegration.origin, + ) + + no_headers_types = (PeriodicTask,) + tuple( + t for t in [HueyGroup, HueyChord] if t is not None + ) + with span_ctx: + if not isinstance(item, no_headers_types): # Attach trace propagation data to task kwargs. We do # not do this for periodic tasks, as these don't # really have an originating transaction. @@ -124,12 +141,22 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]": def _capture_exception(exc_info: "ExcInfo") -> None: scope = sentry_sdk.get_current_scope() + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS: - scope.transaction.set_status(SPANSTATUS.ABORTED) + if not is_span_streaming_enabled: + scope.transaction.set_status(SPANSTATUS.ABORTED) + elif type(scope._span) is StreamedSpan: + scope._span._segment.status = SpanStatus.OK return - scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + if not is_span_streaming_enabled: + scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + elif type(scope._span) is StreamedSpan: + scope._span._segment.status = SpanStatus.ERROR + event, hint = event_from_exception( exc_info, client_options=sentry_sdk.get_client().options, @@ -167,21 +194,42 @@ def _sentry_execute( scope.add_event_processor(_make_event_processor(task)) sentry_headers = task.kwargs.pop("sentry_headers", None) - - transaction = continue_trace( - sentry_headers or {}, - name=task.name, - op=OP.QUEUE_TASK_HUEY, - source=TransactionSource.TASK, - origin=HueyIntegration.origin, + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options ) - transaction.set_status(SPANSTATUS.OK) + + if is_span_streaming_enabled: + headers = sentry_headers or {} + sentry_sdk.traces.continue_trace(headers) + span_ctx = sentry_sdk.traces.start_span( + name=task.name, + attributes={ + "sentry.op": OP.QUEUE_TASK_HUEY, + "sentry.origin": HueyIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + "messaging.message.id": task.id, + "messaging.message.system": "huey", + "messaging.message.retry.count": (task.default_retries or 0) + - task.retries, + }, + parent_span=None, + ) + else: + transaction = continue_trace( + sentry_headers or {}, + name=task.name, + op=OP.QUEUE_TASK_HUEY, + source=TransactionSource.TASK, + origin=HueyIntegration.origin, + ) + transaction.set_status(SPANSTATUS.OK) + span_ctx = sentry_sdk.start_transaction(transaction) if not getattr(task, "_sentry_is_patched", False): task.execute = _wrap_task_execute(task.execute) task._sentry_is_patched = True - with sentry_sdk.start_transaction(transaction): + with span_ctx: return old_execute(self, task, timestamp) Huey._execute = _sentry_execute diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index aa13a98e94..e93e27a2f1 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -32,6 +32,13 @@ except ImportError: AIOHttpHttpException = None +try: + from huey.exceptions import CancelExecution, RetryTask, TaskLockedException + + HueyControlFlowExceptions = (CancelExecution, RetryTask, TaskLockedException) +except ImportError: + HueyControlFlowExceptions = None + from typing import TYPE_CHECKING import sentry_sdk @@ -1994,6 +2001,12 @@ def should_be_treated_as_error(ty: "Any", value: "Any") -> bool: if AIOHttpHttpException and isinstance(value, AIOHttpHttpException): return False + # Huey also has exceptions that are raised for control flow reasons, not + # because there's an actual error. This check, similar to the aiohttp one above, + # is to prevent accidentally overwriting a status of "ok" with "error" + if HueyControlFlowExceptions and isinstance(value, HueyControlFlowExceptions): + return False + return True diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index e2cc81e755..c7b9e0469c 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -3,10 +3,13 @@ import pytest from huey import __version__ as HUEY_VERSION from huey.api import MemoryHuey, Result -from huey.exceptions import RetryTask +from huey.exceptions import CancelExecution, RetryTask +import sentry_sdk from sentry_sdk import start_transaction +from sentry_sdk.consts import OP from sentry_sdk.integrations.huey import HueyIntegration +from sentry_sdk.traces import SegmentSource, SpanStatus from sentry_sdk.utils import parse_version HUEY_VERSION = parse_version(HUEY_VERSION) @@ -20,11 +23,12 @@ @pytest.fixture def init_huey(sentry_init): - def inner(): + def inner(has_span_streaming=None): sentry_init( integrations=[HueyIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream"} if has_span_streaming else {}, ) return MemoryHuey(name="sentry_sdk") @@ -69,39 +73,66 @@ def increase(num): @pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"]) -def test_task_transaction(capture_events, init_huey, task_fails): - huey = init_huey() +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_transaction_or_segment( + capture_events, capture_items, init_huey, task_fails, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def division(a, b): return a / b - events = capture_events() - execute_huey_task( - huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) - ) + if has_span_streaming: + items = capture_items("span") + execute_huey_task( + huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) + ) + sentry_sdk.get_client().flush() + + payloads = [i.payload for i in items] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads + + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert execute_span["is_segment"] + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + assert execute_span["name"] == "division" + assert execute_span["status"] == ( + SpanStatus.ERROR if task_fails else SpanStatus.OK + ) + else: + events = capture_events() + execute_huey_task( + huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) + ) - if task_fails: - error_event = events.pop(0) - assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey" + if task_fails: + error_event = events.pop(0) + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey" - (event,) = events - assert event["type"] == "transaction" - assert event["transaction"] == "division" - assert event["transaction_info"] == {"source": "task"} + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "division" + assert event["transaction_info"] == {"source": "task"} - if task_fails: - assert event["contexts"]["trace"]["status"] == "internal_error" - else: - assert event["contexts"]["trace"]["status"] == "ok" + if task_fails: + assert event["contexts"]["trace"]["status"] == "internal_error" + else: + assert event["contexts"]["trace"]["status"] == "ok" - assert "huey_task_id" in event["tags"] - assert "huey_task_retry" in event["tags"] + assert "huey_task_id" in event["tags"] + assert "huey_task_retry" in event["tags"] -def test_task_retry(capture_events, init_huey): - huey = init_huey() +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_retry(capture_events, capture_items, init_huey, has_span_streaming): + huey = init_huey(has_span_streaming=has_span_streaming) context = {"retry": True} @huey.task() @@ -110,27 +141,105 @@ def retry_task(context): context["retry"] = False raise RetryTask() - events = capture_events() - result = execute_huey_task(huey, retry_task, context) - (event,) = events + if has_span_streaming: + items = capture_items("span") + execute_huey_task(huey, retry_task, context) + sentry_sdk.get_client().flush() - assert event["transaction"] == "retry_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert len(huey) == 1 + payloads = [i.payload for i in items] + assert len(payloads) == 3 - task = huey.dequeue() - huey.execute(task) - (event, _) = events + enqueue_span, re_enqueue_span, execute_span = payloads - assert event["transaction"] == "retry_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert len(huey) == 0 + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert enqueue_span["is_segment"] + + assert re_enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert not re_enqueue_span["is_segment"] + + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + assert execute_span["is_segment"] + assert execute_span["name"] == "retry_task" + assert execute_span["status"] == SpanStatus.OK + + assert len(huey) == 1 + + task = huey.dequeue() + huey.execute(task) + + sentry_sdk.get_client().flush() + + all_payloads = [i.payload for i in items] + + assert len(all_payloads) == 4 + retry_span = all_payloads[3] + + assert retry_span["is_segment"] + assert retry_span["name"] == "retry_task" + assert retry_span["status"] == SpanStatus.OK + assert len(huey) == 0 + else: + events = capture_events() + result = execute_huey_task(huey, retry_task, context) + (event,) = events + + assert event["transaction"] == "retry_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert len(huey) == 1 + + task = huey.dequeue() + huey.execute(task) + (event, _) = events + + assert event["transaction"] == "retry_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert len(huey) == 0 + + +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_cancel_does_not_override_status( + capture_events, capture_items, init_huey, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) + + @huey.task() + def cancel_task(): + raise CancelExecution() + + if has_span_streaming: + items = capture_items("span") + execute_huey_task(huey, cancel_task) + sentry_sdk.get_client().flush() + + payloads = [i.payload for i in items] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads + + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + assert execute_span["is_segment"] + assert execute_span["name"] == "cancel_task" + assert execute_span["status"] == SpanStatus.OK + else: + events = capture_events() + execute_huey_task(huey, cancel_task) + + (event,) = events + assert event["transaction"] == "cancel_task" + assert event["contexts"]["trace"]["status"] == "aborted" @pytest.mark.parametrize("lock_name", ["lock.a", "lock.b"], ids=["locked", "unlocked"]) +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) @pytest.mark.skipif(HUEY_VERSION < (2, 5), reason="is_locked was added in 2.5") -def test_task_lock(capture_events, init_huey, lock_name): - huey = init_huey() +def test_task_lock( + capture_events, capture_items, init_huey, lock_name, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) task_lock_name = "lock.a" should_be_locked = task_lock_name == lock_name @@ -140,19 +249,39 @@ def test_task_lock(capture_events, init_huey, lock_name): def maybe_locked_task(): pass - events = capture_events() + if has_span_streaming: + items = capture_items("span") + with huey.lock_task(lock_name): + assert huey.is_locked(task_lock_name) == should_be_locked + execute_huey_task(huey, maybe_locked_task) + sentry_sdk.get_client().flush() - with huey.lock_task(lock_name): - assert huey.is_locked(task_lock_name) == should_be_locked - result = execute_huey_task(huey, maybe_locked_task) + payloads = [i.payload for i in items] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads - (event,) = events + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY - assert event["transaction"] == "maybe_locked_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert ( - event["contexts"]["trace"]["status"] == "aborted" if should_be_locked else "ok" - ) + assert execute_span["is_segment"] + assert execute_span["name"] == "maybe_locked_task" + assert execute_span["status"] == SpanStatus.OK + else: + events = capture_events() + + with huey.lock_task(lock_name): + assert huey.is_locked(task_lock_name) == should_be_locked + result = execute_huey_task(huey, maybe_locked_task) + + (event,) = events + + assert event["transaction"] == "maybe_locked_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert ( + event["contexts"]["trace"]["status"] == "aborted" + if should_be_locked + else "ok" + ) assert len(huey) == 0 @@ -230,11 +359,12 @@ def propagated_trace_task(): assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" +@pytest.mark.parametrize("has_span_streaming", [True, False]) @pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0") -def test_huey_enqueue_group(init_huey, capture_events): - huey = init_huey() - - events = capture_events() +def test_huey_enqueue_group( + init_huey, capture_events, capture_items, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def task1(): @@ -244,51 +374,125 @@ def task1(): def task2(): pass - with start_transaction() as transaction: + if has_span_streaming: + items = capture_items("span") + huey.enqueue(group([task1.s(), task2.s()])) - for _ in range(2): - task = huey.dequeue() - huey.execute(task) + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + sentry_sdk.get_client().flush() + assert len(items) == 5 + + ( + task1_enqueue_span, + task2_enqueue_span, + group_span, + task1_execute_span, + task2_execute_span, + ) = [i.payload for i in items] + + assert group_span["is_segment"] + assert not task1_enqueue_span["is_segment"] + assert not task2_enqueue_span["is_segment"] + assert task1_execute_span["is_segment"] + assert task2_execute_span["is_segment"] + + assert group_span["name"] == "Huey Task Group" + assert group_span["status"] == "ok" + assert group_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert group_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_enqueue_span["name"] == "task1" + assert task1_enqueue_span["status"] == "ok" + assert task1_enqueue_span["parent_span_id"] == group_span["span_id"] + assert ( + task1_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Task Group" + ) + assert task1_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task1_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task2_enqueue_span["name"] == "task2" + assert task2_enqueue_span["status"] == "ok" + assert task2_enqueue_span["parent_span_id"] == group_span["span_id"] + assert ( + task2_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Task Group" + ) + assert task2_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task2_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_execute_span["name"] == "task1" + assert task1_execute_span["status"] == "ok" + assert task1_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task1_execute_span["parent_span_id"] == task1_enqueue_span["span_id"] + assert task1_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task1_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task1_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + assert task1_execute_span["attributes"]["messaging.message.id"] is not None + assert task1_execute_span["attributes"]["messaging.message.retry.count"] == 0 + + assert task2_execute_span["name"] == "task2" + assert task2_execute_span["status"] == "ok" + assert task2_execute_span["parent_span_id"] == task2_enqueue_span["span_id"] + assert task2_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task2_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task2_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) - assert len(events) == 3 - - # Assert enqueue spans were successfully recorded - producer_event = events[0] - assert producer_event["type"] == "transaction" - assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert producer_event["contexts"]["trace"]["origin"] == "manual" - - spans = producer_event["spans"] - assert len(spans) == 3 - assert spans[0]["op"] == "queue.submit.huey" - assert spans[0]["description"] == "Huey Task Group" - assert spans[1]["op"] == "queue.submit.huey" - assert spans[1]["description"] == "task1" - assert spans[2]["op"] == "queue.submit.huey" - assert spans[2]["description"] == "task2" - - # Consumer transaction assertions (one per task) - consumer_events = events[1:] - for _, (consumer_event, expected_name) in enumerate( - zip(consumer_events, ["task1", "task2"]) - ): - assert consumer_event["type"] == "transaction" - assert consumer_event["transaction"] == expected_name - assert consumer_event["transaction_info"] == {"source": "task"} - assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" - assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" - assert consumer_event["contexts"]["trace"]["status"] == "ok" - assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert "huey_task_id" in consumer_event["tags"] - assert consumer_event["tags"]["huey_task_retry"] is False + else: + events = capture_events() + with start_transaction() as transaction: + huey.enqueue(group([task1.s(), task2.s()])) + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + assert len(events) == 3 + + # Assert enqueue spans were successfully recorded + producer_event = events[0] + assert producer_event["type"] == "transaction" + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 3 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Task Group" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + assert spans[2]["op"] == "queue.submit.huey" + assert spans[2]["description"] == "task2" + + # Consumer transaction assertions (one per task) + consumer_events = events[1:] + for consumer_event, expected_name in zip(consumer_events, ["task1", "task2"]): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert ( + consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + ) + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False + + +@pytest.mark.parametrize("has_span_streaming", [True, False]) @pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") -def test_huey_enqueue_chord(init_huey, capture_events): - huey = init_huey() - - events = capture_events() +def test_huey_enqueue_chord( + init_huey, capture_events, capture_items, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def task1(): @@ -298,44 +502,114 @@ def task1(): def task2(results): pass - with start_transaction() as transaction: + if has_span_streaming: + items = capture_items("span") huey.enqueue(chord([task1.s()], task2.s())) - for _ in range(2): - task = huey.dequeue() - huey.execute(task) + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + sentry_sdk.get_client().flush() + assert len(items) == 5 + + ( + task1_enqueue_span, + chord_span, + task2_enqueue_span, + task1_execute_span, + task2_execute_span, + ) = [i.payload for i in items] + + assert chord_span["is_segment"] + assert not task1_enqueue_span["is_segment"] + assert not task2_enqueue_span["is_segment"] + assert task1_execute_span["is_segment"] + assert task2_execute_span["is_segment"] + + assert chord_span["name"] == "Huey Chord" + assert chord_span["status"] == "ok" + assert chord_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert chord_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_enqueue_span["name"] == "task1" + assert task1_enqueue_span["status"] == "ok" + assert task1_enqueue_span["parent_span_id"] == chord_span["span_id"] + assert task1_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Chord" + assert task1_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task1_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_execute_span["name"] == "task1" + assert task1_execute_span["status"] == "ok" + assert task1_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task1_execute_span["parent_span_id"] == task1_enqueue_span["span_id"] + assert task1_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task1_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task1_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + # chord callback (task2) is enqueued during task1's execution + assert task2_enqueue_span["name"] == "task2" + assert task2_enqueue_span["status"] == "ok" + assert task2_enqueue_span["parent_span_id"] == task1_execute_span["span_id"] + assert task2_enqueue_span["attributes"]["sentry.segment.name"] == "task1" + assert task2_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task2_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task2_execute_span["name"] == "task2" + assert task2_execute_span["status"] == "ok" + assert task2_execute_span["parent_span_id"] == task2_enqueue_span["span_id"] + assert task2_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task2_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task2_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + else: + events = capture_events() + with start_transaction() as transaction: + huey.enqueue(chord([task1.s()], task2.s())) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) - assert len(events) == 3 - - # Enqueue spans - producer_event = events[0] - assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert producer_event["contexts"]["trace"]["origin"] == "manual" - - spans = producer_event["spans"] - assert len(spans) == 2 - assert spans[0]["op"] == "queue.submit.huey" - assert spans[0]["description"] == "Huey Chord" - assert spans[1]["op"] == "queue.submit.huey" - assert spans[1]["description"] == "task1" - - task1_event = events[1] - # Confirm the first task enqueued the chord callback - task1_spans = task1_event["spans"] - assert len(task1_spans) == 1 - assert task1_spans[0]["op"] == "queue.submit.huey" - assert task1_spans[0]["description"] == "task2" - - consumer_events = events[1:] - for _, (consumer_event, expected_name) in enumerate( - zip(consumer_events, ["task1", "task2"]) - ): - assert consumer_event["type"] == "transaction" - assert consumer_event["transaction"] == expected_name - assert consumer_event["transaction_info"] == {"source": "task"} - assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" - assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" - assert consumer_event["contexts"]["trace"]["status"] == "ok" - assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert "huey_task_id" in consumer_event["tags"] - assert consumer_event["tags"]["huey_task_retry"] is False + assert len(events) == 3 + + # Enqueue spans + producer_event = events[0] + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 2 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Chord" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + + task1_event = events[1] + # Confirm the first task enqueued the chord callback + assert len(task1_event["spans"]) == 1 + assert task1_event["spans"][0]["op"] == "queue.submit.huey" + assert task1_event["spans"][0]["description"] == "task2" + assert task1_event["type"] == "transaction" + assert task1_event["transaction"] == "task1" + assert task1_event["transaction_info"] == {"source": "task"} + assert task1_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert task1_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert task1_event["contexts"]["trace"]["status"] == "ok" + assert task1_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in task1_event["tags"] + assert task1_event["tags"]["huey_task_retry"] is False + + task2_event = events[2] + assert task2_event["type"] == "transaction" + assert task2_event["transaction"] == "task2" + assert task2_event["transaction_info"] == {"source": "task"} + assert task2_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert task2_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert task2_event["contexts"]["trace"]["status"] == "ok" + assert task2_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in task2_event["tags"] + assert task2_event["tags"]["huey_task_retry"] is False