Merge branch 'master' into release/0.17b0-auto
This commit is contained in:
commit
ab0b47c260
|
|
@ -78,7 +78,7 @@ disable=missing-docstring,
|
||||||
protected-access, # temp-pylint-upgrade
|
protected-access, # temp-pylint-upgrade
|
||||||
super-init-not-called, # temp-pylint-upgrade
|
super-init-not-called, # temp-pylint-upgrade
|
||||||
invalid-overridden-method, # temp-pylint-upgrade
|
invalid-overridden-method, # temp-pylint-upgrade
|
||||||
missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee
|
missing-module-docstring, # temp-pylint-upgrade
|
||||||
|
|
||||||
# Enable the message, report, category or checker with the given id(s). You can
|
# Enable the message, report, category or checker with the given id(s). You can
|
||||||
# either give multiple identifier separated by comma (,) or put this option
|
# either give multiple identifier separated by comma (,) or put this option
|
||||||
|
|
|
||||||
12
CHANGELOG.md
12
CHANGELOG.md
|
|
@ -44,7 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-wsgi` Return `None` for `CarrierGetter` if key not found
|
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-wsgi` Return `None` for `CarrierGetter` if key not found
|
||||||
([#1374](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/233))
|
([#233](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/233))
|
||||||
- `opentelemetry-instrumentation-grpc` Comply with updated spec, rework tests
|
- `opentelemetry-instrumentation-grpc` Comply with updated spec, rework tests
|
||||||
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))
|
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))
|
||||||
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-falcon`, `opentelemetry-instrumentation-flask`, `opentelemetry-instrumentation-pyramid`, `opentelemetry-instrumentation-wsgi` Renamed `host.port` attribute to `net.host.port`
|
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-falcon`, `opentelemetry-instrumentation-flask`, `opentelemetry-instrumentation-pyramid`, `opentelemetry-instrumentation-wsgi` Renamed `host.port` attribute to `net.host.port`
|
||||||
|
|
@ -58,7 +58,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager
|
- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager
|
||||||
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
|
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
|
||||||
- `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk
|
- `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk
|
||||||
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
|
([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/283))
|
||||||
|
- `opentelemetry-instrumentation-sqlalchemy` Use SQL operation and DB name as span name.
|
||||||
|
([#254](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/254))
|
||||||
|
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
|
||||||
|
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
|
||||||
|
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
|
||||||
|
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
|
||||||
|
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
|
||||||
|
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
|
||||||
|
|
||||||
## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26
|
## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,8 @@ import wrapt
|
||||||
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
|
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
|
||||||
|
|
||||||
from opentelemetry.instrumentation.dbapi import (
|
from opentelemetry.instrumentation.dbapi import (
|
||||||
|
CursorTracer,
|
||||||
DatabaseApiIntegration,
|
DatabaseApiIntegration,
|
||||||
TracedCursor,
|
|
||||||
)
|
)
|
||||||
from opentelemetry.trace import SpanKind
|
from opentelemetry.trace import SpanKind
|
||||||
from opentelemetry.trace.status import Status, StatusCode
|
from opentelemetry.trace.status import Status, StatusCode
|
||||||
|
|
@ -94,25 +94,29 @@ def get_traced_pool_proxy(pool, db_api_integration, *args, **kwargs):
|
||||||
return TracedPoolProxy(pool, *args, **kwargs)
|
return TracedPoolProxy(pool, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class AsyncTracedCursor(TracedCursor):
|
class AsyncCursorTracer(CursorTracer):
|
||||||
async def traced_execution(
|
async def traced_execution(
|
||||||
self,
|
self,
|
||||||
|
cursor,
|
||||||
query_method: typing.Callable[..., typing.Any],
|
query_method: typing.Callable[..., typing.Any],
|
||||||
*args: typing.Tuple[typing.Any, typing.Any],
|
*args: typing.Tuple[typing.Any, typing.Any],
|
||||||
**kwargs: typing.Dict[typing.Any, typing.Any]
|
**kwargs: typing.Dict[typing.Any, typing.Any]
|
||||||
):
|
):
|
||||||
name = ""
|
name = ""
|
||||||
if len(args) > 0 and args[0]:
|
if args:
|
||||||
name = args[0]
|
name = self.get_operation_name(cursor, args)
|
||||||
elif self._db_api_integration.database:
|
|
||||||
name = self._db_api_integration.database
|
if not name:
|
||||||
else:
|
name = (
|
||||||
name = self._db_api_integration.name
|
self._db_api_integration.database
|
||||||
|
if self._db_api_integration.database
|
||||||
|
else self._db_api_integration.name
|
||||||
|
)
|
||||||
|
|
||||||
with self._db_api_integration.get_tracer().start_as_current_span(
|
with self._db_api_integration.get_tracer().start_as_current_span(
|
||||||
name, kind=SpanKind.CLIENT
|
name, kind=SpanKind.CLIENT
|
||||||
) as span:
|
) as span:
|
||||||
self._populate_span(span, *args)
|
self._populate_span(span, cursor, *args)
|
||||||
try:
|
try:
|
||||||
result = await query_method(*args, **kwargs)
|
result = await query_method(*args, **kwargs)
|
||||||
return result
|
return result
|
||||||
|
|
@ -123,10 +127,10 @@ class AsyncTracedCursor(TracedCursor):
|
||||||
|
|
||||||
|
|
||||||
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
||||||
_traced_cursor = AsyncTracedCursor(db_api_integration)
|
_traced_cursor = AsyncCursorTracer(db_api_integration)
|
||||||
|
|
||||||
# pylint: disable=abstract-method
|
# pylint: disable=abstract-method
|
||||||
class AsyncTracedCursorProxy(AsyncProxyObject):
|
class AsyncCursorTracerProxy(AsyncProxyObject):
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def __init__(self, cursor, *args, **kwargs):
|
def __init__(self, cursor, *args, **kwargs):
|
||||||
|
|
@ -134,20 +138,20 @@ def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
||||||
|
|
||||||
async def execute(self, *args, **kwargs):
|
async def execute(self, *args, **kwargs):
|
||||||
result = await _traced_cursor.traced_execution(
|
result = await _traced_cursor.traced_execution(
|
||||||
self.__wrapped__.execute, *args, **kwargs
|
self, self.__wrapped__.execute, *args, **kwargs
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def executemany(self, *args, **kwargs):
|
async def executemany(self, *args, **kwargs):
|
||||||
result = await _traced_cursor.traced_execution(
|
result = await _traced_cursor.traced_execution(
|
||||||
self.__wrapped__.executemany, *args, **kwargs
|
self, self.__wrapped__.executemany, *args, **kwargs
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def callproc(self, *args, **kwargs):
|
async def callproc(self, *args, **kwargs):
|
||||||
result = await _traced_cursor.traced_execution(
|
result = await _traced_cursor.traced_execution(
|
||||||
self.__wrapped__.callproc, *args, **kwargs
|
self, self.__wrapped__.callproc, *args, **kwargs
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return AsyncTracedCursorProxy(cursor, *args, **kwargs)
|
return AsyncCursorTracerProxy(cursor, *args, **kwargs)
|
||||||
|
|
|
||||||
|
|
@ -256,7 +256,7 @@ class TestAiopgIntegration(TestBase):
|
||||||
spans_list = self.memory_exporter.get_finished_spans()
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans_list), 1)
|
self.assertEqual(len(spans_list), 1)
|
||||||
span = spans_list[0]
|
span = spans_list[0]
|
||||||
self.assertEqual(span.name, "Test query")
|
self.assertEqual(span.name, "Test")
|
||||||
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
||||||
|
|
||||||
self.assertEqual(span.attributes["component"], "testcomponent")
|
self.assertEqual(span.attributes["component"], "testcomponent")
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,7 @@ def trace_integration(
|
||||||
connection_attributes: typing.Dict = None,
|
connection_attributes: typing.Dict = None,
|
||||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||||
capture_parameters: bool = False,
|
capture_parameters: bool = False,
|
||||||
|
db_api_integration_factory=None,
|
||||||
):
|
):
|
||||||
"""Integrate with DB API library.
|
"""Integrate with DB API library.
|
||||||
https://www.python.org/dev/peps/pep-0249/
|
https://www.python.org/dev/peps/pep-0249/
|
||||||
|
|
@ -86,6 +87,7 @@ def trace_integration(
|
||||||
version=__version__,
|
version=__version__,
|
||||||
tracer_provider=tracer_provider,
|
tracer_provider=tracer_provider,
|
||||||
capture_parameters=capture_parameters,
|
capture_parameters=capture_parameters,
|
||||||
|
db_api_integration_factory=db_api_integration_factory,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -99,6 +101,7 @@ def wrap_connect(
|
||||||
version: str = "",
|
version: str = "",
|
||||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||||
capture_parameters: bool = False,
|
capture_parameters: bool = False,
|
||||||
|
db_api_integration_factory=None,
|
||||||
):
|
):
|
||||||
"""Integrate with DB API library.
|
"""Integrate with DB API library.
|
||||||
https://www.python.org/dev/peps/pep-0249/
|
https://www.python.org/dev/peps/pep-0249/
|
||||||
|
|
@ -115,6 +118,9 @@ def wrap_connect(
|
||||||
capture_parameters: Configure if db.statement.parameters should be captured.
|
capture_parameters: Configure if db.statement.parameters should be captured.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
db_api_integration_factory = (
|
||||||
|
db_api_integration_factory or DatabaseApiIntegration
|
||||||
|
)
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def wrap_connect_(
|
def wrap_connect_(
|
||||||
|
|
@ -123,7 +129,7 @@ def wrap_connect(
|
||||||
args: typing.Tuple[typing.Any, typing.Any],
|
args: typing.Tuple[typing.Any, typing.Any],
|
||||||
kwargs: typing.Dict[typing.Any, typing.Any],
|
kwargs: typing.Dict[typing.Any, typing.Any],
|
||||||
):
|
):
|
||||||
db_integration = DatabaseApiIntegration(
|
db_integration = db_api_integration_factory(
|
||||||
name,
|
name,
|
||||||
database_component,
|
database_component,
|
||||||
database_type=database_type,
|
database_type=database_type,
|
||||||
|
|
@ -314,16 +320,19 @@ def get_traced_connection_proxy(
|
||||||
return TracedConnectionProxy(connection, *args, **kwargs)
|
return TracedConnectionProxy(connection, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class TracedCursor:
|
class CursorTracer:
|
||||||
def __init__(self, db_api_integration: DatabaseApiIntegration):
|
def __init__(self, db_api_integration: DatabaseApiIntegration):
|
||||||
self._db_api_integration = db_api_integration
|
self._db_api_integration = db_api_integration
|
||||||
|
|
||||||
def _populate_span(
|
def _populate_span(
|
||||||
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
|
self,
|
||||||
|
span: trace_api.Span,
|
||||||
|
cursor,
|
||||||
|
*args: typing.Tuple[typing.Any, typing.Any]
|
||||||
):
|
):
|
||||||
if not span.is_recording():
|
if not span.is_recording():
|
||||||
return
|
return
|
||||||
statement = args[0] if args else ""
|
statement = self.get_statement(cursor, args)
|
||||||
span.set_attribute(
|
span.set_attribute(
|
||||||
"component", self._db_api_integration.database_component
|
"component", self._db_api_integration.database_component
|
||||||
)
|
)
|
||||||
|
|
@ -342,24 +351,38 @@ class TracedCursor:
|
||||||
if self._db_api_integration.capture_parameters and len(args) > 1:
|
if self._db_api_integration.capture_parameters and len(args) > 1:
|
||||||
span.set_attribute("db.statement.parameters", str(args[1]))
|
span.set_attribute("db.statement.parameters", str(args[1]))
|
||||||
|
|
||||||
|
def get_operation_name(self, cursor, args): # pylint: disable=no-self-use
|
||||||
|
if args and isinstance(args[0], str):
|
||||||
|
return args[0].split()[0]
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def get_statement(self, cursor, args): # pylint: disable=no-self-use
|
||||||
|
if not args:
|
||||||
|
return ""
|
||||||
|
statement = args[0]
|
||||||
|
if isinstance(statement, bytes):
|
||||||
|
return statement.decode("utf8", "replace")
|
||||||
|
return statement
|
||||||
|
|
||||||
def traced_execution(
|
def traced_execution(
|
||||||
self,
|
self,
|
||||||
|
cursor,
|
||||||
query_method: typing.Callable[..., typing.Any],
|
query_method: typing.Callable[..., typing.Any],
|
||||||
*args: typing.Tuple[typing.Any, typing.Any],
|
*args: typing.Tuple[typing.Any, typing.Any],
|
||||||
**kwargs: typing.Dict[typing.Any, typing.Any]
|
**kwargs: typing.Dict[typing.Any, typing.Any]
|
||||||
):
|
):
|
||||||
name = ""
|
name = self.get_operation_name(cursor, args)
|
||||||
if args:
|
if not name:
|
||||||
name = args[0]
|
name = (
|
||||||
elif self._db_api_integration.database:
|
self._db_api_integration.database
|
||||||
name = self._db_api_integration.database
|
if self._db_api_integration.database
|
||||||
else:
|
else self._db_api_integration.name
|
||||||
name = self._db_api_integration.name
|
)
|
||||||
|
|
||||||
with self._db_api_integration.get_tracer().start_as_current_span(
|
with self._db_api_integration.get_tracer().start_as_current_span(
|
||||||
name, kind=SpanKind.CLIENT
|
name, kind=SpanKind.CLIENT
|
||||||
) as span:
|
) as span:
|
||||||
self._populate_span(span, *args)
|
self._populate_span(span, cursor, *args)
|
||||||
try:
|
try:
|
||||||
result = query_method(*args, **kwargs)
|
result = query_method(*args, **kwargs)
|
||||||
return result
|
return result
|
||||||
|
|
@ -370,7 +393,7 @@ class TracedCursor:
|
||||||
|
|
||||||
|
|
||||||
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
||||||
_traced_cursor = TracedCursor(db_api_integration)
|
_cursor_tracer = CursorTracer(db_api_integration)
|
||||||
|
|
||||||
# pylint: disable=abstract-method
|
# pylint: disable=abstract-method
|
||||||
class TracedCursorProxy(wrapt.ObjectProxy):
|
class TracedCursorProxy(wrapt.ObjectProxy):
|
||||||
|
|
@ -380,18 +403,18 @@ def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
|
||||||
wrapt.ObjectProxy.__init__(self, cursor)
|
wrapt.ObjectProxy.__init__(self, cursor)
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
def execute(self, *args, **kwargs):
|
||||||
return _traced_cursor.traced_execution(
|
return _cursor_tracer.traced_execution(
|
||||||
self.__wrapped__.execute, *args, **kwargs
|
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
def executemany(self, *args, **kwargs):
|
def executemany(self, *args, **kwargs):
|
||||||
return _traced_cursor.traced_execution(
|
return _cursor_tracer.traced_execution(
|
||||||
self.__wrapped__.executemany, *args, **kwargs
|
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
def callproc(self, *args, **kwargs):
|
def callproc(self, *args, **kwargs):
|
||||||
return _traced_cursor.traced_execution(
|
return _cursor_tracer.traced_execution(
|
||||||
self.__wrapped__.callproc, *args, **kwargs
|
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ class TestDBApiIntegration(TestBase):
|
||||||
spans_list = self.memory_exporter.get_finished_spans()
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans_list), 1)
|
self.assertEqual(len(spans_list), 1)
|
||||||
span = spans_list[0]
|
span = spans_list[0]
|
||||||
self.assertEqual(span.name, "Test query")
|
self.assertEqual(span.name, "Test")
|
||||||
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
||||||
|
|
||||||
self.assertEqual(span.attributes["component"], "testcomponent")
|
self.assertEqual(span.attributes["component"], "testcomponent")
|
||||||
|
|
@ -65,6 +65,27 @@ class TestDBApiIntegration(TestBase):
|
||||||
span.status.status_code, trace_api.status.StatusCode.UNSET
|
span.status.status_code, trace_api.status.StatusCode.UNSET
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_span_name(self):
|
||||||
|
db_integration = dbapi.DatabaseApiIntegration(
|
||||||
|
self.tracer, "testcomponent", "testtype", {}
|
||||||
|
)
|
||||||
|
mock_connection = db_integration.wrapped_connection(
|
||||||
|
mock_connect, {}, {}
|
||||||
|
)
|
||||||
|
cursor = mock_connection.cursor()
|
||||||
|
cursor.execute("Test query", ("param1Value", False))
|
||||||
|
cursor.execute(
|
||||||
|
"""multi
|
||||||
|
line
|
||||||
|
query"""
|
||||||
|
)
|
||||||
|
cursor.execute("tab\tseparated query")
|
||||||
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans_list), 3)
|
||||||
|
self.assertEqual(spans_list[0].name, "Test")
|
||||||
|
self.assertEqual(spans_list[1].name, "multi")
|
||||||
|
self.assertEqual(spans_list[2].name, "tab")
|
||||||
|
|
||||||
def test_span_succeeded_with_capture_of_statement_parameters(self):
|
def test_span_succeeded_with_capture_of_statement_parameters(self):
|
||||||
connection_props = {
|
connection_props = {
|
||||||
"database": "testdatabase",
|
"database": "testdatabase",
|
||||||
|
|
@ -93,7 +114,7 @@ class TestDBApiIntegration(TestBase):
|
||||||
spans_list = self.memory_exporter.get_finished_spans()
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans_list), 1)
|
self.assertEqual(len(spans_list), 1)
|
||||||
span = spans_list[0]
|
span = spans_list[0]
|
||||||
self.assertEqual(span.name, "Test query")
|
self.assertEqual(span.name, "Test")
|
||||||
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
|
||||||
|
|
||||||
self.assertEqual(span.attributes["component"], "testcomponent")
|
self.assertEqual(span.attributes["component"], "testcomponent")
|
||||||
|
|
|
||||||
|
|
@ -39,12 +39,19 @@ API
|
||||||
---
|
---
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import typing
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
from psycopg2.extensions import (
|
||||||
|
cursor as pg_cursor, # pylint: disable=no-name-in-module
|
||||||
|
)
|
||||||
|
from psycopg2.sql import Composed # pylint: disable=no-name-in-module
|
||||||
|
|
||||||
from opentelemetry.instrumentation import dbapi
|
from opentelemetry.instrumentation import dbapi
|
||||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||||
from opentelemetry.instrumentation.psycopg2.version import __version__
|
from opentelemetry.instrumentation.psycopg2.version import __version__
|
||||||
from opentelemetry.trace import get_tracer
|
|
||||||
|
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
|
||||||
|
|
||||||
|
|
||||||
class Psycopg2Instrumentor(BaseInstrumentor):
|
class Psycopg2Instrumentor(BaseInstrumentor):
|
||||||
|
|
@ -62,7 +69,6 @@ class Psycopg2Instrumentor(BaseInstrumentor):
|
||||||
"""Integrate with PostgreSQL Psycopg library.
|
"""Integrate with PostgreSQL Psycopg library.
|
||||||
Psycopg: http://initd.org/psycopg/
|
Psycopg: http://initd.org/psycopg/
|
||||||
"""
|
"""
|
||||||
|
|
||||||
tracer_provider = kwargs.get("tracer_provider")
|
tracer_provider = kwargs.get("tracer_provider")
|
||||||
|
|
||||||
dbapi.wrap_connect(
|
dbapi.wrap_connect(
|
||||||
|
|
@ -74,39 +80,101 @@ class Psycopg2Instrumentor(BaseInstrumentor):
|
||||||
self._CONNECTION_ATTRIBUTES,
|
self._CONNECTION_ATTRIBUTES,
|
||||||
version=__version__,
|
version=__version__,
|
||||||
tracer_provider=tracer_provider,
|
tracer_provider=tracer_provider,
|
||||||
|
db_api_integration_factory=DatabaseApiIntegration,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _uninstrument(self, **kwargs):
|
def _uninstrument(self, **kwargs):
|
||||||
""""Disable Psycopg2 instrumentation"""
|
""""Disable Psycopg2 instrumentation"""
|
||||||
dbapi.unwrap_connect(psycopg2, "connect")
|
dbapi.unwrap_connect(psycopg2, "connect")
|
||||||
|
|
||||||
# pylint:disable=no-self-use
|
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
|
||||||
def instrument_connection(self, connection):
|
def instrument_connection(self, connection): # pylint: disable=no-self-use
|
||||||
"""Enable instrumentation in a Psycopg2 connection.
|
setattr(
|
||||||
|
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
|
||||||
|
)
|
||||||
|
connection.cursor_factory = _new_cursor_factory()
|
||||||
|
return connection
|
||||||
|
|
||||||
Args:
|
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
|
||||||
connection: The connection to instrument.
|
def uninstrument_connection(
|
||||||
|
self, connection
|
||||||
|
): # pylint: disable=no-self-use
|
||||||
|
connection.cursor_factory = getattr(
|
||||||
|
connection, _OTEL_CURSOR_FACTORY_KEY, None
|
||||||
|
)
|
||||||
|
return connection
|
||||||
|
|
||||||
Returns:
|
|
||||||
An instrumented connection.
|
|
||||||
"""
|
|
||||||
tracer = get_tracer(__name__, __version__)
|
|
||||||
|
|
||||||
return dbapi.instrument_connection(
|
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
|
||||||
tracer,
|
class DatabaseApiIntegration(dbapi.DatabaseApiIntegration):
|
||||||
connection,
|
def wrapped_connection(
|
||||||
self._DATABASE_COMPONENT,
|
self,
|
||||||
self._DATABASE_TYPE,
|
connect_method: typing.Callable[..., typing.Any],
|
||||||
self._CONNECTION_ATTRIBUTES,
|
args: typing.Tuple[typing.Any, typing.Any],
|
||||||
|
kwargs: typing.Dict[typing.Any, typing.Any],
|
||||||
|
):
|
||||||
|
"""Add object proxy to connection object."""
|
||||||
|
base_cursor_factory = kwargs.pop("cursor_factory", None)
|
||||||
|
new_factory_kwargs = {"db_api": self}
|
||||||
|
if base_cursor_factory:
|
||||||
|
new_factory_kwargs["base_factory"] = base_cursor_factory
|
||||||
|
kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs)
|
||||||
|
connection = connect_method(*args, **kwargs)
|
||||||
|
self.get_connection_attributes(connection)
|
||||||
|
return connection
|
||||||
|
|
||||||
|
|
||||||
|
class CursorTracer(dbapi.CursorTracer):
|
||||||
|
def get_operation_name(self, cursor, args):
|
||||||
|
if not args:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
statement = args[0]
|
||||||
|
if isinstance(statement, Composed):
|
||||||
|
statement = statement.as_string(cursor)
|
||||||
|
|
||||||
|
if isinstance(statement, str):
|
||||||
|
return statement.split()[0]
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def get_statement(self, cursor, args):
|
||||||
|
if not args:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
statement = args[0]
|
||||||
|
if isinstance(statement, Composed):
|
||||||
|
statement = statement.as_string(cursor)
|
||||||
|
return statement
|
||||||
|
|
||||||
|
|
||||||
|
def _new_cursor_factory(db_api=None, base_factory=None):
|
||||||
|
if not db_api:
|
||||||
|
db_api = DatabaseApiIntegration(
|
||||||
|
__name__,
|
||||||
|
Psycopg2Instrumentor._DATABASE_COMPONENT,
|
||||||
|
database_type=Psycopg2Instrumentor._DATABASE_TYPE,
|
||||||
|
connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES,
|
||||||
|
version=__version__,
|
||||||
)
|
)
|
||||||
|
|
||||||
def uninstrument_connection(self, connection):
|
base_factory = base_factory or pg_cursor
|
||||||
"""Disable instrumentation in a Psycopg2 connection.
|
_cursor_tracer = CursorTracer(db_api)
|
||||||
|
|
||||||
Args:
|
class TracedCursorFactory(base_factory):
|
||||||
connection: The connection to uninstrument.
|
def execute(self, *args, **kwargs):
|
||||||
|
return _cursor_tracer.traced_execution(
|
||||||
|
self, super().execute, *args, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
Returns:
|
def executemany(self, *args, **kwargs):
|
||||||
An uninstrumented connection.
|
return _cursor_tracer.traced_execution(
|
||||||
"""
|
self, super().executemany, *args, **kwargs
|
||||||
return dbapi.uninstrument_connection(connection)
|
)
|
||||||
|
|
||||||
|
def callproc(self, *args, **kwargs):
|
||||||
|
return _cursor_tracer.traced_execution(
|
||||||
|
self, super().callproc, *args, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
return TracedCursorFactory
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import types
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
|
@ -22,15 +23,69 @@ from opentelemetry.sdk import resources
|
||||||
from opentelemetry.test.test_base import TestBase
|
from opentelemetry.test.test_base import TestBase
|
||||||
|
|
||||||
|
|
||||||
|
class MockCursor:
|
||||||
|
|
||||||
|
execute = mock.MagicMock(spec=types.MethodType)
|
||||||
|
execute.__name__ = "execute"
|
||||||
|
|
||||||
|
executemany = mock.MagicMock(spec=types.MethodType)
|
||||||
|
executemany.__name__ = "executemany"
|
||||||
|
|
||||||
|
callproc = mock.MagicMock(spec=types.MethodType)
|
||||||
|
callproc.__name__ = "callproc"
|
||||||
|
|
||||||
|
rowcount = "SomeRowCount"
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args):
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
class MockConnection:
|
||||||
|
|
||||||
|
commit = mock.MagicMock(spec=types.MethodType)
|
||||||
|
commit.__name__ = "commit"
|
||||||
|
|
||||||
|
rollback = mock.MagicMock(spec=types.MethodType)
|
||||||
|
rollback.__name__ = "rollback"
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.cursor_factory = kwargs.pop("cursor_factory", None)
|
||||||
|
|
||||||
|
def cursor(self):
|
||||||
|
if self.cursor_factory:
|
||||||
|
return self.cursor_factory(self)
|
||||||
|
return MockCursor()
|
||||||
|
|
||||||
|
def get_dsn_parameters(self): # pylint: disable=no-self-use
|
||||||
|
return dict(dbname="test")
|
||||||
|
|
||||||
|
|
||||||
class TestPostgresqlIntegration(TestBase):
|
class TestPostgresqlIntegration(TestBase):
|
||||||
|
def setUp(self):
|
||||||
|
self.cursor_mock = mock.patch(
|
||||||
|
"opentelemetry.instrumentation.psycopg2.pg_cursor", MockCursor
|
||||||
|
)
|
||||||
|
self.connection_mock = mock.patch("psycopg2.connect", MockConnection)
|
||||||
|
|
||||||
|
self.cursor_mock.start()
|
||||||
|
self.connection_mock.start()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super().tearDown()
|
super().tearDown()
|
||||||
|
self.memory_exporter.clear()
|
||||||
|
self.cursor_mock.stop()
|
||||||
|
self.connection_mock.stop()
|
||||||
with self.disable_logging():
|
with self.disable_logging():
|
||||||
Psycopg2Instrumentor().uninstrument()
|
Psycopg2Instrumentor().uninstrument()
|
||||||
|
|
||||||
@mock.patch("psycopg2.connect")
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def test_instrumentor(self, mock_connect):
|
def test_instrumentor(self):
|
||||||
Psycopg2Instrumentor().instrument()
|
Psycopg2Instrumentor().instrument()
|
||||||
|
|
||||||
cnx = psycopg2.connect(database="test")
|
cnx = psycopg2.connect(database="test")
|
||||||
|
|
@ -60,9 +115,8 @@ class TestPostgresqlIntegration(TestBase):
|
||||||
spans_list = self.memory_exporter.get_finished_spans()
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans_list), 1)
|
self.assertEqual(len(spans_list), 1)
|
||||||
|
|
||||||
@mock.patch("psycopg2.connect")
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def test_not_recording(self, mock_connect):
|
def test_not_recording(self):
|
||||||
mock_tracer = mock.Mock()
|
mock_tracer = mock.Mock()
|
||||||
mock_span = mock.Mock()
|
mock_span = mock.Mock()
|
||||||
mock_span.is_recording.return_value = False
|
mock_span.is_recording.return_value = False
|
||||||
|
|
@ -83,9 +137,8 @@ class TestPostgresqlIntegration(TestBase):
|
||||||
|
|
||||||
Psycopg2Instrumentor().uninstrument()
|
Psycopg2Instrumentor().uninstrument()
|
||||||
|
|
||||||
@mock.patch("psycopg2.connect")
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def test_custom_tracer_provider(self, mock_connect):
|
def test_custom_tracer_provider(self):
|
||||||
resource = resources.Resource.create({})
|
resource = resources.Resource.create({})
|
||||||
result = self.create_tracer_provider(resource=resource)
|
result = self.create_tracer_provider(resource=resource)
|
||||||
tracer_provider, exporter = result
|
tracer_provider, exporter = result
|
||||||
|
|
@ -103,9 +156,8 @@ class TestPostgresqlIntegration(TestBase):
|
||||||
|
|
||||||
self.assertIs(span.resource, resource)
|
self.assertIs(span.resource, resource)
|
||||||
|
|
||||||
@mock.patch("psycopg2.connect")
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def test_instrument_connection(self, mock_connect):
|
def test_instrument_connection(self):
|
||||||
cnx = psycopg2.connect(database="test")
|
cnx = psycopg2.connect(database="test")
|
||||||
query = "SELECT * FROM test"
|
query = "SELECT * FROM test"
|
||||||
cursor = cnx.cursor()
|
cursor = cnx.cursor()
|
||||||
|
|
@ -121,9 +173,8 @@ class TestPostgresqlIntegration(TestBase):
|
||||||
spans_list = self.memory_exporter.get_finished_spans()
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans_list), 1)
|
self.assertEqual(len(spans_list), 1)
|
||||||
|
|
||||||
@mock.patch("psycopg2.connect")
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def test_uninstrument_connection(self, mock_connect):
|
def test_uninstrument_connection(self):
|
||||||
Psycopg2Instrumentor().instrument()
|
Psycopg2Instrumentor().instrument()
|
||||||
cnx = psycopg2.connect(database="test")
|
cnx = psycopg2.connect(database="test")
|
||||||
query = "SELECT * FROM test"
|
query = "SELECT * FROM test"
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from sqlalchemy.event import listen
|
from sqlalchemy.event import listen # pylint: disable=no-name-in-module
|
||||||
|
|
||||||
from opentelemetry import trace
|
from opentelemetry import trace
|
||||||
from opentelemetry.instrumentation.sqlalchemy.version import __version__
|
from opentelemetry.instrumentation.sqlalchemy.version import __version__
|
||||||
|
|
@ -72,22 +72,38 @@ class EngineTracer:
|
||||||
listen(engine, "after_cursor_execute", self._after_cur_exec)
|
listen(engine, "after_cursor_execute", self._after_cur_exec)
|
||||||
listen(engine, "handle_error", self._handle_error)
|
listen(engine, "handle_error", self._handle_error)
|
||||||
|
|
||||||
|
def _operation_name(self, db_name, statement):
|
||||||
|
parts = []
|
||||||
|
if isinstance(statement, str):
|
||||||
|
# otel spec recommends against parsing SQL queries. We are not trying to parse SQL
|
||||||
|
# but simply truncating the statement to the first word. This covers probably >95%
|
||||||
|
# use cases and uses the SQL statement in span name correctly as per the spec.
|
||||||
|
# For some very special cases it might not record the correct statement if the SQL
|
||||||
|
# dialect is too weird but in any case it shouldn't break anything.
|
||||||
|
parts.append(statement.split()[0])
|
||||||
|
if db_name:
|
||||||
|
parts.append(db_name)
|
||||||
|
if not parts:
|
||||||
|
return self.vendor
|
||||||
|
return " ".join(parts)
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def _before_cur_exec(self, conn, cursor, statement, *args):
|
def _before_cur_exec(self, conn, cursor, statement, *args):
|
||||||
|
attrs, found = _get_attributes_from_url(conn.engine.url)
|
||||||
|
if not found:
|
||||||
|
attrs = _get_attributes_from_cursor(self.vendor, cursor, attrs)
|
||||||
|
|
||||||
|
db_name = attrs.get(_DB, "")
|
||||||
self.current_span = self.tracer.start_span(
|
self.current_span = self.tracer.start_span(
|
||||||
statement, kind=trace.SpanKind.CLIENT
|
self._operation_name(db_name, statement),
|
||||||
|
kind=trace.SpanKind.CLIENT,
|
||||||
)
|
)
|
||||||
with self.tracer.use_span(self.current_span, end_on_exit=False):
|
with self.tracer.use_span(self.current_span, end_on_exit=False):
|
||||||
if self.current_span.is_recording():
|
if self.current_span.is_recording():
|
||||||
self.current_span.set_attribute(_STMT, statement)
|
self.current_span.set_attribute(_STMT, statement)
|
||||||
self.current_span.set_attribute("db.system", self.vendor)
|
self.current_span.set_attribute("db.system", self.vendor)
|
||||||
|
for key, value in attrs.items():
|
||||||
if not _set_attributes_from_url(
|
self.current_span.set_attribute(key, value)
|
||||||
self.current_span, conn.engine.url
|
|
||||||
):
|
|
||||||
_set_attributes_from_cursor(
|
|
||||||
self.current_span, self.vendor, cursor
|
|
||||||
)
|
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def _after_cur_exec(self, conn, cursor, statement, *args):
|
def _after_cur_exec(self, conn, cursor, statement, *args):
|
||||||
|
|
@ -108,25 +124,22 @@ class EngineTracer:
|
||||||
self.current_span.end()
|
self.current_span.end()
|
||||||
|
|
||||||
|
|
||||||
def _set_attributes_from_url(span: trace.Span, url):
|
def _get_attributes_from_url(url):
|
||||||
"""Set connection tags from the url. return true if successful."""
|
"""Set connection tags from the url. return true if successful."""
|
||||||
if span.is_recording():
|
attrs = {}
|
||||||
if url.host:
|
if url.host:
|
||||||
span.set_attribute(_HOST, url.host)
|
attrs[_HOST] = url.host
|
||||||
if url.port:
|
if url.port:
|
||||||
span.set_attribute(_PORT, url.port)
|
attrs[_PORT] = url.port
|
||||||
if url.database:
|
if url.database:
|
||||||
span.set_attribute(_DB, url.database)
|
attrs[_DB] = url.database
|
||||||
if url.username:
|
if url.username:
|
||||||
span.set_attribute(_USER, url.username)
|
attrs[_USER] = url.username
|
||||||
|
return attrs, bool(url.host)
|
||||||
return bool(url.host)
|
|
||||||
|
|
||||||
|
|
||||||
def _set_attributes_from_cursor(span: trace.Span, vendor, cursor):
|
def _get_attributes_from_cursor(vendor, cursor, attrs):
|
||||||
"""Attempt to set db connection attributes by introspecting the cursor."""
|
"""Attempt to set db connection attributes by introspecting the cursor."""
|
||||||
if not span.is_recording():
|
|
||||||
return
|
|
||||||
if vendor == "postgresql":
|
if vendor == "postgresql":
|
||||||
# pylint: disable=import-outside-toplevel
|
# pylint: disable=import-outside-toplevel
|
||||||
from psycopg2.extensions import parse_dsn
|
from psycopg2.extensions import parse_dsn
|
||||||
|
|
@ -135,6 +148,7 @@ def _set_attributes_from_cursor(span: trace.Span, vendor, cursor):
|
||||||
dsn = getattr(cursor.connection, "dsn", None)
|
dsn = getattr(cursor.connection, "dsn", None)
|
||||||
if dsn:
|
if dsn:
|
||||||
data = parse_dsn(dsn)
|
data = parse_dsn(dsn)
|
||||||
span.set_attribute(_DB, data.get("dbname"))
|
attrs[_DB] = data.get("dbname")
|
||||||
span.set_attribute(_HOST, data.get("host"))
|
attrs[_HOST] = data.get("host")
|
||||||
span.set_attribute(_PORT, int(data.get("port")))
|
attrs[_PORT] = int(data.get("port"))
|
||||||
|
return attrs
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ class TestSqlalchemyInstrumentation(TestBase):
|
||||||
spans = self.memory_exporter.get_finished_spans()
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
self.assertEqual(spans[0].name, "SELECT 1 + 1;")
|
self.assertEqual(spans[0].name, "SELECT :memory:")
|
||||||
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
|
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
|
||||||
|
|
||||||
def test_not_recording(self):
|
def test_not_recording(self):
|
||||||
|
|
@ -68,5 +68,5 @@ class TestSqlalchemyInstrumentation(TestBase):
|
||||||
spans = self.memory_exporter.get_finished_spans()
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
self.assertEqual(spans[0].name, "SELECT 1 + 1;")
|
self.assertEqual(spans[0].name, "SELECT :memory:")
|
||||||
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
|
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ class TestSQLite3(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
self._cursor.execute(stmt)
|
self._cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
"""Should create a child span for executemany"""
|
"""Should create a child span for executemany"""
|
||||||
|
|
@ -68,7 +68,7 @@ class TestSQLite3(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = [("1",), ("2",), ("3",)]
|
data = [("1",), ("2",), ("3",)]
|
||||||
self._cursor.executemany(stmt, data)
|
self._cursor.executemany(stmt, data)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ class TestFunctionalMysql(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
self._cursor.execute(stmt)
|
self._cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_execute_with_connection_context_manager(self):
|
def test_execute_with_connection_context_manager(self):
|
||||||
"""Should create a child span for execute with connection context"""
|
"""Should create a child span for execute with connection context"""
|
||||||
|
|
@ -90,7 +90,7 @@ class TestFunctionalMysql(TestBase):
|
||||||
with self._connection as conn:
|
with self._connection as conn:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
cursor.execute(stmt)
|
cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_execute_with_cursor_context_manager(self):
|
def test_execute_with_cursor_context_manager(self):
|
||||||
"""Should create a child span for execute with cursor context"""
|
"""Should create a child span for execute with cursor context"""
|
||||||
|
|
@ -98,7 +98,7 @@ class TestFunctionalMysql(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
with self._connection.cursor() as cursor:
|
with self._connection.cursor() as cursor:
|
||||||
cursor.execute(stmt)
|
cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
"""Should create a child span for executemany"""
|
"""Should create a child span for executemany"""
|
||||||
|
|
@ -106,7 +106,7 @@ class TestFunctionalMysql(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = (("1",), ("2",), ("3",))
|
data = (("1",), ("2",), ("3",))
|
||||||
self._cursor.executemany(stmt, data)
|
self._cursor.executemany(stmt, data)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ class TestFunctionalAiopgConnect(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
async_call(self._cursor.execute(stmt))
|
async_call(self._cursor.execute(stmt))
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
"""Should create a child span for executemany"""
|
"""Should create a child span for executemany"""
|
||||||
|
|
@ -98,7 +98,7 @@ class TestFunctionalAiopgConnect(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = (("1",), ("2",), ("3",))
|
data = (("1",), ("2",), ("3",))
|
||||||
async_call(self._cursor.executemany(stmt, data))
|
async_call(self._cursor.executemany(stmt, data))
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
@ -167,7 +167,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
async_call(self._cursor.execute(stmt))
|
async_call(self._cursor.execute(stmt))
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
"""Should create a child span for executemany"""
|
"""Should create a child span for executemany"""
|
||||||
|
|
@ -176,7 +176,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = (("1",), ("2",), ("3",))
|
data = (("1",), ("2",), ("3",))
|
||||||
async_call(self._cursor.executemany(stmt, data))
|
async_call(self._cursor.executemany(stmt, data))
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
from psycopg2 import sql
|
||||||
|
|
||||||
from opentelemetry import trace as trace_api
|
from opentelemetry import trace as trace_api
|
||||||
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
|
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
|
||||||
|
|
@ -81,7 +82,7 @@ class TestFunctionalPsycopg(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
self._cursor.execute(stmt)
|
self._cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_execute_with_connection_context_manager(self):
|
def test_execute_with_connection_context_manager(self):
|
||||||
"""Should create a child span for execute with connection context"""
|
"""Should create a child span for execute with connection context"""
|
||||||
|
|
@ -90,7 +91,7 @@ class TestFunctionalPsycopg(TestBase):
|
||||||
with self._connection as conn:
|
with self._connection as conn:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
cursor.execute(stmt)
|
cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_execute_with_cursor_context_manager(self):
|
def test_execute_with_cursor_context_manager(self):
|
||||||
"""Should create a child span for execute with cursor context"""
|
"""Should create a child span for execute with cursor context"""
|
||||||
|
|
@ -98,7 +99,7 @@ class TestFunctionalPsycopg(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
with self._connection.cursor() as cursor:
|
with self._connection.cursor() as cursor:
|
||||||
cursor.execute(stmt)
|
cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
self.assertTrue(cursor.closed)
|
self.assertTrue(cursor.closed)
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
|
|
@ -107,7 +108,7 @@ class TestFunctionalPsycopg(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = (("1",), ("2",), ("3",))
|
data = (("1",), ("2",), ("3",))
|
||||||
self._cursor.executemany(stmt, data)
|
self._cursor.executemany(stmt, data)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
@ -116,3 +117,30 @@ class TestFunctionalPsycopg(TestBase):
|
||||||
):
|
):
|
||||||
self._cursor.callproc("test", ())
|
self._cursor.callproc("test", ())
|
||||||
self.validate_spans("test")
|
self.validate_spans("test")
|
||||||
|
|
||||||
|
def test_register_types(self):
|
||||||
|
psycopg2.extras.register_default_jsonb(
|
||||||
|
conn_or_curs=self._cursor, loads=lambda x: x
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_composed_queries(self):
|
||||||
|
stmt = "CREATE TABLE IF NOT EXISTS users (id integer, name varchar)"
|
||||||
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
|
self._cursor.execute(stmt)
|
||||||
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
|
self._cursor.execute(
|
||||||
|
sql.SQL("SELECT FROM {table} where {field}='{value}'").format(
|
||||||
|
table=sql.Identifier("users"),
|
||||||
|
field=sql.Identifier("name"),
|
||||||
|
value=sql.Identifier("abc"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
span = spans[2]
|
||||||
|
self.assertEqual(span.name, "SELECT")
|
||||||
|
self.assertEqual(
|
||||||
|
span.attributes["db.statement"],
|
||||||
|
'SELECT FROM "users" where "name"=\'"abc"\'',
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ class TestFunctionalPyMysql(TestBase):
|
||||||
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
|
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
self._cursor.execute(stmt)
|
self._cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_execute_with_cursor_context_manager(self):
|
def test_execute_with_cursor_context_manager(self):
|
||||||
"""Should create a child span for execute with cursor context"""
|
"""Should create a child span for execute with cursor context"""
|
||||||
|
|
@ -86,7 +86,7 @@ class TestFunctionalPyMysql(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
with self._connection.cursor() as cursor:
|
with self._connection.cursor() as cursor:
|
||||||
cursor.execute(stmt)
|
cursor.execute(stmt)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("CREATE")
|
||||||
|
|
||||||
def test_executemany(self):
|
def test_executemany(self):
|
||||||
"""Should create a child span for executemany"""
|
"""Should create a child span for executemany"""
|
||||||
|
|
@ -94,7 +94,7 @@ class TestFunctionalPyMysql(TestBase):
|
||||||
with self._tracer.start_as_current_span("rootSpan"):
|
with self._tracer.start_as_current_span("rootSpan"):
|
||||||
data = (("1",), ("2",), ("3",))
|
data = (("1",), ("2",), ("3",))
|
||||||
self._cursor.executemany(stmt, data)
|
self._cursor.executemany(stmt, data)
|
||||||
self.validate_spans(stmt)
|
self.validate_spans("INSERT")
|
||||||
|
|
||||||
def test_callproc(self):
|
def test_callproc(self):
|
||||||
"""Should create a child span for callproc"""
|
"""Should create a child span for callproc"""
|
||||||
|
|
|
||||||
|
|
@ -110,6 +110,8 @@ class SQLAlchemyTestMixin(TestBase):
|
||||||
super().tearDown()
|
super().tearDown()
|
||||||
|
|
||||||
def _check_span(self, span, name):
|
def _check_span(self, span, name):
|
||||||
|
if self.SQL_DB:
|
||||||
|
name = "{0} {1}".format(name, self.SQL_DB)
|
||||||
self.assertEqual(span.name, name)
|
self.assertEqual(span.name, name)
|
||||||
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
|
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
|
||||||
self.assertIs(span.status.status_code, trace.status.StatusCode.UNSET)
|
self.assertIs(span.status.status_code, trace.status.StatusCode.UNSET)
|
||||||
|
|
@ -129,7 +131,7 @@ class SQLAlchemyTestMixin(TestBase):
|
||||||
stmt += "(?, ?)"
|
stmt += "(?, ?)"
|
||||||
else:
|
else:
|
||||||
stmt += "(%(id)s, %(name)s)"
|
stmt += "(%(id)s, %(name)s)"
|
||||||
self._check_span(span, stmt)
|
self._check_span(span, "INSERT")
|
||||||
self.assertIn("INSERT INTO players", span.attributes.get(_STMT))
|
self.assertIn("INSERT INTO players", span.attributes.get(_STMT))
|
||||||
self.check_meta(span)
|
self.check_meta(span)
|
||||||
|
|
||||||
|
|
@ -146,7 +148,7 @@ class SQLAlchemyTestMixin(TestBase):
|
||||||
stmt += "?"
|
stmt += "?"
|
||||||
else:
|
else:
|
||||||
stmt += "%(name_1)s"
|
stmt += "%(name_1)s"
|
||||||
self._check_span(span, stmt)
|
self._check_span(span, "SELECT")
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
"SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name",
|
"SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name",
|
||||||
span.attributes.get(_STMT),
|
span.attributes.get(_STMT),
|
||||||
|
|
@ -163,7 +165,7 @@ class SQLAlchemyTestMixin(TestBase):
|
||||||
spans = self.memory_exporter.get_finished_spans()
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
span = spans[0]
|
span = spans[0]
|
||||||
self._check_span(span, stmt)
|
self._check_span(span, "SELECT")
|
||||||
self.assertEqual(span.attributes.get(_STMT), "SELECT * FROM players")
|
self.assertEqual(span.attributes.get(_STMT), "SELECT * FROM players")
|
||||||
self.check_meta(span)
|
self.check_meta(span)
|
||||||
|
|
||||||
|
|
@ -188,4 +190,4 @@ class SQLAlchemyTestMixin(TestBase):
|
||||||
self.assertEqual(parent_span.name, "sqlalch_op")
|
self.assertEqual(parent_span.name, "sqlalch_op")
|
||||||
self.assertEqual(parent_span.instrumentation_info.name, "sqlalch_svc")
|
self.assertEqual(parent_span.instrumentation_info.name, "sqlalch_svc")
|
||||||
|
|
||||||
self.assertEqual(child_span.name, stmt)
|
self.assertEqual(child_span.name, "SELECT " + self.SQL_DB)
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ class SQLAlchemyInstrumentTestCase(TestBase):
|
||||||
|
|
||||||
def test_engine_traced(self):
|
def test_engine_traced(self):
|
||||||
# ensures that the engine is traced
|
# ensures that the engine is traced
|
||||||
rows = self.conn.execute("SELECT 1").fetchall()
|
rows = self.conn.execute("SELECT").fetchall()
|
||||||
self.assertEqual(len(rows), 1)
|
self.assertEqual(len(rows), 1)
|
||||||
|
|
||||||
traces = self.memory_exporter.get_finished_spans()
|
traces = self.memory_exporter.get_finished_spans()
|
||||||
|
|
@ -64,6 +64,6 @@ class SQLAlchemyInstrumentTestCase(TestBase):
|
||||||
self.assertEqual(len(traces), 1)
|
self.assertEqual(len(traces), 1)
|
||||||
span = traces[0]
|
span = traces[0]
|
||||||
# check subset of span fields
|
# check subset of span fields
|
||||||
self.assertEqual(span.name, "SELECT 1")
|
self.assertEqual(span.name, "SELECT opentelemetry-tests")
|
||||||
self.assertIs(span.status.status_code, trace.status.StatusCode.UNSET)
|
self.assertIs(span.status.status_code, trace.status.StatusCode.UNSET)
|
||||||
self.assertGreater((span.end_time - span.start_time), 0)
|
self.assertGreater((span.end_time - span.start_time), 0)
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ class MysqlConnectorTestCase(SQLAlchemyTestMixin):
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
span = spans[0]
|
span = spans[0]
|
||||||
# span fields
|
# span fields
|
||||||
self.assertEqual(span.name, "SELECT * FROM a_wrong_table")
|
self.assertEqual(span.name, "SELECT opentelemetry-tests")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ class PostgresTestCase(SQLAlchemyTestMixin):
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
span = spans[0]
|
span = spans[0]
|
||||||
# span fields
|
# span fields
|
||||||
self.assertEqual(span.name, "SELECT * FROM a_wrong_table")
|
self.assertEqual(span.name, "SELECT opentelemetry-tests")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ class SQLiteTestCase(SQLAlchemyTestMixin):
|
||||||
self.assertEqual(len(spans), 1)
|
self.assertEqual(len(spans), 1)
|
||||||
span = spans[0]
|
span = spans[0]
|
||||||
# span fields
|
# span fields
|
||||||
self.assertEqual(span.name, stmt)
|
self.assertEqual(span.name, "SELECT :memory:")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue