Added context propagation support to celery instrumentation (#1135)

This commit is contained in:
Owais Lone 2020-09-29 21:06:05 +05:30 committed by GitHub
parent 70b63c3959
commit 9641c8a62c
6 changed files with 59 additions and 75 deletions

View File

@ -46,21 +46,21 @@ def test_instrumentation_info(celery_app, memory_exporter):
async_span, run_span = spans async_span, run_span = spans
assert ( assert run_span.parent == async_span.context
async_span.instrumentation_info.name assert run_span.parent.span_id == async_span.context.span_id
== opentelemetry.instrumentation.celery.__name__ assert run_span.context.trace_id == async_span.context.trace_id
assert async_span.instrumentation_info.name == "apply_async/{0}".format(
opentelemetry.instrumentation.celery.__name__
) )
assert ( assert async_span.instrumentation_info.version == "apply_async/{0}".format(
async_span.instrumentation_info.version opentelemetry.instrumentation.celery.__version__
== opentelemetry.instrumentation.celery.__version__
) )
assert ( assert run_span.instrumentation_info.name == "run/{0}".format(
run_span.instrumentation_info.name opentelemetry.instrumentation.celery.__name__
== opentelemetry.instrumentation.celery.__name__
) )
assert ( assert run_span.instrumentation_info.version == "run/{0}".format(
run_span.instrumentation_info.version opentelemetry.instrumentation.celery.__version__
== opentelemetry.instrumentation.celery.__version__
) )
@ -103,7 +103,7 @@ def test_fn_task_apply(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.name == "test_celery_functional.fn_task" assert span.name == "run/test_celery_functional.fn_task"
assert span.attributes.get("messaging.message_id") == t.task_id assert span.attributes.get("messaging.message_id") == t.task_id
assert ( assert (
span.attributes.get("celery.task_name") span.attributes.get("celery.task_name")
@ -128,7 +128,7 @@ def test_fn_task_apply_bind(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.name == "test_celery_functional.fn_task" assert span.name == "run/test_celery_functional.fn_task"
assert span.attributes.get("messaging.message_id") == t.task_id assert span.attributes.get("messaging.message_id") == t.task_id
assert ( assert (
span.attributes.get("celery.task_name") span.attributes.get("celery.task_name")
@ -157,7 +157,10 @@ def test_fn_task_apply_async(celery_app, memory_exporter):
assert run_span.context.trace_id != async_span.context.trace_id assert run_span.context.trace_id != async_span.context.trace_id
assert async_span.status.is_ok is True assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.fn_task_parameters" assert (
async_span.name
== "apply_async/test_celery_functional.fn_task_parameters"
)
assert async_span.attributes.get("celery.action") == "apply_async" assert async_span.attributes.get("celery.action") == "apply_async"
assert async_span.attributes.get("messaging.message_id") == result.task_id assert async_span.attributes.get("messaging.message_id") == result.task_id
assert ( assert (
@ -209,7 +212,10 @@ def test_fn_task_delay(celery_app, memory_exporter):
assert run_span.context.trace_id != async_span.context.trace_id assert run_span.context.trace_id != async_span.context.trace_id
assert async_span.status.is_ok is True assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.fn_task_parameters" assert (
async_span.name
== "apply_async/test_celery_functional.fn_task_parameters"
)
assert async_span.attributes.get("celery.action") == "apply_async" assert async_span.attributes.get("celery.action") == "apply_async"
assert async_span.attributes.get("messaging.message_id") == result.task_id assert async_span.attributes.get("messaging.message_id") == result.task_id
assert ( assert (
@ -218,7 +224,7 @@ def test_fn_task_delay(celery_app, memory_exporter):
) )
assert run_span.status.is_ok is True assert run_span.status.is_ok is True
assert run_span.name == "test_celery_functional.fn_task_parameters" assert run_span.name == "run/test_celery_functional.fn_task_parameters"
assert run_span.attributes.get("celery.action") == "run" assert run_span.attributes.get("celery.action") == "run"
assert run_span.attributes.get("celery.state") == "SUCCESS" assert run_span.attributes.get("celery.state") == "SUCCESS"
assert run_span.attributes.get("messaging.message_id") == result.task_id assert run_span.attributes.get("messaging.message_id") == result.task_id
@ -244,7 +250,7 @@ def test_fn_exception(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is False assert span.status.is_ok is False
assert span.name == "test_celery_functional.fn_exception" assert span.name == "run/test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE" assert span.attributes.get("celery.state") == "FAILURE"
assert ( assert (
@ -273,7 +279,7 @@ def test_fn_exception_expected(celery_app, memory_exporter):
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.fn_exception" assert span.name == "run/test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE" assert span.attributes.get("celery.state") == "FAILURE"
assert ( assert (
@ -300,7 +306,7 @@ def test_fn_retry_exception(celery_app, memory_exporter):
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.fn_exception" assert span.name == "run/test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "RETRY" assert span.attributes.get("celery.state") == "RETRY"
assert ( assert (
@ -332,7 +338,7 @@ def test_class_task(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.name == "test_celery_functional.BaseTask" assert span.name == "run/test_celery_functional.BaseTask"
assert ( assert (
span.attributes.get("celery.task_name") span.attributes.get("celery.task_name")
== "test_celery_functional.BaseTask" == "test_celery_functional.BaseTask"
@ -364,7 +370,7 @@ def test_class_task_exception(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is False assert span.status.is_ok is False
assert span.name == "test_celery_functional.BaseTask" assert span.name == "run/test_celery_functional.BaseTask"
assert ( assert (
span.attributes.get("celery.task_name") span.attributes.get("celery.task_name")
== "test_celery_functional.BaseTask" == "test_celery_functional.BaseTask"
@ -401,7 +407,7 @@ def test_class_task_exception_excepted(celery_app, memory_exporter):
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.BaseTask" assert span.name == "run/test_celery_functional.BaseTask"
assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE" assert span.attributes.get("celery.state") == "FAILURE"
assert span.attributes.get("messaging.message_id") == result.task_id assert span.attributes.get("messaging.message_id") == result.task_id
@ -423,7 +429,7 @@ def test_shared_task(celery_app, memory_exporter):
span = spans[0] span = spans[0]
assert span.status.is_ok is True assert span.status.is_ok is True
assert span.name == "test_celery_functional.add" assert span.name == "run/test_celery_functional.add"
assert ( assert (
span.attributes.get("celery.task_name") == "test_celery_functional.add" span.attributes.get("celery.task_name") == "test_celery_functional.add"
) )
@ -471,7 +477,7 @@ def test_apply_async_previous_style_tasks(
async_span, async_run_span, run_span = spans async_span, async_run_span, run_span = spans
assert run_span.status.is_ok is True assert run_span.status.is_ok is True
assert run_span.name == "test_celery_functional.CelerySubClass" assert run_span.name == "run/test_celery_functional.CelerySubClass"
assert ( assert (
run_span.attributes.get("celery.task_name") run_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass" == "test_celery_functional.CelerySubClass"
@ -481,7 +487,7 @@ def test_apply_async_previous_style_tasks(
assert run_span.attributes.get("messaging.message_id") == result.task_id assert run_span.attributes.get("messaging.message_id") == result.task_id
assert async_run_span.status.is_ok is True assert async_run_span.status.is_ok is True
assert async_run_span.name == "test_celery_functional.CelerySubClass" assert async_run_span.name == "run/test_celery_functional.CelerySubClass"
assert ( assert (
async_run_span.attributes.get("celery.task_name") async_run_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass" == "test_celery_functional.CelerySubClass"
@ -493,7 +499,9 @@ def test_apply_async_previous_style_tasks(
) )
assert async_span.status.is_ok is True assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.CelerySubClass" assert (
async_span.name == "apply_async/test_celery_functional.CelerySubClass"
)
assert ( assert (
async_span.attributes.get("celery.task_name") async_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass" == "test_celery_functional.CelerySubClass"

View File

@ -75,15 +75,13 @@ class TestFunctionalMysql(TestBase):
self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT)
def test_execute(self): def test_execute(self):
"""Should create a child span for execute """Should create a child span for execute"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans() self.validate_spans()
def test_execute_with_connection_context_manager(self): def test_execute_with_connection_context_manager(self):
"""Should create a child span for execute with connection context """Should create a child span for execute with connection context"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
with self._connection as conn: with self._connection as conn:
cursor = conn.cursor() cursor = conn.cursor()
@ -91,16 +89,14 @@ class TestFunctionalMysql(TestBase):
self.validate_spans() self.validate_spans()
def test_execute_with_cursor_context_manager(self): def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context """Should create a child span for execute with cursor context"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor: with self._connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans() self.validate_spans()
def test_executemany(self): def test_executemany(self):
"""Should create a child span for executemany """Should create a child span for executemany"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",)) data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)" stmt = "INSERT INTO test (id) VALUES (%s)"
@ -108,8 +104,7 @@ class TestFunctionalMysql(TestBase):
self.validate_spans() self.validate_spans()
def test_callproc(self): def test_callproc(self):
"""Should create a child span for callproc """Should create a child span for callproc"""
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception Exception
): ):

View File

@ -85,8 +85,7 @@ class TestFunctionalAiopgConnect(TestBase):
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self): def test_execute(self):
"""Should create a child span for execute method """Should create a child span for execute method"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
async_call( async_call(
self._cursor.execute( self._cursor.execute(
@ -96,8 +95,7 @@ class TestFunctionalAiopgConnect(TestBase):
self.validate_spans() self.validate_spans()
def test_executemany(self): def test_executemany(self):
"""Should create a child span for executemany """Should create a child span for executemany"""
"""
with pytest.raises(psycopg2.ProgrammingError): with pytest.raises(psycopg2.ProgrammingError):
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",)) data = (("1",), ("2",), ("3",))
@ -106,8 +104,7 @@ class TestFunctionalAiopgConnect(TestBase):
self.validate_spans() self.validate_spans()
def test_callproc(self): def test_callproc(self):
"""Should create a child span for callproc """Should create a child span for callproc"""
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception Exception
): ):
@ -169,8 +166,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self): def test_execute(self):
"""Should create a child span for execute method """Should create a child span for execute method"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
async_call( async_call(
self._cursor.execute( self._cursor.execute(
@ -180,8 +176,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
self.validate_spans() self.validate_spans()
def test_executemany(self): def test_executemany(self):
"""Should create a child span for executemany """Should create a child span for executemany"""
"""
with pytest.raises(psycopg2.ProgrammingError): with pytest.raises(psycopg2.ProgrammingError):
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",)) data = (("1",), ("2",), ("3",))
@ -190,8 +185,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
self.validate_spans() self.validate_spans()
def test_callproc(self): def test_callproc(self):
"""Should create a child span for callproc """Should create a child span for callproc"""
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception Exception
): ):

View File

@ -77,8 +77,7 @@ class TestFunctionalPsycopg(TestBase):
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self): def test_execute(self):
"""Should create a child span for execute method """Should create a child span for execute method"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute( self._cursor.execute(
"CREATE TABLE IF NOT EXISTS test (id integer)" "CREATE TABLE IF NOT EXISTS test (id integer)"
@ -86,8 +85,7 @@ class TestFunctionalPsycopg(TestBase):
self.validate_spans() self.validate_spans()
def test_execute_with_connection_context_manager(self): def test_execute_with_connection_context_manager(self):
"""Should create a child span for execute with connection context """Should create a child span for execute with connection context"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
with self._connection as conn: with self._connection as conn:
cursor = conn.cursor() cursor = conn.cursor()
@ -95,8 +93,7 @@ class TestFunctionalPsycopg(TestBase):
self.validate_spans() self.validate_spans()
def test_execute_with_cursor_context_manager(self): def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context """Should create a child span for execute with cursor context"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor: with self._connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
@ -104,8 +101,7 @@ class TestFunctionalPsycopg(TestBase):
self.assertTrue(cursor.closed) self.assertTrue(cursor.closed)
def test_executemany(self): def test_executemany(self):
"""Should create a child span for executemany """Should create a child span for executemany"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",)) data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)" stmt = "INSERT INTO test (id) VALUES (%s)"
@ -113,8 +109,7 @@ class TestFunctionalPsycopg(TestBase):
self.validate_spans() self.validate_spans()
def test_callproc(self): def test_callproc(self):
"""Should create a child span for callproc """Should create a child span for callproc"""
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception Exception
): ):

View File

@ -64,8 +64,7 @@ class TestFunctionalPymongo(TestBase):
) )
def test_insert(self): def test_insert(self):
"""Should create a child span for insert """Should create a child span for insert"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._collection.insert_one( self._collection.insert_one(
{"name": "testName", "value": "testValue"} {"name": "testName", "value": "testValue"}
@ -73,8 +72,7 @@ class TestFunctionalPymongo(TestBase):
self.validate_spans() self.validate_spans()
def test_update(self): def test_update(self):
"""Should create a child span for update """Should create a child span for update"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._collection.update_one( self._collection.update_one(
{"name": "testName"}, {"$set": {"value": "someOtherValue"}} {"name": "testName"}, {"$set": {"value": "someOtherValue"}}
@ -82,15 +80,13 @@ class TestFunctionalPymongo(TestBase):
self.validate_spans() self.validate_spans()
def test_find(self): def test_find(self):
"""Should create a child span for find """Should create a child span for find"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one() self._collection.find_one()
self.validate_spans() self.validate_spans()
def test_delete(self): def test_delete(self):
"""Should create a child span for delete """Should create a child span for delete"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"}) self._collection.delete_one({"name": "testName"})
self.validate_spans() self.validate_spans()

View File

@ -72,23 +72,20 @@ class TestFunctionalPyMysql(TestBase):
self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT)
def test_execute(self): def test_execute(self):
"""Should create a child span for execute """Should create a child span for execute"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans() self.validate_spans()
def test_execute_with_cursor_context_manager(self): def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context """Should create a child span for execute with cursor context"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor: with self._connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans() self.validate_spans()
def test_executemany(self): def test_executemany(self):
"""Should create a child span for executemany """Should create a child span for executemany"""
"""
with self._tracer.start_as_current_span("rootSpan"): with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",)) data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)" stmt = "INSERT INTO test (id) VALUES (%s)"
@ -96,8 +93,7 @@ class TestFunctionalPyMysql(TestBase):
self.validate_spans() self.validate_spans()
def test_callproc(self): def test_callproc(self):
"""Should create a child span for callproc """Should create a child span for callproc"""
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception Exception
): ):