From f1e3f75f3625564581c1b6cd6945b90fcb4efa5c Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 4 Aug 2020 19:10:51 -0700 Subject: [PATCH 01/15] Rename remaining framework packages from "ext" to "instrumentation" (#969) --- .../CHANGELOG.md | 32 ++ .../LICENSE | 201 ++++++++ .../MANIFEST.in | 9 + .../README.rst | 18 + .../setup.cfg | 59 +++ .../setup.py | 27 ++ .../instrumentation/grpc/__init__.py | 224 +++++++++ .../instrumentation/grpc/_client.py | 275 +++++++++++ .../instrumentation/grpc/_server.py | 209 +++++++++ .../instrumentation/grpc/_utilities.py | 107 +++++ .../instrumentation/grpc/grpcext/__init__.py | 216 +++++++++ .../grpc/grpcext/_interceptor.py | 431 ++++++++++++++++++ .../instrumentation/grpc/version.py | 15 + .../tests/__init__.py | 13 + .../tests/_client.py | 57 +++ .../tests/_server.py | 87 ++++ .../tests/protobuf/test_server.proto | 34 ++ .../tests/protobuf/test_server_pb2.py | 215 +++++++++ .../tests/protobuf/test_server_pb2_grpc.py | 205 +++++++++ .../tests/test_client_interceptor.py | 298 ++++++++++++ .../tests/test_server_interceptor.py | 297 ++++++++++++ 21 files changed, 3029 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/LICENSE create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md new file mode 100644 index 000000000..b6b28ecd9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md @@ -0,0 +1,32 @@ +# Changelog + +## Unreleased + +- Change package name to opentelemetry-instrumentation-grpc + ([#969](https://github.com/open-telemetry/opentelemetry-python/pull/969)) + +## Version 0.11b0 + +Released 2020-07-28 + +- Add status code to gRPC client spans + ([896](https://github.com/open-telemetry/opentelemetry-python/pull/896)) +- Add gRPC client and server instrumentors + ([788](https://github.com/open-telemetry/opentelemetry-python/pull/788)) + +- Add metric recording (bytes in/out, errors, latency) to gRPC client + +## 0.8b0 + +Released 2020-05-27 + +- lint: version of grpc causes lint issues + ([#696](https://github.com/open-telemetry/opentelemetry-python/pull/696)) + +## 0.6b0 + +Released 2020-03-30 + +- Add gRPC integration + ([#476](https://github.com/open-telemetry/opentelemetry-python/pull/476)) +- Initial release diff --git a/instrumentation/opentelemetry-instrumentation-grpc/LICENSE b/instrumentation/opentelemetry-instrumentation-grpc/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in b/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in new file mode 100644 index 000000000..aed3e3327 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in @@ -0,0 +1,9 @@ +graft src +graft tests +global-exclude *.pyc +global-exclude *.pyo +global-exclude __pycache__/* +include CHANGELOG.md +include MANIFEST.in +include README.rst +include LICENSE diff --git a/instrumentation/opentelemetry-instrumentation-grpc/README.rst b/instrumentation/opentelemetry-instrumentation-grpc/README.rst new file mode 100644 index 000000000..176bdf1a3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/README.rst @@ -0,0 +1,18 @@ +OpenTelemetry gRPC Integration +============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-grpc.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-grpc/ + +Client and server interceptors for `gRPC Python`_. + +.. _gRPC Python: https://grpc.github.io/grpc/python/grpc.html + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-grpc diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg new file mode 100644 index 000000000..0308a229f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -0,0 +1,59 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[metadata] +name = opentelemetry-instrumentation-grpc +description = OpenTelemetry gRPC instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/tree/master/instrumentation/opentelemetry-instrumentation-grpc +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.4 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 + grpcio ~= 1.27 + +[options.extras_require] +test = + opentelemetry-test == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 + protobuf == 3.12.2 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient + grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.py b/instrumentation/opentelemetry-instrumentation-grpc/setup.py new file mode 100644 index 000000000..87c720aea --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "grpc", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py new file mode 100644 index 000000000..9b6862eba --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -0,0 +1,224 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=no-name-in-module +# pylint:disable=relative-beyond-top-level +# pylint:disable=import-error +# pylint:disable=no-self-use +""" +Usage Client +------------ +.. code-block:: python + + import logging + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, client_interceptor + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + from opentelemetry import metrics + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + + # Set meter provider to opentelemetry-sdk's MeterProvider + metrics.set_meter_provider(MeterProvider()) + + # Optional - export GRPC specific metrics (latency, bytes in/out, errors) by passing an exporter + instrumentor = GrpcInstrumentorClient(exporter=ConsoleMetricsExporter(), interval=10) + instrumentor.instrument() + + def run(): + with grpc.insecure_channel("localhost:50051") as channel: + + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU")) + + print("Greeter client received: " + response.message) + + + if __name__ == "__main__": + logging.basicConfig() + run() + +Usage Server +------------ +.. code-block:: python + + import logging + from concurrent import futures + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor + from opentelemetry.instrumentation.grpc.grpcext import intercept_server + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + + + class Greeter(helloworld_pb2_grpc.GreeterServicer): + def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) + + + def serve(): + + server = grpc.server(futures.ThreadPoolExecutor()) + + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + + + if __name__ == "__main__": + logging.basicConfig() + serve() +""" +from contextlib import contextmanager +from functools import partial + +import grpc +from wrapt import wrap_function_wrapper as _wrap + +from opentelemetry import trace +from opentelemetry.instrumentation.grpc.grpcext import ( + intercept_channel, + intercept_server, +) +from opentelemetry.instrumentation.grpc.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=unused-argument +# isort:skip + + +class GrpcInstrumentorServer(BaseInstrumentor): + def _instrument(self, **kwargs): + _wrap("grpc", "server", self.wrapper_fn) + + def _uninstrument(self, **kwargs): + unwrap(grpc, "server") + + def wrapper_fn(self, original_func, instance, args, kwargs): + server = original_func(*args, **kwargs) + return intercept_server(server, server_interceptor()) + + +class GrpcInstrumentorClient(BaseInstrumentor): + def _instrument(self, **kwargs): + exporter = kwargs.get("exporter", None) + interval = kwargs.get("interval", 30) + if kwargs.get("channel_type") == "secure": + _wrap( + "grpc", + "secure_channel", + partial(self.wrapper_fn, exporter, interval), + ) + + else: + _wrap( + "grpc", + "insecure_channel", + partial(self.wrapper_fn, exporter, interval), + ) + + def _uninstrument(self, **kwargs): + if kwargs.get("channel_type") == "secure": + unwrap(grpc, "secure_channel") + + else: + unwrap(grpc, "insecure_channel") + + def wrapper_fn( + self, exporter, interval, original_func, instance, args, kwargs + ): + channel = original_func(*args, **kwargs) + tracer_provider = kwargs.get("tracer_provider") + return intercept_channel( + channel, + client_interceptor( + tracer_provider=tracer_provider, + exporter=exporter, + interval=interval, + ), + ) + + +def client_interceptor(tracer_provider=None, exporter=None, interval=30): + """Create a gRPC client channel interceptor. + + Args: + tracer: The tracer to use to create client-side spans. + exporter: The exporter that will receive client metrics + interval: Time between every export call + + Returns: + An invocation-side interceptor object. + """ + from . import _client + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return _client.OpenTelemetryClientInterceptor(tracer, exporter, interval) + + +def server_interceptor(tracer_provider=None): + """Create a gRPC server interceptor. + + Args: + tracer: The tracer to use to create server-side spans. + + Returns: + A service-side interceptor object. + """ + from . import _server + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return _server.OpenTelemetryServerInterceptor(tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py new file mode 100644 index 000000000..028804f59 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -0,0 +1,275 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the invocation-side open-telemetry interceptor.""" + +from collections import OrderedDict +from typing import MutableMapping + +import grpc + +from opentelemetry import metrics, propagators, trace +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.trace.status import Status, StatusCanonicalCode + +from . import grpcext +from ._utilities import RpcInfo, TimedMetricRecorder + + +class _GuardedSpan: + def __init__(self, span): + self.span = span + self.generated_span = None + self._engaged = True + + def __enter__(self): + self.generated_span = self.span.__enter__() + return self + + def __exit__(self, *args, **kwargs): + if self._engaged: + self.generated_span = None + return self.span.__exit__(*args, **kwargs) + return False + + def release(self): + self._engaged = False + return self.span + + +def _inject_span_context(metadata: MutableMapping[str, str]) -> None: + # pylint:disable=unused-argument + def append_metadata( + carrier: MutableMapping[str, str], key: str, value: str + ): + metadata[key] = value + + # Inject current active span from the context + propagators.inject(append_metadata, metadata) + + +def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder): + def callback(response_future): + with span: + code = response_future.code() + if code != grpc.StatusCode.OK: + rpc_info.error = code + return + response = response_future.result() + rpc_info.response = response + if "ByteSize" in dir(response): + metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + + return callback + + +class OpenTelemetryClientInterceptor( + grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor +): + def __init__(self, tracer, exporter, interval): + self._tracer = tracer + + self._meter = None + if exporter and interval: + self._meter = metrics.get_meter(__name__) + self.controller = PushController( + meter=self._meter, exporter=exporter, interval=interval + ) + self._metrics_recorder = TimedMetricRecorder(self._meter, "client") + + def _start_span(self, method): + return self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.CLIENT + ) + + # pylint:disable=no-self-use + def _trace_result(self, guarded_span, rpc_info, result, client_info): + # If the RPC is called asynchronously, release the guard and add a + # callback so that the span can be finished once the future is done. + if isinstance(result, grpc.Future): + result.add_done_callback( + _make_future_done_callback( + guarded_span.release(), + rpc_info, + client_info, + self._metrics_recorder, + ) + ) + return result + response = result + # Handle the case when the RPC is initiated via the with_call + # method and the result is a tuple with the first element as the + # response. + # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call + if isinstance(result, tuple): + response = result[0] + rpc_info.response = response + + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + return result + + def _start_guarded_span(self, *args, **kwargs): + return _GuardedSpan(self._start_span(*args, **kwargs)) + + def _bytes_out_iterator_wrapper(self, iterator, client_info): + for request in iterator: + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + yield request + + def intercept_unary(self, request, metadata, client_info, invoker): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + + # If protobuf is used, we can record the bytes in/out. Otherwise, we have no way + # to get the size of the request/response properly, so don't record anything + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, + ) + + try: + result = invoker(request, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_span(client_info.full_method) as span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + ) + + if client_info.is_client_stream: + rpc_info.request = request_or_iterator + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + else: + if "ByteSize" in dir(request_or_iterator): + self._metrics_recorder.record_bytes_out( + request_or_iterator.ByteSize(), + client_info.full_method, + ) + + try: + result = invoker(request_or_iterator, metadata) + + # Rewrap the result stream into a generator, and record the bytes received + for response in result: + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + yield response + except grpc.RpcError as exc: + span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if client_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, metadata, client_info, invoker + ) + + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) + + rpc_info.request = request_or_iterator + + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + + try: + result = invoker(request_or_iterator, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py new file mode 100644 index 000000000..cb0e997d3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -0,0 +1,209 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the service-side open-telemetry interceptor. + +This library borrows heavily from the OpenTracing gRPC integration: +https://github.com/opentracing-contrib/python-grpc +""" + +from contextlib import contextmanager +from typing import List + +import grpc + +from opentelemetry import propagators, trace +from opentelemetry.context import attach, detach + +from . import grpcext +from ._utilities import RpcInfo + + +# pylint:disable=abstract-method +class _OpenTelemetryServicerContext(grpc.ServicerContext): + def __init__(self, servicer_context, active_span): + self._servicer_context = servicer_context + self._active_span = active_span + self.code = grpc.StatusCode.OK + self.details = None + super(_OpenTelemetryServicerContext, self).__init__() + + def is_active(self, *args, **kwargs): + return self._servicer_context.is_active(*args, **kwargs) + + def time_remaining(self, *args, **kwargs): + return self._servicer_context.time_remaining(*args, **kwargs) + + def cancel(self, *args, **kwargs): + return self._servicer_context.cancel(*args, **kwargs) + + def add_callback(self, *args, **kwargs): + return self._servicer_context.add_callback(*args, **kwargs) + + def invocation_metadata(self, *args, **kwargs): + return self._servicer_context.invocation_metadata(*args, **kwargs) + + def peer(self, *args, **kwargs): + return self._servicer_context.peer(*args, **kwargs) + + def peer_identities(self, *args, **kwargs): + return self._servicer_context.peer_identities(*args, **kwargs) + + def peer_identity_key(self, *args, **kwargs): + return self._servicer_context.peer_identity_key(*args, **kwargs) + + def auth_context(self, *args, **kwargs): + return self._servicer_context.auth_context(*args, **kwargs) + + def send_initial_metadata(self, *args, **kwargs): + return self._servicer_context.send_initial_metadata(*args, **kwargs) + + def set_trailing_metadata(self, *args, **kwargs): + return self._servicer_context.set_trailing_metadata(*args, **kwargs) + + def abort(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort"): + raise RuntimeError( + "abort() is not supported with the installed version of grpcio" + ) + return self._servicer_context.abort(*args, **kwargs) + + def abort_with_status(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort_with_status"): + raise RuntimeError( + "abort_with_status() is not supported with the installed " + "version of grpcio" + ) + return self._servicer_context.abort_with_status(*args, **kwargs) + + def set_code(self, code): + self.code = code + return self._servicer_context.set_code(code) + + def set_details(self, details): + self.details = details + return self._servicer_context.set_details(details) + + +# On the service-side, errors can be signaled either by exceptions or by +# calling `set_code` on the `servicer_context`. This function checks for the +# latter and updates the span accordingly. +# pylint:disable=unused-argument +def _check_error_code(span, servicer_context, rpc_info): + if servicer_context.code != grpc.StatusCode.OK: + rpc_info.error = servicer_context.code + + +class OpenTelemetryServerInterceptor( + grpcext.UnaryServerInterceptor, grpcext.StreamServerInterceptor +): + def __init__(self, tracer): + self._tracer = tracer + + @contextmanager + # pylint:disable=no-self-use + def _set_remote_context(self, servicer_context): + metadata = servicer_context.invocation_metadata() + if metadata: + md_dict = {md.key: md.value for md in metadata} + + def get_from_grpc_metadata(metadata, key) -> List[str]: + return [md_dict[key]] if key in md_dict else [] + + # Update the context with the traceparent from the RPC metadata. + ctx = propagators.extract(get_from_grpc_metadata, metadata) + token = attach(ctx) + try: + yield + finally: + detach(token) + else: + yield + + def _start_span(self, method): + span = self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.SERVER + ) + return span + + def intercept_unary(self, request, servicer_context, server_info, handler): + + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + request=request, + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request, servicer_context) + + _check_error_code(span, servicer_context, rpc_info) + + rpc_info.response = response + + return response + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + if not server_info.is_client_stream: + rpc_info.request = request_or_iterator + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + result = handler(request_or_iterator, servicer_context) + for response in result: + yield response + _check_error_code(span, servicer_context, rpc_info) + + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + if server_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, servicer_context, server_info, handler + ) + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request_or_iterator, servicer_context) + _check_error_code(span, servicer_context, rpc_info) + rpc_info.response = response + return response diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py new file mode 100644 index 000000000..a57763558 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -0,0 +1,107 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Internal utilities.""" + +from contextlib import contextmanager +from time import time + +import grpc + +from opentelemetry.sdk.metrics import Counter, ValueRecorder + + +class RpcInfo: + def __init__( + self, + full_method=None, + metadata=None, + timeout=None, + request=None, + response=None, + error=None, + ): + self.full_method = full_method + self.metadata = metadata + self.timeout = timeout + self.request = request + self.response = response + self.error = error + + +class TimedMetricRecorder: + def __init__(self, meter, span_kind): + self._meter = meter + service_name = "grpcio" + self._span_kind = span_kind + + if self._meter: + self._duration = self._meter.create_metric( + name="{}/{}/duration".format(service_name, span_kind), + description="Duration of grpc requests to the server", + unit="ms", + value_type=float, + metric_type=ValueRecorder, + ) + self._error_count = self._meter.create_metric( + name="{}/{}/errors".format(service_name, span_kind), + description="Number of errors that were returned from the server", + unit="1", + value_type=int, + metric_type=Counter, + ) + self._bytes_in = self._meter.create_metric( + name="{}/{}/bytes_in".format(service_name, span_kind), + description="Number of bytes received from the server", + unit="by", + value_type=int, + metric_type=Counter, + ) + self._bytes_out = self._meter.create_metric( + name="{}/{}/bytes_out".format(service_name, span_kind), + description="Number of bytes sent out through gRPC", + unit="by", + value_type=int, + metric_type=Counter, + ) + + def record_bytes_in(self, bytes_in, method): + if self._meter: + labels = {"method": method} + self._bytes_in.add(bytes_in, labels) + + def record_bytes_out(self, bytes_out, method): + if self._meter: + labels = {"method": method} + self._bytes_out.add(bytes_out, labels) + + @contextmanager + def record_latency(self, method): + start_time = time() + labels = {"method": method, "status_code": grpc.StatusCode.OK} + try: + yield labels + except grpc.RpcError as exc: + if self._meter: + # pylint: disable=no-member + labels["status_code"] = exc.code() + self._error_count.add(1, labels) + labels["error"] = True + raise + finally: + if self._meter: + if "error" not in labels: + labels["error"] = False + elapsed_time = (time() - start_time) * 1000 + self._duration.record(elapsed_time, labels) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py new file mode 100644 index 000000000..fe83467a7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py @@ -0,0 +1,216 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=no-name-in-module + +import abc + + +class UnaryClientInfo(abc.ABC): + """Consists of various information about a unary RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class StreamClientInfo(abc.ABC): + """Consists of various information about a stream RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class UnaryClientInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, metadata, client_info, invoker): + """Intercepts unary-unary RPCs on the invocation-side. + + Args: + request: The request value for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + client_info: A UnaryClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(request, metadata). + """ + raise NotImplementedError() + + +class StreamClientInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + """Intercepts stream RPCs on the invocation-side. + + Args: + request_or_iterator: The request value for the RPC if + `client_info.is_client_stream` is `false`; otherwise, an iterator of + request values. + metadata: Optional :term:`metadata` to be transmitted to the service-side + of the RPC. + client_info: A StreamClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(metadata). + """ + raise NotImplementedError() + + +def intercept_channel(channel, *interceptors): + """Creates an intercepted channel. + + Args: + channel: A Channel. + interceptors: Zero or more UnaryClientInterceptors or + StreamClientInterceptors + + Returns: + A Channel. + + Raises: + TypeError: If an interceptor derives from neither UnaryClientInterceptor + nor StreamClientInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_channel(channel, *interceptors) + + +class UnaryServerInfo(abc.ABC): + """Consists of various information about a unary RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + """ + + +class StreamServerInfo(abc.ABC): + """Consists of various information about a stream RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + """ + + +class UnaryServerInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, servicer_context, server_info, handler): + """Intercepts unary-unary RPCs on the service-side. + + Args: + request: The request value for the RPC. + servicer_context: A ServicerContext. + server_info: A UnaryServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(request, servicer_context). + """ + raise NotImplementedError() + + +class StreamServerInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + """Intercepts stream RPCs on the service-side. + + Args: + request_or_iterator: The request value for the RPC if + `server_info.is_client_stream` is `False`; otherwise, an iterator of + request values. + servicer_context: A ServicerContext. + server_info: A StreamServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(servicer_context). + """ + raise NotImplementedError() + + +def intercept_server(server, *interceptors): + """Creates an intercepted server. + + Args: + server: A Server. + interceptors: Zero or more UnaryServerInterceptors or + StreamServerInterceptors + + Returns: + A Server. + + Raises: + TypeError: If an interceptor derives from neither UnaryServerInterceptor + nor StreamServerInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_server(server, *interceptors) + + +__all__ = ( + "UnaryClientInterceptor", + "StreamClientInfo", + "StreamClientInterceptor", + "UnaryServerInfo", + "StreamServerInfo", + "UnaryServerInterceptor", + "StreamServerInterceptor", + "intercept_channel", + "intercept_server", +) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py new file mode 100644 index 000000000..74861913b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -0,0 +1,431 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of gRPC Python interceptors.""" + + +import collections + +import grpc + +from .. import grpcext + + +class _UnaryClientInfo( + collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) +): + pass + + +class _StreamClientInfo( + collections.namedtuple( + "_StreamClientInfo", + ("full_method", "is_client_stream", "is_server_stream", "timeout"), + ) +): + pass + + +class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def with_call( + self, request, timeout=None, metadata=None, credentials=None + ): + def invoker(request, metadata): + return self._base_callable.with_call( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def future(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable.future( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + +class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _StreamClientInfo(self._method, False, True, timeout) + return self._interceptor.intercept_stream( + request, metadata, client_info, invoker + ) + + +class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.with_call( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def future( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.future( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, True, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorChannel(grpc.Channel): + def __init__(self, channel, interceptor): + self._channel = channel + self._interceptor = interceptor + + def subscribe(self, *args, **kwargs): + self._channel.subscribe(*args, **kwargs) + + def unsubscribe(self, *args, **kwargs): + self._channel.unsubscribe(*args, **kwargs) + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): + return _InterceptorUnaryUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorUnaryStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def close(self): + if not hasattr(self._channel, "close"): + raise RuntimeError( + "close() is not supported with the installed version of grpcio" + ) + self._channel.close() + + def __enter__(self): + """Enters the runtime context related to the channel object.""" + raise NotImplementedError() + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exits the runtime context related to the channel object.""" + raise NotImplementedError() + + +def intercept_channel(channel, *interceptors): + result = channel + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryClientInterceptor + ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryClientInterceptor or a " + "grpcext.StreamClientInterceptor" + ) + result = _InterceptorChannel(result, interceptor) + return result + + +class _UnaryServerInfo( + collections.namedtuple("_UnaryServerInfo", ("full_method",)) +): + pass + + +class _StreamServerInfo( + collections.namedtuple( + "_StreamServerInfo", + ("full_method", "is_client_stream", "is_server_stream"), + ) +): + pass + + +class _InterceptorRpcMethodHandler(grpc.RpcMethodHandler): + def __init__(self, rpc_method_handler, method, interceptor): + self._rpc_method_handler = rpc_method_handler + self._method = method + self._interceptor = interceptor + + @property + def request_streaming(self): + return self._rpc_method_handler.request_streaming + + @property + def response_streaming(self): + return self._rpc_method_handler.response_streaming + + @property + def request_deserializer(self): + return self._rpc_method_handler.request_deserializer + + @property + def response_serializer(self): + return self._rpc_method_handler.response_serializer + + @property + def unary_unary(self): + if not isinstance(self._interceptor, grpcext.UnaryServerInterceptor): + return self._rpc_method_handler.unary_unary + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_unary( + request, servicer_context + ) + + return self._interceptor.intercept_unary( + request, + servicer_context, + _UnaryServerInfo(self._method), + handler, + ) + + return adaptation + + @property + def unary_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.unary_stream + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_stream( + request, servicer_context + ) + + return self._interceptor.intercept_stream( + request, + servicer_context, + _StreamServerInfo(self._method, False, True), + handler, + ) + + return adaptation + + @property + def stream_unary(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_unary + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_unary( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, False), + handler, + ) + + return adaptation + + @property + def stream_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_stream + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_stream( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, True), + handler, + ) + + return adaptation + + +class _InterceptorGenericRpcHandler(grpc.GenericRpcHandler): + def __init__(self, generic_rpc_handler, interceptor): + self.generic_rpc_handler = generic_rpc_handler + self._interceptor = interceptor + + def service(self, handler_call_details): + result = self.generic_rpc_handler.service(handler_call_details) + if result: + result = _InterceptorRpcMethodHandler( + result, handler_call_details.method, self._interceptor + ) + return result + + +class _InterceptorServer(grpc.Server): + def __init__(self, server, interceptor): + self._server = server + self._interceptor = interceptor + + def add_generic_rpc_handlers(self, generic_rpc_handlers): + generic_rpc_handlers = [ + _InterceptorGenericRpcHandler( + generic_rpc_handler, self._interceptor + ) + for generic_rpc_handler in generic_rpc_handlers + ] + return self._server.add_generic_rpc_handlers(generic_rpc_handlers) + + def add_insecure_port(self, *args, **kwargs): + return self._server.add_insecure_port(*args, **kwargs) + + def add_secure_port(self, *args, **kwargs): + return self._server.add_secure_port(*args, **kwargs) + + def start(self, *args, **kwargs): + return self._server.start(*args, **kwargs) + + def stop(self, *args, **kwargs): + return self._server.stop(*args, **kwargs) + + def wait_for_termination(self, *args, **kwargs): + return self._server.wait_for_termination(*args, **kwargs) + + +def intercept_server(server, *interceptors): + result = server + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryServerInterceptor + ) and not isinstance(interceptor, grpcext.StreamServerInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryServerInterceptor or a " + "grpcext.StreamServerInterceptor" + ) + result = _InterceptorServer(result, interceptor) + return result diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py new file mode 100644 index 000000000..780a92b6a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.12.dev0" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py new file mode 100644 index 000000000..b0a6f4284 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py new file mode 100644 index 000000000..43310b5f6 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -0,0 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .protobuf.test_server_pb2 import Request + +CLIENT_ID = 1 + + +def simple_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + stub.SimpleMethod(request) + + +def client_streaming_method(stub, error=False): + # create a generator + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + stub.ClientStreamingMethod(request_messages()) + + +def server_streaming_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + response_iterator = stub.ServerStreamingMethod(request) + list(response_iterator) + + +def bidirectional_streaming_method(stub, error=False): + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + response_iterator = stub.BidirectionalStreamingMethod(request_messages()) + + list(response_iterator) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py new file mode 100644 index 000000000..a4e1c266b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures + +import grpc + +from .protobuf import test_server_pb2, test_server_pb2_grpc + +SERVER_ID = 1 + + +class TestServer(test_server_pb2_grpc.GRPCTestServerServicer): + def SimpleMethod(self, request, context): + if request.request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ClientStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ServerStreamingMethod(self, request, context): + if request.request_data == "error": + + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="server stream error", + ) + return test_server_pb2.Response() + + # create a generator + def response_messages(): + for _ in range(5): + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + yield response + + return response_messages() + + def BidirectionalStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="bidirectional error", + ) + return + + for _ in range(5): + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + + +def create_test_server(port): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + + test_server_pb2_grpc.add_GRPCTestServerServicer_to_server( + TestServer(), server + ) + + server.add_insecure_port("localhost:{}".format(port)) + + return server diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto new file mode 100644 index 000000000..790a7675d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto @@ -0,0 +1,34 @@ +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +message Request { + int64 client_id = 1; + string request_data = 2; +} + +message Response { + int64 server_id = 1; + string response_data = 2; +} + +service GRPCTestServer { + rpc SimpleMethod (Request) returns (Response); + + rpc ClientStreamingMethod (stream Request) returns (Response); + + rpc ServerStreamingMethod (Request) returns (stream Response); + + rpc BidirectionalStreamingMethod (stream Request) returns (stream Response); +} diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py new file mode 100644 index 000000000..735206f85 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test_server.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor.FileDescriptor( + name="test_server.proto", + package="", + syntax="proto3", + serialized_options=None, + serialized_pb=b'\n\x11test_server.proto"2\n\x07Request\x12\x11\n\tclient_id\x18\x01 \x01(\x03\x12\x14\n\x0crequest_data\x18\x02 \x01(\t"4\n\x08Response\x12\x11\n\tserver_id\x18\x01 \x01(\x03\x12\x15\n\rresponse_data\x18\x02 \x01(\t2\xce\x01\n\x0eGRPCTestServer\x12#\n\x0cSimpleMethod\x12\x08.Request\x1a\t.Response\x12.\n\x15\x43lientStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x12.\n\x15ServerStreamingMethod\x12\x08.Request\x1a\t.Response0\x01\x12\x37\n\x1c\x42idirectionalStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x30\x01\x62\x06proto3', +) + + +_REQUEST = _descriptor.Descriptor( + name="Request", + full_name="Request", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="client_id", + full_name="Request.client_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="request_data", + full_name="Request.request_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=21, + serialized_end=71, +) + + +_RESPONSE = _descriptor.Descriptor( + name="Response", + full_name="Response", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="server_id", + full_name="Response.server_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="response_data", + full_name="Response.response_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=73, + serialized_end=125, +) + +DESCRIPTOR.message_types_by_name["Request"] = _REQUEST +DESCRIPTOR.message_types_by_name["Response"] = _RESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Request = _reflection.GeneratedProtocolMessageType( + "Request", + (_message.Message,), + { + "DESCRIPTOR": _REQUEST, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Request) + }, +) +_sym_db.RegisterMessage(Request) + +Response = _reflection.GeneratedProtocolMessageType( + "Response", + (_message.Message,), + { + "DESCRIPTOR": _RESPONSE, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Response) + }, +) +_sym_db.RegisterMessage(Response) + + +_GRPCTESTSERVER = _descriptor.ServiceDescriptor( + name="GRPCTestServer", + full_name="GRPCTestServer", + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=128, + serialized_end=334, + methods=[ + _descriptor.MethodDescriptor( + name="SimpleMethod", + full_name="GRPCTestServer.SimpleMethod", + index=0, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ClientStreamingMethod", + full_name="GRPCTestServer.ClientStreamingMethod", + index=1, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ServerStreamingMethod", + full_name="GRPCTestServer.ServerStreamingMethod", + index=2, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="BidirectionalStreamingMethod", + full_name="GRPCTestServer.BidirectionalStreamingMethod", + index=3, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + ], +) +_sym_db.RegisterServiceDescriptor(_GRPCTESTSERVER) + +DESCRIPTOR.services_by_name["GRPCTestServer"] = _GRPCTESTSERVER + +# @@protoc_insertion_point(module_scope) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py new file mode 100644 index 000000000..d0a6fd518 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py @@ -0,0 +1,205 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +from tests.protobuf import test_server_pb2 as test__server__pb2 + + +class GRPCTestServerStub(object): + """Missing associated documentation comment in .proto file""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SimpleMethod = channel.unary_unary( + "/GRPCTestServer/SimpleMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ClientStreamingMethod = channel.stream_unary( + "/GRPCTestServer/ClientStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ServerStreamingMethod = channel.unary_stream( + "/GRPCTestServer/ServerStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.BidirectionalStreamingMethod = channel.stream_stream( + "/GRPCTestServer/BidirectionalStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + + +class GRPCTestServerServicer(object): + """Missing associated documentation comment in .proto file""" + + def SimpleMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ClientStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ServerStreamingMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def BidirectionalStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_GRPCTestServerServicer_to_server(servicer, server): + rpc_method_handlers = { + "SimpleMethod": grpc.unary_unary_rpc_method_handler( + servicer.SimpleMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ClientStreamingMethod": grpc.stream_unary_rpc_method_handler( + servicer.ClientStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ServerStreamingMethod": grpc.unary_stream_rpc_method_handler( + servicer.ServerStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "BidirectionalStreamingMethod": grpc.stream_stream_rpc_method_handler( + servicer.BidirectionalStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "GRPCTestServer", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class GRPCTestServer(object): + """Missing associated documentation comment in .proto file""" + + @staticmethod + def SimpleMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/GRPCTestServer/SimpleMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ClientStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_unary( + request_iterator, + target, + "/GRPCTestServer/ClientStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ServerStreamingMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/GRPCTestServer/ServerStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def BidirectionalStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, + target, + "/GRPCTestServer/BidirectionalStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py new file mode 100644 index 000000000..458f32e04 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -0,0 +1,298 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +from opentelemetry.sdk.metrics.export.aggregate import ( + MinMaxSumCountAggregator, + SumAggregator, +) +from opentelemetry.test.test_base import TestBase +from tests.protobuf import test_server_pb2_grpc + +from ._client import ( + bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, +) +from ._server import create_test_server + + +class TestClientProto(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument( + exporter=self.memory_metrics_exporter + ) + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def _verify_success_records(self, num_bytes_out, num_bytes_in, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + bytes_in = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/bytes_in": + bytes_in = record + + self.assertIsNotNone(bytes_out) + self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out") + self.assertEqual(bytes_out.labels, (("method", method),)) + + self.assertIsNotNone(bytes_in) + self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in") + self.assertEqual(bytes_in.labels, (("method", method),)) + + self.assertIsNotNone(duration) + self.assertEqual(duration.instrument.name, "grpcio/client/duration") + self.assertEqual( + duration.labels, + ( + ("error", False), + ("method", method), + ("status_code", grpc.StatusCode.OK), + ), + ) + + self.assertEqual(type(bytes_out.aggregator), SumAggregator) + self.assertEqual(type(bytes_in.aggregator), SumAggregator) + self.assertEqual(type(duration.aggregator), MinMaxSumCountAggregator) + + self.assertEqual(bytes_out.aggregator.checkpoint, num_bytes_out) + self.assertEqual(bytes_in.aggregator.checkpoint, num_bytes_in) + + self.assertEqual(duration.aggregator.checkpoint.count, 1) + self.assertGreaterEqual(duration.aggregator.checkpoint.sum, 0) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod") + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 8, 40, "/GRPCTestServer/ServerStreamingMethod" + ) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 40, 8, "/GRPCTestServer/ClientStreamingMethod" + ) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 40, 40, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + def _verify_error_records(self, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + errors = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/errors": + errors = record + + self.assertIsNotNone(bytes_out) + self.assertIsNotNone(errors) + self.assertIsNotNone(duration) + + self.assertEqual(errors.instrument.name, "grpcio/client/errors") + self.assertEqual( + errors.labels, + ( + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + self.assertEqual(errors.aggregator.checkpoint, 1) + + self.assertEqual( + duration.labels, + ( + ("error", True), + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/SimpleMethod") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/ClientStreamingMethod") + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/ServerStreamingMethod") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + self._verify_error_records( + "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + +class TestClientNoMetrics(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument() + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py new file mode 100644 index 000000000..a41da47ae --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -0,0 +1,297 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=unused-argument +# pylint:disable=no-self-use + +import threading +from concurrent import futures + +import grpc + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import ( + GrpcInstrumentorServer, + server_interceptor, +) +from opentelemetry.instrumentation.grpc.grpcext import intercept_server +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.test.test_base import TestBase + + +class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): + def __init__(self, handler): + self.request_streaming = False + self.response_streaming = False + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = handler + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class UnaryUnaryRpcHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._unary_unary_handler = handler + + def service(self, handler_call_details): + return UnaryUnaryMethodHandler(self._unary_unary_handler) + + +class TestOpenTelemetryServerInterceptor(TestBase): + def test_instrumentor(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "test") + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + grpc_server_instrumentor.uninstrument() + + def test_uninstrument(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + grpc_server_instrumentor.uninstrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_create_span(self): + """Check that the interceptor wraps calls with spans server-side.""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # No-op RPC handler + def handler(request, context): + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + # FIXME: grpcext interceptor doesn't apply to handlers passed to server + # init, should use intercept_service API instead. + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, "") + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + def test_span_lifetime(self): + """Check that the span is active for the duration of the call.""" + + interceptor = server_interceptor() + + # To capture the current span at the time the handler is called + active_span_in_handler = None + + def handler(request, context): + nonlocal active_span_in_handler + active_span_in_handler = trace.get_current_span() + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + active_span_before_call = trace.get_current_span() + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + active_span_after_call = trace.get_current_span() + + self.assertEqual(active_span_before_call, trace.INVALID_SPAN) + self.assertEqual(active_span_after_call, trace.INVALID_SPAN) + self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsNone(active_span_in_handler.parent) + + def test_sequential_server_spans(self): + """Check that sequential RPCs get separate server spans.""" + + interceptor = server_interceptor() + + # Capture the currently active span in each thread + active_spans_in_handler = [] + + def handler(request, context): + active_spans_in_handler.append(trace.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + channel.unary_unary("")(b"") + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + def test_concurrent_server_spans(self): + """Check that concurrent RPC calls don't interfere with each other. + + This is the same check as test_sequential_server_spans except that the + RPCs are concurrent. Two handlers are invoked at the same time on two + separate threads. Each one should see a different active span and + context. + """ + + interceptor = server_interceptor() + + # Capture the currently active span in each thread + active_spans_in_handler = [] + latch = get_latch(2) + + def handler(request, context): + latch() + active_spans_in_handler.append(trace.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=2), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + # Interleave calls so spans are active on each thread at the same + # time + with futures.ThreadPoolExecutor(max_workers=2) as tpe: + f1 = tpe.submit(channel.unary_unary(""), b"") + f2 = tpe.submit(channel.unary_unary(""), b"") + futures.wait((f1, f2)) + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + +def get_latch(num): + """Get a countdown latch function for use in n threads.""" + cv = threading.Condition() + count = 0 + + def countdown_latch(): + """Block until n-1 other threads have called.""" + nonlocal count + cv.acquire() + count += 1 + cv.notify() + cv.release() + cv.acquire() + while count < num: + cv.wait() + cv.release() + + return countdown_latch From 3c25bf62824b38f47149c51b1f92bc3fefe94c4c Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 6 Aug 2020 17:10:35 -0600 Subject: [PATCH 02/15] Fix grpc version to previous version (#975) --- instrumentation/opentelemetry-instrumentation-grpc/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 0308a229f..4d4027823 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -42,7 +42,7 @@ packages=find_namespace: install_requires = opentelemetry-api == 0.12.dev0 opentelemetry-sdk == 0.12.dev0 - grpcio ~= 1.27 + grpcio == 1.30 [options.extras_require] test = From 665a5acb6e29fbc2783a0d1a485531ef43f9d1a4 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Sat, 15 Aug 2020 18:06:27 -0700 Subject: [PATCH 03/15] chore: 0.13.dev0 version update (#991) --- .../opentelemetry-instrumentation-grpc/CHANGELOG.md | 4 ++++ .../opentelemetry-instrumentation-grpc/setup.cfg | 8 ++++---- .../src/opentelemetry/instrumentation/grpc/version.py | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md index b6b28ecd9..ced6b9c34 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## Version 0.12b0 + +Released 2020-08-14 + - Change package name to opentelemetry-instrumentation-grpc ([#969](https://github.com/open-telemetry/opentelemetry-python/pull/969)) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 4d4027823..590cb5e9c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -40,14 +40,14 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.12.dev0 - opentelemetry-sdk == 0.12.dev0 + opentelemetry-api == 0.13dev0 + opentelemetry-sdk == 0.13dev0 grpcio == 1.30 [options.extras_require] test = - opentelemetry-test == 0.12.dev0 - opentelemetry-sdk == 0.12.dev0 + opentelemetry-test == 0.13dev0 + opentelemetry-sdk == 0.13dev0 protobuf == 3.12.2 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py index 780a92b6a..9cc445d09 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.12.dev0" +__version__ = "0.13dev0" From 9b6f43ec0f4dcb23687e5b3a709a1f007e98e4c5 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Fri, 21 Aug 2020 17:20:04 +0200 Subject: [PATCH 04/15] Fix grpc tests when running from cmd-line/eachdist script (#1027) * when running the grpc tests with pytest or eachdist from the command line the 2nd test trying to connect to the test server failed with a connection refused message. Seems like the connection from the previous test was still alive due to the channel not being properly closed. --- instrumentation/opentelemetry-instrumentation-grpc/setup.cfg | 2 +- .../tests/test_client_interceptor.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 590cb5e9c..6a7db72aa 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -42,7 +42,7 @@ packages=find_namespace: install_requires = opentelemetry-api == 0.13dev0 opentelemetry-sdk == 0.13dev0 - grpcio == 1.30 + grpcio ~= 1.27 [options.extras_require] test = diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 458f32e04..3ed40c141 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -49,6 +49,7 @@ class TestClientProto(TestBase): GrpcInstrumentorClient().uninstrument() self.memory_metrics_exporter.clear() self.server.stop(None) + self.channel.close() def _verify_success_records(self, num_bytes_out, num_bytes_in, method): # pylint: disable=protected-access,no-member From efd0520445cbfd1c6780f92a463849c6a4f3dd2d Mon Sep 17 00:00:00 2001 From: alrex Date: Mon, 14 Sep 2020 15:11:56 -0700 Subject: [PATCH 05/15] dropping support for python 3.4 (#1099) * dropping support for python 3.4 --- .../opentelemetry-instrumentation-grpc/CHANGELOG.md | 3 +++ instrumentation/opentelemetry-instrumentation-grpc/setup.cfg | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md index ced6b9c34..bbf2e7b18 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Drop support for Python 3.4 + ([#1099](https://github.com/open-telemetry/opentelemetry-python/pull/1099)) + ## Version 0.12b0 Released 2020-08-14 diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 6a7db72aa..665e9a941 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -28,14 +28,13 @@ classifiers = License :: OSI Approved :: Apache Software License Programming Language :: Python Programming Language :: Python :: 3 - Programming Language :: Python :: 3.4 Programming Language :: Python :: 3.5 Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 [options] -python_requires = >=3.4 +python_requires = >=3.5 package_dir= =src packages=find_namespace: From a50fe55929503559d8425aa00c3dedf8f3c1d25d Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Tue, 15 Sep 2020 10:20:47 -0400 Subject: [PATCH 06/15] Fix missing step in docs (#1113) As far as I can tell, the tutorial in the docs doesn't actually work without this line. --- .../src/opentelemetry/instrumentation/grpc/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 9b6862eba..58d2ebbb1 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -106,6 +106,7 @@ Usage Server def serve(): server = grpc.server(futures.ThreadPoolExecutor()) + server = intercept_server(server, server_interceptor()) helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port("[::]:50051") From 1149a0b0a2e7c484127e28902c920ce791fd63a4 Mon Sep 17 00:00:00 2001 From: alrex Date: Thu, 17 Sep 2020 08:23:52 -0700 Subject: [PATCH 07/15] release: updating changelogs and version to 0.13b0 (#1129) * updating changelogs and version to 0.13b0 --- .../opentelemetry-instrumentation-grpc/CHANGELOG.md | 4 ++++ .../opentelemetry-instrumentation-grpc/setup.cfg | 8 ++++---- .../src/opentelemetry/instrumentation/grpc/version.py | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md index bbf2e7b18..ed6889dcb 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## Version 0.13b0 + +Released 2020-09-17 + - Drop support for Python 3.4 ([#1099](https://github.com/open-telemetry/opentelemetry-python/pull/1099)) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 665e9a941..e511bcf4f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -39,14 +39,14 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.13dev0 - opentelemetry-sdk == 0.13dev0 + opentelemetry-api == 0.13b0 + opentelemetry-sdk == 0.13b0 grpcio ~= 1.27 [options.extras_require] test = - opentelemetry-test == 0.13dev0 - opentelemetry-sdk == 0.13dev0 + opentelemetry-test == 0.13b0 + opentelemetry-sdk == 0.13b0 protobuf == 3.12.2 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py index 9cc445d09..2015e87c7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.13dev0" +__version__ = "0.13b0" From e121ff8b91d29d05d4e0ea5aa9f3b373a88c9e2d Mon Sep 17 00:00:00 2001 From: alrex Date: Thu, 17 Sep 2020 12:21:39 -0700 Subject: [PATCH 08/15] chore: bump dev version (#1131) --- .../opentelemetry-instrumentation-grpc/setup.cfg | 8 ++++---- .../src/opentelemetry/instrumentation/grpc/version.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index e511bcf4f..69e91ef9a 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -39,14 +39,14 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.13b0 - opentelemetry-sdk == 0.13b0 + opentelemetry-api == 0.14.dev0 + opentelemetry-sdk == 0.14.dev0 grpcio ~= 1.27 [options.extras_require] test = - opentelemetry-test == 0.13b0 - opentelemetry-sdk == 0.13b0 + opentelemetry-test == 0.14.dev0 + opentelemetry-sdk == 0.14.dev0 protobuf == 3.12.2 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py index 2015e87c7..0f9902789 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.13b0" +__version__ = "0.14.dev0" From 7cf40b1302471de47ae78d1ed720a7c5e472b5fc Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 13 Oct 2020 14:38:09 -0400 Subject: [PATCH 09/15] chore: bump dev version (#1235) --- .../opentelemetry-instrumentation-grpc/setup.cfg | 8 ++++---- .../src/opentelemetry/instrumentation/grpc/version.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 69e91ef9a..5d39574bf 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -39,14 +39,14 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.14.dev0 - opentelemetry-sdk == 0.14.dev0 + opentelemetry-api == 0.15.dev0 + opentelemetry-sdk == 0.15.dev0 grpcio ~= 1.27 [options.extras_require] test = - opentelemetry-test == 0.14.dev0 - opentelemetry-sdk == 0.14.dev0 + opentelemetry-test == 0.15.dev0 + opentelemetry-sdk == 0.15.dev0 protobuf == 3.12.2 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py index 0f9902789..e7b342d64 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.14.dev0" +__version__ = "0.15.dev0" From 53bee8ddc92308e4bae7d29f1a376dd77adf95c9 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 27 Oct 2020 22:33:38 -0400 Subject: [PATCH 10/15] Split up metric instrument constructors (#1254) --- .../instrumentation/grpc/_utilities.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py index a57763558..8cf1f957c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -19,8 +19,6 @@ from time import time import grpc -from opentelemetry.sdk.metrics import Counter, ValueRecorder - class RpcInfo: def __init__( @@ -47,33 +45,29 @@ class TimedMetricRecorder: self._span_kind = span_kind if self._meter: - self._duration = self._meter.create_metric( + self._duration = self._meter.create_valuerecorder( name="{}/{}/duration".format(service_name, span_kind), description="Duration of grpc requests to the server", unit="ms", value_type=float, - metric_type=ValueRecorder, ) - self._error_count = self._meter.create_metric( + self._error_count = self._meter.create_counter( name="{}/{}/errors".format(service_name, span_kind), description="Number of errors that were returned from the server", unit="1", value_type=int, - metric_type=Counter, ) - self._bytes_in = self._meter.create_metric( + self._bytes_in = self._meter.create_counter( name="{}/{}/bytes_in".format(service_name, span_kind), description="Number of bytes received from the server", unit="by", value_type=int, - metric_type=Counter, ) - self._bytes_out = self._meter.create_metric( + self._bytes_out = self._meter.create_counter( name="{}/{}/bytes_out".format(service_name, span_kind), description="Number of bytes sent out through gRPC", unit="by", value_type=int, - metric_type=Counter, ) def record_bytes_in(self, bytes_in, method): From 43b88daa814948eb6b9d8d9cbca34a4c627a08ce Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 28 Oct 2020 17:28:58 -0400 Subject: [PATCH 11/15] Change status codes from grpc status codes, remove setting status in instrumentations except on ERROR (#1282) --- .../instrumentation/grpc/_client.py | 16 +++++++-------- .../tests/test_client_interceptor.py | 20 ++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 028804f59..f8a72931f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -26,7 +26,7 @@ import grpc from opentelemetry import metrics, propagators, trace from opentelemetry.sdk.metrics.export.controller import PushController -from opentelemetry.trace.status import Status, StatusCanonicalCode +from opentelemetry.trace.status import Status, StatusCode from . import grpcext from ._utilities import RpcInfo, TimedMetricRecorder @@ -169,9 +169,9 @@ class OpenTelemetryClientInterceptor( try: result = invoker(request, metadata) - except grpc.RpcError as exc: + except grpc.RpcError: guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + Status(StatusCode.ERROR) ) raise @@ -224,10 +224,8 @@ class OpenTelemetryClientInterceptor( response.ByteSize(), client_info.full_method ) yield response - except grpc.RpcError as exc: - span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) - ) + except grpc.RpcError: + span.set_status(Status(StatusCode.ERROR)) raise def intercept_stream( @@ -264,9 +262,9 @@ class OpenTelemetryClientInterceptor( try: result = invoker(request_or_iterator, metadata) - except grpc.RpcError as exc: + except grpc.RpcError: guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + Status(StatusCode.ERROR) ) raise diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 3ed40c141..f351c6fdd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -220,9 +220,8 @@ class TestClientProto(TestBase): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] - self.assertEqual( - span.status.canonical_code.value, - grpc.StatusCode.INVALID_ARGUMENT.value[0], + self.assertIs( + span.status.status_code, trace.status.StatusCode.ERROR, ) def test_error_stream_unary(self): @@ -233,9 +232,8 @@ class TestClientProto(TestBase): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] - self.assertEqual( - span.status.canonical_code.value, - grpc.StatusCode.INVALID_ARGUMENT.value[0], + self.assertIs( + span.status.status_code, trace.status.StatusCode.ERROR, ) def test_error_unary_stream(self): @@ -247,9 +245,8 @@ class TestClientProto(TestBase): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] - self.assertEqual( - span.status.canonical_code.value, - grpc.StatusCode.INVALID_ARGUMENT.value[0], + self.assertIs( + span.status.status_code, trace.status.StatusCode.ERROR, ) def test_error_stream_stream(self): @@ -263,9 +260,8 @@ class TestClientProto(TestBase): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] - self.assertEqual( - span.status.canonical_code.value, - grpc.StatusCode.INVALID_ARGUMENT.value[0], + self.assertIs( + span.status.status_code, trace.status.StatusCode.ERROR, ) From 0c33c1eaace70a77a822cc6188c9d30cfabaf44c Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 29 Oct 2020 16:30:18 -0400 Subject: [PATCH 12/15] Rewrite gRPC server interceptor (#1171) Co-authored-by: Aaron Abbott --- .../instrumentation/grpc/__init__.py | 53 ++-- .../instrumentation/grpc/_server.py | 251 ++++++++++-------- .../instrumentation/grpc/grpcext/__init__.py | 199 ++++---------- .../grpc/grpcext/_interceptor.py | 177 ------------ .../tests/test_server_interceptor.py | 12 +- 5 files changed, 243 insertions(+), 449 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 58d2ebbb1..776e29e8e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -77,8 +77,7 @@ Usage Server import grpc from opentelemetry import trace - from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor - from opentelemetry.instrumentation.grpc.grpcext import intercept_server + from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -94,10 +93,10 @@ Usage Server trace.get_tracer_provider().add_span_processor( SimpleExportSpanProcessor(ConsoleSpanExporter()) ) + grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() - class Greeter(helloworld_pb2_grpc.GreeterServicer): def SayHello(self, request, context): return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) @@ -106,7 +105,6 @@ Usage Server def serve(): server = grpc.server(futures.ThreadPoolExecutor()) - server = intercept_server(server, server_interceptor()) helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port("[::]:50051") @@ -117,18 +115,25 @@ Usage Server if __name__ == "__main__": logging.basicConfig() serve() + +You can also add the instrumentor manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import server_interceptor + + server = grpc.server(futures.ThreadPoolExecutor(), + interceptors = [server_interceptor()]) + """ -from contextlib import contextmanager from functools import partial import grpc from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace -from opentelemetry.instrumentation.grpc.grpcext import ( - intercept_channel, - intercept_server, -) +from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap @@ -140,15 +145,33 @@ from opentelemetry.instrumentation.utils import unwrap class GrpcInstrumentorServer(BaseInstrumentor): + """ + Globally instrument the grpc server. + + Usage:: + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + + """ + + # pylint:disable=attribute-defined-outside-init + def _instrument(self, **kwargs): - _wrap("grpc", "server", self.wrapper_fn) + self._original_func = grpc.server + + def server(*args, **kwargs): + if "interceptors" in kwargs: + # add our interceptor as the first + kwargs["interceptors"].insert(0, server_interceptor()) + else: + kwargs["interceptors"] = [server_interceptor()] + return self._original_func(*args, **kwargs) + + grpc.server = server def _uninstrument(self, **kwargs): - unwrap(grpc, "server") - - def wrapper_fn(self, original_func, instance, args, kwargs): - server = original_func(*args, **kwargs) - return intercept_server(server, server_interceptor()) + grpc.server = self._original_func class GrpcInstrumentorClient(BaseInstrumentor): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index cb0e997d3..83cc5824f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -17,12 +17,11 @@ # pylint:disable=no-member # pylint:disable=signature-differs -"""Implementation of the service-side open-telemetry interceptor. - -This library borrows heavily from the OpenTracing gRPC integration: -https://github.com/opentracing-contrib/python-grpc +""" +Implementation of the service-side open-telemetry interceptor. """ +import logging from contextlib import contextmanager from typing import List @@ -30,9 +29,37 @@ import grpc from opentelemetry import propagators, trace from opentelemetry.context import attach, detach +from opentelemetry.trace.status import Status, StatusCode -from . import grpcext -from ._utilities import RpcInfo +logger = logging.getLogger(__name__) + + +# wrap an RPC call +# see https://github.com/grpc/grpc/issues/18191 +def _wrap_rpc_behavior(handler, continuation): + if handler is None: + return None + + if handler.request_streaming and handler.response_streaming: + behavior_fn = handler.stream_stream + handler_factory = grpc.stream_stream_rpc_method_handler + elif handler.request_streaming and not handler.response_streaming: + behavior_fn = handler.stream_unary + handler_factory = grpc.stream_unary_rpc_method_handler + elif not handler.request_streaming and handler.response_streaming: + behavior_fn = handler.unary_stream + handler_factory = grpc.unary_stream_rpc_method_handler + else: + behavior_fn = handler.unary_unary + handler_factory = grpc.unary_unary_rpc_method_handler + + return handler_factory( + continuation( + behavior_fn, handler.request_streaming, handler.response_streaming + ), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) # pylint:disable=abstract-method @@ -42,7 +69,7 @@ class _OpenTelemetryServicerContext(grpc.ServicerContext): self._active_span = active_span self.code = grpc.StatusCode.OK self.details = None - super(_OpenTelemetryServicerContext, self).__init__() + super().__init__() def is_active(self, *args, **kwargs): return self._servicer_context.is_active(*args, **kwargs) @@ -56,20 +83,26 @@ class _OpenTelemetryServicerContext(grpc.ServicerContext): def add_callback(self, *args, **kwargs): return self._servicer_context.add_callback(*args, **kwargs) + def disable_next_message_compression(self): + return self._service_context.disable_next_message_compression() + def invocation_metadata(self, *args, **kwargs): return self._servicer_context.invocation_metadata(*args, **kwargs) - def peer(self, *args, **kwargs): - return self._servicer_context.peer(*args, **kwargs) + def peer(self): + return self._servicer_context.peer() - def peer_identities(self, *args, **kwargs): - return self._servicer_context.peer_identities(*args, **kwargs) + def peer_identities(self): + return self._servicer_context.peer_identities() - def peer_identity_key(self, *args, **kwargs): - return self._servicer_context.peer_identity_key(*args, **kwargs) + def peer_identity_key(self): + return self._servicer_context.peer_identity_key() - def auth_context(self, *args, **kwargs): - return self._servicer_context.auth_context(*args, **kwargs) + def auth_context(self): + return self._servicer_context.auth_context() + + def set_compression(self, compression): + return self._servicer_context.set_compression(compression) def send_initial_metadata(self, *args, **kwargs): return self._servicer_context.send_initial_metadata(*args, **kwargs) @@ -77,47 +110,62 @@ class _OpenTelemetryServicerContext(grpc.ServicerContext): def set_trailing_metadata(self, *args, **kwargs): return self._servicer_context.set_trailing_metadata(*args, **kwargs) - def abort(self, *args, **kwargs): - if not hasattr(self._servicer_context, "abort"): - raise RuntimeError( - "abort() is not supported with the installed version of grpcio" - ) - return self._servicer_context.abort(*args, **kwargs) + def abort(self, code, details): + self.code = code + self.details = details + self._active_span.set_status( + Status(status_code=StatusCode(code.value[0]), description=details) + ) + return self._servicer_context.abort(code, details) - def abort_with_status(self, *args, **kwargs): - if not hasattr(self._servicer_context, "abort_with_status"): - raise RuntimeError( - "abort_with_status() is not supported with the installed " - "version of grpcio" - ) - return self._servicer_context.abort_with_status(*args, **kwargs) + def abort_with_status(self, status): + return self._servicer_context.abort_with_status(status) def set_code(self, code): self.code = code + # use details if we already have it, otherwise the status description + details = self.details or code.value[1] + self._active_span.set_status( + Status(status_code=StatusCode(code.value[0]), description=details) + ) return self._servicer_context.set_code(code) def set_details(self, details): self.details = details + self._active_span.set_status( + Status( + status_code=StatusCode(self.code.value[0]), + description=details, + ) + ) return self._servicer_context.set_details(details) -# On the service-side, errors can be signaled either by exceptions or by -# calling `set_code` on the `servicer_context`. This function checks for the -# latter and updates the span accordingly. +# pylint:disable=abstract-method +# pylint:disable=no-self-use # pylint:disable=unused-argument -def _check_error_code(span, servicer_context, rpc_info): - if servicer_context.code != grpc.StatusCode.OK: - rpc_info.error = servicer_context.code +class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): + """ + A gRPC server interceptor, to add OpenTelemetry. + Usage:: + + tracer = some OpenTelemetry tracer + + interceptors = [ + OpenTelemetryServerInterceptor(tracer), + ] + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=concurrency), + interceptors = interceptors) + + """ -class OpenTelemetryServerInterceptor( - grpcext.UnaryServerInterceptor, grpcext.StreamServerInterceptor -): def __init__(self, tracer): self._tracer = tracer @contextmanager - # pylint:disable=no-self-use def _set_remote_context(self, servicer_context): metadata = servicer_context.invocation_metadata() if metadata: @@ -136,74 +184,67 @@ class OpenTelemetryServerInterceptor( else: yield - def _start_span(self, method): - span = self._tracer.start_as_current_span( - name=method, kind=trace.SpanKind.SERVER - ) - return span + def _start_span(self, handler_call_details, context): - def intercept_unary(self, request, servicer_context, server_info, handler): + attributes = { + "rpc.method": handler_call_details.method, + "rpc.system": "grpc", + } - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - request=request, - ) - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - response = handler(request, servicer_context) + metadata = dict(context.invocation_metadata()) + if "user-agent" in metadata: + attributes["rpc.user_agent"] = metadata["user-agent"] - _check_error_code(span, servicer_context, rpc_info) - - rpc_info.response = response - - return response - - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap - # the result in a new generator that yields the response values. - def _intercept_server_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - ) - if not server_info.is_client_stream: - rpc_info.request = request_or_iterator - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - result = handler(request_or_iterator, servicer_context) - for response in result: - yield response - _check_error_code(span, servicer_context, rpc_info) - - def intercept_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - if server_info.is_server_stream: - return self._intercept_server_stream( - request_or_iterator, servicer_context, server_info, handler + # Split up the peer to keep with how other telemetry sources + # do it. This looks like: + # * ipv6:[::1]:57284 + # * ipv4:127.0.0.1:57284 + # * ipv4:10.2.1.1:57284,127.0.0.1:57284 + # + try: + host, port = ( + context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) ) - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - ) - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - response = handler(request_or_iterator, servicer_context) - _check_error_code(span, servicer_context, rpc_info) - rpc_info.response = response - return response + + # other telemetry sources convert this, so we will too + if host in ("[::1]", "127.0.0.1"): + host = "localhost" + + attributes.update({"net.peer.name": host, "net.peer.port": port}) + except IndexError: + logger.warning("Failed to parse peer address '%s'", context.peer()) + + return self._tracer.start_as_current_span( + name=handler_call_details.method, + kind=trace.SpanKind.SERVER, + attributes=attributes, + ) + + def intercept_service(self, continuation, handler_call_details): + def telemetry_wrapper(behavior, request_streaming, response_streaming): + def telemetry_interceptor(request_or_iterator, context): + + with self._set_remote_context(context): + with self._start_span( + handler_call_details, context + ) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + + # And now we run the actual RPC. + try: + return behavior(request_or_iterator, context) + except Exception as error: + # Bare exceptions are likely to be gRPC aborts, which + # we handle in our context wrapper. + # Here, we're interested in uncaught exceptions. + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error + + return telemetry_interceptor + + return _wrap_rpc_behavior( + continuation(handler_call_details), telemetry_wrapper + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py index fe83467a7..d5e2549ba 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py @@ -21,32 +21,32 @@ import abc class UnaryClientInfo(abc.ABC): """Consists of various information about a unary RPC on the - invocation-side. + invocation-side. - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ class StreamClientInfo(abc.ABC): """Consists of various information about a stream RPC on the - invocation-side. + invocation-side. - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ class UnaryClientInterceptor(abc.ABC): @@ -56,18 +56,18 @@ class UnaryClientInterceptor(abc.ABC): def intercept_unary(self, request, metadata, client_info, invoker): """Intercepts unary-unary RPCs on the invocation-side. - Args: - request: The request value for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - client_info: A UnaryClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. + Args: + request: The request value for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + client_info: A UnaryClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. - Returns: - The result from calling invoker(request, metadata). - """ + Returns: + The result from calling invoker(request, metadata). + """ raise NotImplementedError() @@ -80,137 +80,46 @@ class StreamClientInterceptor(abc.ABC): ): """Intercepts stream RPCs on the invocation-side. - Args: - request_or_iterator: The request value for the RPC if - `client_info.is_client_stream` is `false`; otherwise, an iterator of - request values. - metadata: Optional :term:`metadata` to be transmitted to the service-side - of the RPC. - client_info: A StreamClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. + Args: + request_or_iterator: The request value for the RPC if + `client_info.is_client_stream` is `false`; otherwise, an iterator of + request values. + metadata: Optional :term:`metadata` to be transmitted to the service-side + of the RPC. + client_info: A StreamClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. - Returns: - The result from calling invoker(metadata). - """ + Returns: + The result from calling invoker(metadata). + """ raise NotImplementedError() def intercept_channel(channel, *interceptors): """Creates an intercepted channel. - Args: - channel: A Channel. - interceptors: Zero or more UnaryClientInterceptors or - StreamClientInterceptors + Args: + channel: A Channel. + interceptors: Zero or more UnaryClientInterceptors or + StreamClientInterceptors - Returns: - A Channel. + Returns: + A Channel. - Raises: - TypeError: If an interceptor derives from neither UnaryClientInterceptor - nor StreamClientInterceptor. - """ + Raises: + TypeError: If an interceptor derives from neither UnaryClientInterceptor + nor StreamClientInterceptor. + """ from . import _interceptor return _interceptor.intercept_channel(channel, *interceptors) -class UnaryServerInfo(abc.ABC): - """Consists of various information about a unary RPC on the service-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - """ - - -class StreamServerInfo(abc.ABC): - """Consists of various information about a stream RPC on the service-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - """ - - -class UnaryServerInterceptor(abc.ABC): - """Affords intercepting unary-unary RPCs on the service-side.""" - - @abc.abstractmethod - def intercept_unary(self, request, servicer_context, server_info, handler): - """Intercepts unary-unary RPCs on the service-side. - - Args: - request: The request value for the RPC. - servicer_context: A ServicerContext. - server_info: A UnaryServerInfo containing various information about - the RPC. - handler: The handler to complete the RPC on the server. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling handler(request, servicer_context). - """ - raise NotImplementedError() - - -class StreamServerInterceptor(abc.ABC): - """Affords intercepting stream RPCs on the service-side.""" - - @abc.abstractmethod - def intercept_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - """Intercepts stream RPCs on the service-side. - - Args: - request_or_iterator: The request value for the RPC if - `server_info.is_client_stream` is `False`; otherwise, an iterator of - request values. - servicer_context: A ServicerContext. - server_info: A StreamServerInfo containing various information about - the RPC. - handler: The handler to complete the RPC on the server. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling handler(servicer_context). - """ - raise NotImplementedError() - - -def intercept_server(server, *interceptors): - """Creates an intercepted server. - - Args: - server: A Server. - interceptors: Zero or more UnaryServerInterceptors or - StreamServerInterceptors - - Returns: - A Server. - - Raises: - TypeError: If an interceptor derives from neither UnaryServerInterceptor - nor StreamServerInterceptor. - """ - from . import _interceptor - - return _interceptor.intercept_server(server, *interceptors) - - __all__ = ( "UnaryClientInterceptor", "StreamClientInfo", "StreamClientInterceptor", - "UnaryServerInfo", - "StreamServerInfo", - "UnaryServerInterceptor", - "StreamServerInterceptor", "intercept_channel", - "intercept_server", ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py index 74861913b..b9f74fff8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -252,180 +252,3 @@ def intercept_channel(channel, *interceptors): ) result = _InterceptorChannel(result, interceptor) return result - - -class _UnaryServerInfo( - collections.namedtuple("_UnaryServerInfo", ("full_method",)) -): - pass - - -class _StreamServerInfo( - collections.namedtuple( - "_StreamServerInfo", - ("full_method", "is_client_stream", "is_server_stream"), - ) -): - pass - - -class _InterceptorRpcMethodHandler(grpc.RpcMethodHandler): - def __init__(self, rpc_method_handler, method, interceptor): - self._rpc_method_handler = rpc_method_handler - self._method = method - self._interceptor = interceptor - - @property - def request_streaming(self): - return self._rpc_method_handler.request_streaming - - @property - def response_streaming(self): - return self._rpc_method_handler.response_streaming - - @property - def request_deserializer(self): - return self._rpc_method_handler.request_deserializer - - @property - def response_serializer(self): - return self._rpc_method_handler.response_serializer - - @property - def unary_unary(self): - if not isinstance(self._interceptor, grpcext.UnaryServerInterceptor): - return self._rpc_method_handler.unary_unary - - def adaptation(request, servicer_context): - def handler(request, servicer_context): - return self._rpc_method_handler.unary_unary( - request, servicer_context - ) - - return self._interceptor.intercept_unary( - request, - servicer_context, - _UnaryServerInfo(self._method), - handler, - ) - - return adaptation - - @property - def unary_stream(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.unary_stream - - def adaptation(request, servicer_context): - def handler(request, servicer_context): - return self._rpc_method_handler.unary_stream( - request, servicer_context - ) - - return self._interceptor.intercept_stream( - request, - servicer_context, - _StreamServerInfo(self._method, False, True), - handler, - ) - - return adaptation - - @property - def stream_unary(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.stream_unary - - def adaptation(request_iterator, servicer_context): - def handler(request_iterator, servicer_context): - return self._rpc_method_handler.stream_unary( - request_iterator, servicer_context - ) - - return self._interceptor.intercept_stream( - request_iterator, - servicer_context, - _StreamServerInfo(self._method, True, False), - handler, - ) - - return adaptation - - @property - def stream_stream(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.stream_stream - - def adaptation(request_iterator, servicer_context): - def handler(request_iterator, servicer_context): - return self._rpc_method_handler.stream_stream( - request_iterator, servicer_context - ) - - return self._interceptor.intercept_stream( - request_iterator, - servicer_context, - _StreamServerInfo(self._method, True, True), - handler, - ) - - return adaptation - - -class _InterceptorGenericRpcHandler(grpc.GenericRpcHandler): - def __init__(self, generic_rpc_handler, interceptor): - self.generic_rpc_handler = generic_rpc_handler - self._interceptor = interceptor - - def service(self, handler_call_details): - result = self.generic_rpc_handler.service(handler_call_details) - if result: - result = _InterceptorRpcMethodHandler( - result, handler_call_details.method, self._interceptor - ) - return result - - -class _InterceptorServer(grpc.Server): - def __init__(self, server, interceptor): - self._server = server - self._interceptor = interceptor - - def add_generic_rpc_handlers(self, generic_rpc_handlers): - generic_rpc_handlers = [ - _InterceptorGenericRpcHandler( - generic_rpc_handler, self._interceptor - ) - for generic_rpc_handler in generic_rpc_handlers - ] - return self._server.add_generic_rpc_handlers(generic_rpc_handlers) - - def add_insecure_port(self, *args, **kwargs): - return self._server.add_insecure_port(*args, **kwargs) - - def add_secure_port(self, *args, **kwargs): - return self._server.add_secure_port(*args, **kwargs) - - def start(self, *args, **kwargs): - return self._server.start(*args, **kwargs) - - def stop(self, *args, **kwargs): - return self._server.stop(*args, **kwargs) - - def wait_for_termination(self, *args, **kwargs): - return self._server.wait_for_termination(*args, **kwargs) - - -def intercept_server(server, *interceptors): - result = server - for interceptor in interceptors: - if not isinstance( - interceptor, grpcext.UnaryServerInterceptor - ) and not isinstance(interceptor, grpcext.StreamServerInterceptor): - raise TypeError( - "interceptor must be either a " - "grpcext.UnaryServerInterceptor or a " - "grpcext.StreamServerInterceptor" - ) - result = _InterceptorServer(result, interceptor) - return result diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index a41da47ae..13b535d84 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -26,7 +26,6 @@ from opentelemetry.instrumentation.grpc import ( GrpcInstrumentorServer, server_interceptor, ) -from opentelemetry.instrumentation.grpc.grpcext import intercept_server from opentelemetry.sdk import trace as trace_sdk from opentelemetry.test.test_base import TestBase @@ -123,10 +122,9 @@ class TestOpenTelemetryServerInterceptor(TestBase): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - # FIXME: grpcext interceptor doesn't apply to handlers passed to server - # init, should use intercept_service API instead. - server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -166,8 +164,8 @@ class TestOpenTelemetryServerInterceptor(TestBase): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -201,8 +199,8 @@ class TestOpenTelemetryServerInterceptor(TestBase): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -248,8 +246,8 @@ class TestOpenTelemetryServerInterceptor(TestBase): server = grpc.server( futures.ThreadPoolExecutor(max_workers=2), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") From 6a8625974e799f2c5f89e20d9a8287c9ca5335ae Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Sun, 1 Nov 2020 22:22:48 -0500 Subject: [PATCH 13/15] Rework gRPC status based on new rules (#1308) As of #1214, the status codes changed and no longer line up with gRPC status codes, so now we'll just set `StatusCode.ERROR` and store the actual gRPC status code in the trace as `grpc.status_code`. Specifically this: https://github.com/open-telemetry/opentelemetry-specification/pull/1156 --- .../opentelemetry/instrumentation/grpc/_server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 83cc5824f..5927c99b5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -113,8 +113,9 @@ class _OpenTelemetryServicerContext(grpc.ServicerContext): def abort(self, code, details): self.code = code self.details = details + self._active_span.set_attribute("rpc.grpc.status_code", code.name) self._active_span.set_status( - Status(status_code=StatusCode(code.value[0]), description=details) + Status(status_code=StatusCode.ERROR, description=details) ) return self._servicer_context.abort(code, details) @@ -125,18 +126,16 @@ class _OpenTelemetryServicerContext(grpc.ServicerContext): self.code = code # use details if we already have it, otherwise the status description details = self.details or code.value[1] + self._active_span.set_attribute("rpc.grpc.status_code", code.name) self._active_span.set_status( - Status(status_code=StatusCode(code.value[0]), description=details) + Status(status_code=StatusCode.ERROR, description=details) ) return self._servicer_context.set_code(code) def set_details(self, details): self.details = details self._active_span.set_status( - Status( - status_code=StatusCode(self.code.value[0]), - description=details, - ) + Status(status_code=StatusCode.ERROR, description=details) ) return self._servicer_context.set_details(details) @@ -189,6 +188,7 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): attributes = { "rpc.method": handler_call_details.method, "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK, } metadata = dict(context.invocation_metadata()) From c1dd64a81133935ef88b469237aa119a2b7ec571 Mon Sep 17 00:00:00 2001 From: Prajilesh N Date: Mon, 2 Nov 2020 09:42:47 +0530 Subject: [PATCH 14/15] Converted TextMap propagator getter to a class and added keys method (#1196) Co-authored-by: alrex --- .../src/opentelemetry/instrumentation/grpc/_server.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 5927c99b5..087cf4f9c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -23,12 +23,12 @@ Implementation of the service-side open-telemetry interceptor. import logging from contextlib import contextmanager -from typing import List import grpc from opentelemetry import propagators, trace from opentelemetry.context import attach, detach +from opentelemetry.trace.propagation.textmap import DictGetter from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) @@ -163,18 +163,14 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): def __init__(self, tracer): self._tracer = tracer + self._carrier_getter = DictGetter() @contextmanager def _set_remote_context(self, servicer_context): metadata = servicer_context.invocation_metadata() if metadata: md_dict = {md.key: md.value for md in metadata} - - def get_from_grpc_metadata(metadata, key) -> List[str]: - return [md_dict[key]] if key in md_dict else [] - - # Update the context with the traceparent from the RPC metadata. - ctx = propagators.extract(get_from_grpc_metadata, metadata) + ctx = propagators.extract(self._carrier_getter, md_dict) token = attach(ctx) try: yield From 41019f3b768989844dfee8153609cb14ce93bb29 Mon Sep 17 00:00:00 2001 From: alrex Date: Mon, 2 Nov 2020 09:00:06 -0800 Subject: [PATCH 15/15] [pre-release] Update changelogs, version [0.15b0] (#1320) --- .../opentelemetry-instrumentation-grpc/setup.cfg | 8 ++++---- .../src/opentelemetry/instrumentation/grpc/version.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 5d39574bf..ed3c85583 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -39,14 +39,14 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.15.dev0 - opentelemetry-sdk == 0.15.dev0 + opentelemetry-api == 0.15b0 + opentelemetry-sdk == 0.15b0 grpcio ~= 1.27 [options.extras_require] test = - opentelemetry-test == 0.15.dev0 - opentelemetry-sdk == 0.15.dev0 + opentelemetry-test == 0.15b0 + opentelemetry-sdk == 0.15b0 protobuf == 3.12.2 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py index e7b342d64..ff494d225 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.15.dev0" +__version__ = "0.15b0"