Add support for anonymous tasks (#1407)

This commit is contained in:
Naor Malca 2023-04-29 02:09:24 +03:00 committed by GitHub
parent f46a6e1c0e
commit 46e4b1da44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 3 deletions

View File

@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730))
- Make ASGI request span attributes available for `start_span`.
([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762))
- `opentelemetry-instrumentation-celery` Add support for anonymous tasks.
([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)
### Fixed

View File

@ -183,10 +183,17 @@ class CeleryInstrumentor(BaseInstrumentor):
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_message(kwargs)
if task is None or task_id is None:
if task_id is None:
return
operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}"
if task is None:
# task is an anonymous task send using send_task or using canvas workflow
# Signatures() to send to a task not in the current processes dependency
# tree
task_name = kwargs.get("sender", "unknown")
else:
task_name = task.name
operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}"
span = self._tracer.start_span(
operation_name, kind=trace.SpanKind.PRODUCER
)
@ -195,7 +202,7 @@ class CeleryInstrumentor(BaseInstrumentor):
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
span.set_attribute(_TASK_NAME_KEY, task.name)
span.set_attribute(_TASK_NAME_KEY, task_name)
utils.set_attributes_from_context(span, kwargs)
activation = trace.use_span(span, end_on_exit=True)

View File

@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False):
NOTE: We cannot test for this well yet, because we do not run a celery worker,
and cannot run `task.apply_async()`
"""
if task is None:
return
span_dict = getattr(task, CTX_KEY, None)
if span_dict is None:
span_dict = {}

View File

@ -36,6 +36,7 @@ class TestCeleryInstrumentation(TestBase):
CeleryInstrumentor().uninstrument()
self._worker.stop()
self._thread.join()
CeleryInstrumentor().uninstrument()
def test_task(self):
CeleryInstrumentor().instrument()
@ -97,3 +98,52 @@ class TestCeleryInstrumentation(TestBase):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
class TestCelerySignatureTask(TestBase):
def setUp(self):
super().setUp()
def start_app(*args, **kwargs):
# Add an additional task that will not be registered with parent thread
@app.task
def hidden_task(num_a):
return num_a * 2
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
return self._worker.start(*args, **kwargs)
self._thread = threading.Thread(target=start_app)
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
self._thread.daemon = True
self._thread.start()
def tearDown(self):
super().tearDown()
self._worker.stop()
self._thread.join()
CeleryInstrumentor().uninstrument()
def test_hidden_task(self):
# no-op since already instrumented
CeleryInstrumentor().instrument()
res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async()
while not res.ready():
time.sleep(0.05)
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)
consumer, producer = spans
self.assertEqual(consumer.name, "run/tests.test_tasks.hidden_task")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertEqual(
producer.name, "apply_async/tests.test_tasks.hidden_task"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

View File

@ -185,6 +185,13 @@ class TestUtils(unittest.TestCase):
utils.detach_span(fn_task, task_id)
self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None))
def test_optional_task_span_attach(self):
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))
# assert this is is a no-aop
self.assertIsNone(utils.attach_span(None, task_id, span))
def test_span_delete_empty(self):
# ensure detach_span doesn't raise an exception if span is not present
@self.app.task