From f1e3f75f3625564581c1b6cd6945b90fcb4efa5c Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 4 Aug 2020 19:10:51 -0700 Subject: [PATCH] Rename remaining framework packages from "ext" to "instrumentation" (#969) --- .../CHANGELOG.md | 32 ++ .../LICENSE | 201 ++++++++ .../MANIFEST.in | 9 + .../README.rst | 18 + .../setup.cfg | 59 +++ .../setup.py | 27 ++ .../instrumentation/grpc/__init__.py | 224 +++++++++ .../instrumentation/grpc/_client.py | 275 +++++++++++ .../instrumentation/grpc/_server.py | 209 +++++++++ .../instrumentation/grpc/_utilities.py | 107 +++++ .../instrumentation/grpc/grpcext/__init__.py | 216 +++++++++ .../grpc/grpcext/_interceptor.py | 431 ++++++++++++++++++ .../instrumentation/grpc/version.py | 15 + .../tests/__init__.py | 13 + .../tests/_client.py | 57 +++ .../tests/_server.py | 87 ++++ .../tests/protobuf/test_server.proto | 34 ++ .../tests/protobuf/test_server_pb2.py | 215 +++++++++ .../tests/protobuf/test_server_pb2_grpc.py | 205 +++++++++ .../tests/test_client_interceptor.py | 298 ++++++++++++ .../tests/test_server_interceptor.py | 297 ++++++++++++ 21 files changed, 3029 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/LICENSE create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md new file mode 100644 index 000000000..b6b28ecd9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/CHANGELOG.md @@ -0,0 +1,32 @@ +# Changelog + +## Unreleased + +- Change package name to opentelemetry-instrumentation-grpc + ([#969](https://github.com/open-telemetry/opentelemetry-python/pull/969)) + +## Version 0.11b0 + +Released 2020-07-28 + +- Add status code to gRPC client spans + ([896](https://github.com/open-telemetry/opentelemetry-python/pull/896)) +- Add gRPC client and server instrumentors + ([788](https://github.com/open-telemetry/opentelemetry-python/pull/788)) + +- Add metric recording (bytes in/out, errors, latency) to gRPC client + +## 0.8b0 + +Released 2020-05-27 + +- lint: version of grpc causes lint issues + ([#696](https://github.com/open-telemetry/opentelemetry-python/pull/696)) + +## 0.6b0 + +Released 2020-03-30 + +- Add gRPC integration + ([#476](https://github.com/open-telemetry/opentelemetry-python/pull/476)) +- Initial release diff --git a/instrumentation/opentelemetry-instrumentation-grpc/LICENSE b/instrumentation/opentelemetry-instrumentation-grpc/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in b/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in new file mode 100644 index 000000000..aed3e3327 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/MANIFEST.in @@ -0,0 +1,9 @@ +graft src +graft tests +global-exclude *.pyc +global-exclude *.pyo +global-exclude __pycache__/* +include CHANGELOG.md +include MANIFEST.in +include README.rst +include LICENSE diff --git a/instrumentation/opentelemetry-instrumentation-grpc/README.rst b/instrumentation/opentelemetry-instrumentation-grpc/README.rst new file mode 100644 index 000000000..176bdf1a3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/README.rst @@ -0,0 +1,18 @@ +OpenTelemetry gRPC Integration +============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-grpc.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-grpc/ + +Client and server interceptors for `gRPC Python`_. + +.. _gRPC Python: https://grpc.github.io/grpc/python/grpc.html + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-grpc diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg new file mode 100644 index 000000000..0308a229f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -0,0 +1,59 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[metadata] +name = opentelemetry-instrumentation-grpc +description = OpenTelemetry gRPC instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/tree/master/instrumentation/opentelemetry-instrumentation-grpc +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.4 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 + grpcio ~= 1.27 + +[options.extras_require] +test = + opentelemetry-test == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 + protobuf == 3.12.2 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient + grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.py b/instrumentation/opentelemetry-instrumentation-grpc/setup.py new file mode 100644 index 000000000..87c720aea --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "grpc", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py new file mode 100644 index 000000000..9b6862eba --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -0,0 +1,224 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=no-name-in-module +# pylint:disable=relative-beyond-top-level +# pylint:disable=import-error +# pylint:disable=no-self-use +""" +Usage Client +------------ +.. code-block:: python + + import logging + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, client_interceptor + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + from opentelemetry import metrics + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + + # Set meter provider to opentelemetry-sdk's MeterProvider + metrics.set_meter_provider(MeterProvider()) + + # Optional - export GRPC specific metrics (latency, bytes in/out, errors) by passing an exporter + instrumentor = GrpcInstrumentorClient(exporter=ConsoleMetricsExporter(), interval=10) + instrumentor.instrument() + + def run(): + with grpc.insecure_channel("localhost:50051") as channel: + + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU")) + + print("Greeter client received: " + response.message) + + + if __name__ == "__main__": + logging.basicConfig() + run() + +Usage Server +------------ +.. code-block:: python + + import logging + from concurrent import futures + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor + from opentelemetry.instrumentation.grpc.grpcext import intercept_server + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + + + class Greeter(helloworld_pb2_grpc.GreeterServicer): + def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) + + + def serve(): + + server = grpc.server(futures.ThreadPoolExecutor()) + + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + + + if __name__ == "__main__": + logging.basicConfig() + serve() +""" +from contextlib import contextmanager +from functools import partial + +import grpc +from wrapt import wrap_function_wrapper as _wrap + +from opentelemetry import trace +from opentelemetry.instrumentation.grpc.grpcext import ( + intercept_channel, + intercept_server, +) +from opentelemetry.instrumentation.grpc.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=unused-argument +# isort:skip + + +class GrpcInstrumentorServer(BaseInstrumentor): + def _instrument(self, **kwargs): + _wrap("grpc", "server", self.wrapper_fn) + + def _uninstrument(self, **kwargs): + unwrap(grpc, "server") + + def wrapper_fn(self, original_func, instance, args, kwargs): + server = original_func(*args, **kwargs) + return intercept_server(server, server_interceptor()) + + +class GrpcInstrumentorClient(BaseInstrumentor): + def _instrument(self, **kwargs): + exporter = kwargs.get("exporter", None) + interval = kwargs.get("interval", 30) + if kwargs.get("channel_type") == "secure": + _wrap( + "grpc", + "secure_channel", + partial(self.wrapper_fn, exporter, interval), + ) + + else: + _wrap( + "grpc", + "insecure_channel", + partial(self.wrapper_fn, exporter, interval), + ) + + def _uninstrument(self, **kwargs): + if kwargs.get("channel_type") == "secure": + unwrap(grpc, "secure_channel") + + else: + unwrap(grpc, "insecure_channel") + + def wrapper_fn( + self, exporter, interval, original_func, instance, args, kwargs + ): + channel = original_func(*args, **kwargs) + tracer_provider = kwargs.get("tracer_provider") + return intercept_channel( + channel, + client_interceptor( + tracer_provider=tracer_provider, + exporter=exporter, + interval=interval, + ), + ) + + +def client_interceptor(tracer_provider=None, exporter=None, interval=30): + """Create a gRPC client channel interceptor. + + Args: + tracer: The tracer to use to create client-side spans. + exporter: The exporter that will receive client metrics + interval: Time between every export call + + Returns: + An invocation-side interceptor object. + """ + from . import _client + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return _client.OpenTelemetryClientInterceptor(tracer, exporter, interval) + + +def server_interceptor(tracer_provider=None): + """Create a gRPC server interceptor. + + Args: + tracer: The tracer to use to create server-side spans. + + Returns: + A service-side interceptor object. + """ + from . import _server + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return _server.OpenTelemetryServerInterceptor(tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py new file mode 100644 index 000000000..028804f59 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -0,0 +1,275 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the invocation-side open-telemetry interceptor.""" + +from collections import OrderedDict +from typing import MutableMapping + +import grpc + +from opentelemetry import metrics, propagators, trace +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.trace.status import Status, StatusCanonicalCode + +from . import grpcext +from ._utilities import RpcInfo, TimedMetricRecorder + + +class _GuardedSpan: + def __init__(self, span): + self.span = span + self.generated_span = None + self._engaged = True + + def __enter__(self): + self.generated_span = self.span.__enter__() + return self + + def __exit__(self, *args, **kwargs): + if self._engaged: + self.generated_span = None + return self.span.__exit__(*args, **kwargs) + return False + + def release(self): + self._engaged = False + return self.span + + +def _inject_span_context(metadata: MutableMapping[str, str]) -> None: + # pylint:disable=unused-argument + def append_metadata( + carrier: MutableMapping[str, str], key: str, value: str + ): + metadata[key] = value + + # Inject current active span from the context + propagators.inject(append_metadata, metadata) + + +def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder): + def callback(response_future): + with span: + code = response_future.code() + if code != grpc.StatusCode.OK: + rpc_info.error = code + return + response = response_future.result() + rpc_info.response = response + if "ByteSize" in dir(response): + metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + + return callback + + +class OpenTelemetryClientInterceptor( + grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor +): + def __init__(self, tracer, exporter, interval): + self._tracer = tracer + + self._meter = None + if exporter and interval: + self._meter = metrics.get_meter(__name__) + self.controller = PushController( + meter=self._meter, exporter=exporter, interval=interval + ) + self._metrics_recorder = TimedMetricRecorder(self._meter, "client") + + def _start_span(self, method): + return self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.CLIENT + ) + + # pylint:disable=no-self-use + def _trace_result(self, guarded_span, rpc_info, result, client_info): + # If the RPC is called asynchronously, release the guard and add a + # callback so that the span can be finished once the future is done. + if isinstance(result, grpc.Future): + result.add_done_callback( + _make_future_done_callback( + guarded_span.release(), + rpc_info, + client_info, + self._metrics_recorder, + ) + ) + return result + response = result + # Handle the case when the RPC is initiated via the with_call + # method and the result is a tuple with the first element as the + # response. + # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call + if isinstance(result, tuple): + response = result[0] + rpc_info.response = response + + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + return result + + def _start_guarded_span(self, *args, **kwargs): + return _GuardedSpan(self._start_span(*args, **kwargs)) + + def _bytes_out_iterator_wrapper(self, iterator, client_info): + for request in iterator: + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + yield request + + def intercept_unary(self, request, metadata, client_info, invoker): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + + # If protobuf is used, we can record the bytes in/out. Otherwise, we have no way + # to get the size of the request/response properly, so don't record anything + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, + ) + + try: + result = invoker(request, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_span(client_info.full_method) as span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + ) + + if client_info.is_client_stream: + rpc_info.request = request_or_iterator + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + else: + if "ByteSize" in dir(request_or_iterator): + self._metrics_recorder.record_bytes_out( + request_or_iterator.ByteSize(), + client_info.full_method, + ) + + try: + result = invoker(request_or_iterator, metadata) + + # Rewrap the result stream into a generator, and record the bytes received + for response in result: + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + yield response + except grpc.RpcError as exc: + span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if client_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, metadata, client_info, invoker + ) + + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) + + rpc_info.request = request_or_iterator + + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + + try: + result = invoker(request_or_iterator, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py new file mode 100644 index 000000000..cb0e997d3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -0,0 +1,209 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the service-side open-telemetry interceptor. + +This library borrows heavily from the OpenTracing gRPC integration: +https://github.com/opentracing-contrib/python-grpc +""" + +from contextlib import contextmanager +from typing import List + +import grpc + +from opentelemetry import propagators, trace +from opentelemetry.context import attach, detach + +from . import grpcext +from ._utilities import RpcInfo + + +# pylint:disable=abstract-method +class _OpenTelemetryServicerContext(grpc.ServicerContext): + def __init__(self, servicer_context, active_span): + self._servicer_context = servicer_context + self._active_span = active_span + self.code = grpc.StatusCode.OK + self.details = None + super(_OpenTelemetryServicerContext, self).__init__() + + def is_active(self, *args, **kwargs): + return self._servicer_context.is_active(*args, **kwargs) + + def time_remaining(self, *args, **kwargs): + return self._servicer_context.time_remaining(*args, **kwargs) + + def cancel(self, *args, **kwargs): + return self._servicer_context.cancel(*args, **kwargs) + + def add_callback(self, *args, **kwargs): + return self._servicer_context.add_callback(*args, **kwargs) + + def invocation_metadata(self, *args, **kwargs): + return self._servicer_context.invocation_metadata(*args, **kwargs) + + def peer(self, *args, **kwargs): + return self._servicer_context.peer(*args, **kwargs) + + def peer_identities(self, *args, **kwargs): + return self._servicer_context.peer_identities(*args, **kwargs) + + def peer_identity_key(self, *args, **kwargs): + return self._servicer_context.peer_identity_key(*args, **kwargs) + + def auth_context(self, *args, **kwargs): + return self._servicer_context.auth_context(*args, **kwargs) + + def send_initial_metadata(self, *args, **kwargs): + return self._servicer_context.send_initial_metadata(*args, **kwargs) + + def set_trailing_metadata(self, *args, **kwargs): + return self._servicer_context.set_trailing_metadata(*args, **kwargs) + + def abort(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort"): + raise RuntimeError( + "abort() is not supported with the installed version of grpcio" + ) + return self._servicer_context.abort(*args, **kwargs) + + def abort_with_status(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort_with_status"): + raise RuntimeError( + "abort_with_status() is not supported with the installed " + "version of grpcio" + ) + return self._servicer_context.abort_with_status(*args, **kwargs) + + def set_code(self, code): + self.code = code + return self._servicer_context.set_code(code) + + def set_details(self, details): + self.details = details + return self._servicer_context.set_details(details) + + +# On the service-side, errors can be signaled either by exceptions or by +# calling `set_code` on the `servicer_context`. This function checks for the +# latter and updates the span accordingly. +# pylint:disable=unused-argument +def _check_error_code(span, servicer_context, rpc_info): + if servicer_context.code != grpc.StatusCode.OK: + rpc_info.error = servicer_context.code + + +class OpenTelemetryServerInterceptor( + grpcext.UnaryServerInterceptor, grpcext.StreamServerInterceptor +): + def __init__(self, tracer): + self._tracer = tracer + + @contextmanager + # pylint:disable=no-self-use + def _set_remote_context(self, servicer_context): + metadata = servicer_context.invocation_metadata() + if metadata: + md_dict = {md.key: md.value for md in metadata} + + def get_from_grpc_metadata(metadata, key) -> List[str]: + return [md_dict[key]] if key in md_dict else [] + + # Update the context with the traceparent from the RPC metadata. + ctx = propagators.extract(get_from_grpc_metadata, metadata) + token = attach(ctx) + try: + yield + finally: + detach(token) + else: + yield + + def _start_span(self, method): + span = self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.SERVER + ) + return span + + def intercept_unary(self, request, servicer_context, server_info, handler): + + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + request=request, + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request, servicer_context) + + _check_error_code(span, servicer_context, rpc_info) + + rpc_info.response = response + + return response + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + if not server_info.is_client_stream: + rpc_info.request = request_or_iterator + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + result = handler(request_or_iterator, servicer_context) + for response in result: + yield response + _check_error_code(span, servicer_context, rpc_info) + + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + if server_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, servicer_context, server_info, handler + ) + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request_or_iterator, servicer_context) + _check_error_code(span, servicer_context, rpc_info) + rpc_info.response = response + return response diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py new file mode 100644 index 000000000..a57763558 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -0,0 +1,107 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Internal utilities.""" + +from contextlib import contextmanager +from time import time + +import grpc + +from opentelemetry.sdk.metrics import Counter, ValueRecorder + + +class RpcInfo: + def __init__( + self, + full_method=None, + metadata=None, + timeout=None, + request=None, + response=None, + error=None, + ): + self.full_method = full_method + self.metadata = metadata + self.timeout = timeout + self.request = request + self.response = response + self.error = error + + +class TimedMetricRecorder: + def __init__(self, meter, span_kind): + self._meter = meter + service_name = "grpcio" + self._span_kind = span_kind + + if self._meter: + self._duration = self._meter.create_metric( + name="{}/{}/duration".format(service_name, span_kind), + description="Duration of grpc requests to the server", + unit="ms", + value_type=float, + metric_type=ValueRecorder, + ) + self._error_count = self._meter.create_metric( + name="{}/{}/errors".format(service_name, span_kind), + description="Number of errors that were returned from the server", + unit="1", + value_type=int, + metric_type=Counter, + ) + self._bytes_in = self._meter.create_metric( + name="{}/{}/bytes_in".format(service_name, span_kind), + description="Number of bytes received from the server", + unit="by", + value_type=int, + metric_type=Counter, + ) + self._bytes_out = self._meter.create_metric( + name="{}/{}/bytes_out".format(service_name, span_kind), + description="Number of bytes sent out through gRPC", + unit="by", + value_type=int, + metric_type=Counter, + ) + + def record_bytes_in(self, bytes_in, method): + if self._meter: + labels = {"method": method} + self._bytes_in.add(bytes_in, labels) + + def record_bytes_out(self, bytes_out, method): + if self._meter: + labels = {"method": method} + self._bytes_out.add(bytes_out, labels) + + @contextmanager + def record_latency(self, method): + start_time = time() + labels = {"method": method, "status_code": grpc.StatusCode.OK} + try: + yield labels + except grpc.RpcError as exc: + if self._meter: + # pylint: disable=no-member + labels["status_code"] = exc.code() + self._error_count.add(1, labels) + labels["error"] = True + raise + finally: + if self._meter: + if "error" not in labels: + labels["error"] = False + elapsed_time = (time() - start_time) * 1000 + self._duration.record(elapsed_time, labels) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py new file mode 100644 index 000000000..fe83467a7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py @@ -0,0 +1,216 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=no-name-in-module + +import abc + + +class UnaryClientInfo(abc.ABC): + """Consists of various information about a unary RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class StreamClientInfo(abc.ABC): + """Consists of various information about a stream RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class UnaryClientInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, metadata, client_info, invoker): + """Intercepts unary-unary RPCs on the invocation-side. + + Args: + request: The request value for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + client_info: A UnaryClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(request, metadata). + """ + raise NotImplementedError() + + +class StreamClientInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + """Intercepts stream RPCs on the invocation-side. + + Args: + request_or_iterator: The request value for the RPC if + `client_info.is_client_stream` is `false`; otherwise, an iterator of + request values. + metadata: Optional :term:`metadata` to be transmitted to the service-side + of the RPC. + client_info: A StreamClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(metadata). + """ + raise NotImplementedError() + + +def intercept_channel(channel, *interceptors): + """Creates an intercepted channel. + + Args: + channel: A Channel. + interceptors: Zero or more UnaryClientInterceptors or + StreamClientInterceptors + + Returns: + A Channel. + + Raises: + TypeError: If an interceptor derives from neither UnaryClientInterceptor + nor StreamClientInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_channel(channel, *interceptors) + + +class UnaryServerInfo(abc.ABC): + """Consists of various information about a unary RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + """ + + +class StreamServerInfo(abc.ABC): + """Consists of various information about a stream RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + """ + + +class UnaryServerInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, servicer_context, server_info, handler): + """Intercepts unary-unary RPCs on the service-side. + + Args: + request: The request value for the RPC. + servicer_context: A ServicerContext. + server_info: A UnaryServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(request, servicer_context). + """ + raise NotImplementedError() + + +class StreamServerInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + """Intercepts stream RPCs on the service-side. + + Args: + request_or_iterator: The request value for the RPC if + `server_info.is_client_stream` is `False`; otherwise, an iterator of + request values. + servicer_context: A ServicerContext. + server_info: A StreamServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(servicer_context). + """ + raise NotImplementedError() + + +def intercept_server(server, *interceptors): + """Creates an intercepted server. + + Args: + server: A Server. + interceptors: Zero or more UnaryServerInterceptors or + StreamServerInterceptors + + Returns: + A Server. + + Raises: + TypeError: If an interceptor derives from neither UnaryServerInterceptor + nor StreamServerInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_server(server, *interceptors) + + +__all__ = ( + "UnaryClientInterceptor", + "StreamClientInfo", + "StreamClientInterceptor", + "UnaryServerInfo", + "StreamServerInfo", + "UnaryServerInterceptor", + "StreamServerInterceptor", + "intercept_channel", + "intercept_server", +) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py new file mode 100644 index 000000000..74861913b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -0,0 +1,431 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of gRPC Python interceptors.""" + + +import collections + +import grpc + +from .. import grpcext + + +class _UnaryClientInfo( + collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) +): + pass + + +class _StreamClientInfo( + collections.namedtuple( + "_StreamClientInfo", + ("full_method", "is_client_stream", "is_server_stream", "timeout"), + ) +): + pass + + +class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def with_call( + self, request, timeout=None, metadata=None, credentials=None + ): + def invoker(request, metadata): + return self._base_callable.with_call( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def future(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable.future( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + +class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _StreamClientInfo(self._method, False, True, timeout) + return self._interceptor.intercept_stream( + request, metadata, client_info, invoker + ) + + +class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.with_call( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def future( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.future( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, True, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorChannel(grpc.Channel): + def __init__(self, channel, interceptor): + self._channel = channel + self._interceptor = interceptor + + def subscribe(self, *args, **kwargs): + self._channel.subscribe(*args, **kwargs) + + def unsubscribe(self, *args, **kwargs): + self._channel.unsubscribe(*args, **kwargs) + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): + return _InterceptorUnaryUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorUnaryStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def close(self): + if not hasattr(self._channel, "close"): + raise RuntimeError( + "close() is not supported with the installed version of grpcio" + ) + self._channel.close() + + def __enter__(self): + """Enters the runtime context related to the channel object.""" + raise NotImplementedError() + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exits the runtime context related to the channel object.""" + raise NotImplementedError() + + +def intercept_channel(channel, *interceptors): + result = channel + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryClientInterceptor + ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryClientInterceptor or a " + "grpcext.StreamClientInterceptor" + ) + result = _InterceptorChannel(result, interceptor) + return result + + +class _UnaryServerInfo( + collections.namedtuple("_UnaryServerInfo", ("full_method",)) +): + pass + + +class _StreamServerInfo( + collections.namedtuple( + "_StreamServerInfo", + ("full_method", "is_client_stream", "is_server_stream"), + ) +): + pass + + +class _InterceptorRpcMethodHandler(grpc.RpcMethodHandler): + def __init__(self, rpc_method_handler, method, interceptor): + self._rpc_method_handler = rpc_method_handler + self._method = method + self._interceptor = interceptor + + @property + def request_streaming(self): + return self._rpc_method_handler.request_streaming + + @property + def response_streaming(self): + return self._rpc_method_handler.response_streaming + + @property + def request_deserializer(self): + return self._rpc_method_handler.request_deserializer + + @property + def response_serializer(self): + return self._rpc_method_handler.response_serializer + + @property + def unary_unary(self): + if not isinstance(self._interceptor, grpcext.UnaryServerInterceptor): + return self._rpc_method_handler.unary_unary + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_unary( + request, servicer_context + ) + + return self._interceptor.intercept_unary( + request, + servicer_context, + _UnaryServerInfo(self._method), + handler, + ) + + return adaptation + + @property + def unary_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.unary_stream + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_stream( + request, servicer_context + ) + + return self._interceptor.intercept_stream( + request, + servicer_context, + _StreamServerInfo(self._method, False, True), + handler, + ) + + return adaptation + + @property + def stream_unary(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_unary + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_unary( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, False), + handler, + ) + + return adaptation + + @property + def stream_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_stream + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_stream( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, True), + handler, + ) + + return adaptation + + +class _InterceptorGenericRpcHandler(grpc.GenericRpcHandler): + def __init__(self, generic_rpc_handler, interceptor): + self.generic_rpc_handler = generic_rpc_handler + self._interceptor = interceptor + + def service(self, handler_call_details): + result = self.generic_rpc_handler.service(handler_call_details) + if result: + result = _InterceptorRpcMethodHandler( + result, handler_call_details.method, self._interceptor + ) + return result + + +class _InterceptorServer(grpc.Server): + def __init__(self, server, interceptor): + self._server = server + self._interceptor = interceptor + + def add_generic_rpc_handlers(self, generic_rpc_handlers): + generic_rpc_handlers = [ + _InterceptorGenericRpcHandler( + generic_rpc_handler, self._interceptor + ) + for generic_rpc_handler in generic_rpc_handlers + ] + return self._server.add_generic_rpc_handlers(generic_rpc_handlers) + + def add_insecure_port(self, *args, **kwargs): + return self._server.add_insecure_port(*args, **kwargs) + + def add_secure_port(self, *args, **kwargs): + return self._server.add_secure_port(*args, **kwargs) + + def start(self, *args, **kwargs): + return self._server.start(*args, **kwargs) + + def stop(self, *args, **kwargs): + return self._server.stop(*args, **kwargs) + + def wait_for_termination(self, *args, **kwargs): + return self._server.wait_for_termination(*args, **kwargs) + + +def intercept_server(server, *interceptors): + result = server + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryServerInterceptor + ) and not isinstance(interceptor, grpcext.StreamServerInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryServerInterceptor or a " + "grpcext.StreamServerInterceptor" + ) + result = _InterceptorServer(result, interceptor) + return result diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py new file mode 100644 index 000000000..780a92b6a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.12.dev0" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py new file mode 100644 index 000000000..b0a6f4284 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py new file mode 100644 index 000000000..43310b5f6 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -0,0 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .protobuf.test_server_pb2 import Request + +CLIENT_ID = 1 + + +def simple_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + stub.SimpleMethod(request) + + +def client_streaming_method(stub, error=False): + # create a generator + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + stub.ClientStreamingMethod(request_messages()) + + +def server_streaming_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + response_iterator = stub.ServerStreamingMethod(request) + list(response_iterator) + + +def bidirectional_streaming_method(stub, error=False): + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + response_iterator = stub.BidirectionalStreamingMethod(request_messages()) + + list(response_iterator) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py new file mode 100644 index 000000000..a4e1c266b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures + +import grpc + +from .protobuf import test_server_pb2, test_server_pb2_grpc + +SERVER_ID = 1 + + +class TestServer(test_server_pb2_grpc.GRPCTestServerServicer): + def SimpleMethod(self, request, context): + if request.request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ClientStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ServerStreamingMethod(self, request, context): + if request.request_data == "error": + + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="server stream error", + ) + return test_server_pb2.Response() + + # create a generator + def response_messages(): + for _ in range(5): + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + yield response + + return response_messages() + + def BidirectionalStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="bidirectional error", + ) + return + + for _ in range(5): + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + + +def create_test_server(port): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + + test_server_pb2_grpc.add_GRPCTestServerServicer_to_server( + TestServer(), server + ) + + server.add_insecure_port("localhost:{}".format(port)) + + return server diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto new file mode 100644 index 000000000..790a7675d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server.proto @@ -0,0 +1,34 @@ +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +message Request { + int64 client_id = 1; + string request_data = 2; +} + +message Response { + int64 server_id = 1; + string response_data = 2; +} + +service GRPCTestServer { + rpc SimpleMethod (Request) returns (Response); + + rpc ClientStreamingMethod (stream Request) returns (Response); + + rpc ServerStreamingMethod (Request) returns (stream Response); + + rpc BidirectionalStreamingMethod (stream Request) returns (stream Response); +} diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py new file mode 100644 index 000000000..735206f85 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test_server.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor.FileDescriptor( + name="test_server.proto", + package="", + syntax="proto3", + serialized_options=None, + serialized_pb=b'\n\x11test_server.proto"2\n\x07Request\x12\x11\n\tclient_id\x18\x01 \x01(\x03\x12\x14\n\x0crequest_data\x18\x02 \x01(\t"4\n\x08Response\x12\x11\n\tserver_id\x18\x01 \x01(\x03\x12\x15\n\rresponse_data\x18\x02 \x01(\t2\xce\x01\n\x0eGRPCTestServer\x12#\n\x0cSimpleMethod\x12\x08.Request\x1a\t.Response\x12.\n\x15\x43lientStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x12.\n\x15ServerStreamingMethod\x12\x08.Request\x1a\t.Response0\x01\x12\x37\n\x1c\x42idirectionalStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x30\x01\x62\x06proto3', +) + + +_REQUEST = _descriptor.Descriptor( + name="Request", + full_name="Request", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="client_id", + full_name="Request.client_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="request_data", + full_name="Request.request_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=21, + serialized_end=71, +) + + +_RESPONSE = _descriptor.Descriptor( + name="Response", + full_name="Response", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="server_id", + full_name="Response.server_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="response_data", + full_name="Response.response_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=73, + serialized_end=125, +) + +DESCRIPTOR.message_types_by_name["Request"] = _REQUEST +DESCRIPTOR.message_types_by_name["Response"] = _RESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Request = _reflection.GeneratedProtocolMessageType( + "Request", + (_message.Message,), + { + "DESCRIPTOR": _REQUEST, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Request) + }, +) +_sym_db.RegisterMessage(Request) + +Response = _reflection.GeneratedProtocolMessageType( + "Response", + (_message.Message,), + { + "DESCRIPTOR": _RESPONSE, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Response) + }, +) +_sym_db.RegisterMessage(Response) + + +_GRPCTESTSERVER = _descriptor.ServiceDescriptor( + name="GRPCTestServer", + full_name="GRPCTestServer", + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=128, + serialized_end=334, + methods=[ + _descriptor.MethodDescriptor( + name="SimpleMethod", + full_name="GRPCTestServer.SimpleMethod", + index=0, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ClientStreamingMethod", + full_name="GRPCTestServer.ClientStreamingMethod", + index=1, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ServerStreamingMethod", + full_name="GRPCTestServer.ServerStreamingMethod", + index=2, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="BidirectionalStreamingMethod", + full_name="GRPCTestServer.BidirectionalStreamingMethod", + index=3, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + ], +) +_sym_db.RegisterServiceDescriptor(_GRPCTESTSERVER) + +DESCRIPTOR.services_by_name["GRPCTestServer"] = _GRPCTESTSERVER + +# @@protoc_insertion_point(module_scope) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py new file mode 100644 index 000000000..d0a6fd518 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/protobuf/test_server_pb2_grpc.py @@ -0,0 +1,205 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +from tests.protobuf import test_server_pb2 as test__server__pb2 + + +class GRPCTestServerStub(object): + """Missing associated documentation comment in .proto file""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SimpleMethod = channel.unary_unary( + "/GRPCTestServer/SimpleMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ClientStreamingMethod = channel.stream_unary( + "/GRPCTestServer/ClientStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ServerStreamingMethod = channel.unary_stream( + "/GRPCTestServer/ServerStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.BidirectionalStreamingMethod = channel.stream_stream( + "/GRPCTestServer/BidirectionalStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + + +class GRPCTestServerServicer(object): + """Missing associated documentation comment in .proto file""" + + def SimpleMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ClientStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ServerStreamingMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def BidirectionalStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_GRPCTestServerServicer_to_server(servicer, server): + rpc_method_handlers = { + "SimpleMethod": grpc.unary_unary_rpc_method_handler( + servicer.SimpleMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ClientStreamingMethod": grpc.stream_unary_rpc_method_handler( + servicer.ClientStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ServerStreamingMethod": grpc.unary_stream_rpc_method_handler( + servicer.ServerStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "BidirectionalStreamingMethod": grpc.stream_stream_rpc_method_handler( + servicer.BidirectionalStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "GRPCTestServer", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class GRPCTestServer(object): + """Missing associated documentation comment in .proto file""" + + @staticmethod + def SimpleMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/GRPCTestServer/SimpleMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ClientStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_unary( + request_iterator, + target, + "/GRPCTestServer/ClientStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ServerStreamingMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/GRPCTestServer/ServerStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def BidirectionalStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, + target, + "/GRPCTestServer/BidirectionalStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py new file mode 100644 index 000000000..458f32e04 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -0,0 +1,298 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +from opentelemetry.sdk.metrics.export.aggregate import ( + MinMaxSumCountAggregator, + SumAggregator, +) +from opentelemetry.test.test_base import TestBase +from tests.protobuf import test_server_pb2_grpc + +from ._client import ( + bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, +) +from ._server import create_test_server + + +class TestClientProto(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument( + exporter=self.memory_metrics_exporter + ) + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def _verify_success_records(self, num_bytes_out, num_bytes_in, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + bytes_in = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/bytes_in": + bytes_in = record + + self.assertIsNotNone(bytes_out) + self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out") + self.assertEqual(bytes_out.labels, (("method", method),)) + + self.assertIsNotNone(bytes_in) + self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in") + self.assertEqual(bytes_in.labels, (("method", method),)) + + self.assertIsNotNone(duration) + self.assertEqual(duration.instrument.name, "grpcio/client/duration") + self.assertEqual( + duration.labels, + ( + ("error", False), + ("method", method), + ("status_code", grpc.StatusCode.OK), + ), + ) + + self.assertEqual(type(bytes_out.aggregator), SumAggregator) + self.assertEqual(type(bytes_in.aggregator), SumAggregator) + self.assertEqual(type(duration.aggregator), MinMaxSumCountAggregator) + + self.assertEqual(bytes_out.aggregator.checkpoint, num_bytes_out) + self.assertEqual(bytes_in.aggregator.checkpoint, num_bytes_in) + + self.assertEqual(duration.aggregator.checkpoint.count, 1) + self.assertGreaterEqual(duration.aggregator.checkpoint.sum, 0) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod") + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 8, 40, "/GRPCTestServer/ServerStreamingMethod" + ) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 40, 8, "/GRPCTestServer/ClientStreamingMethod" + ) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + self._verify_success_records( + 40, 40, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + def _verify_error_records(self, method): + # pylint: disable=protected-access,no-member + self.channel._interceptor.controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + errors = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/errors": + errors = record + + self.assertIsNotNone(bytes_out) + self.assertIsNotNone(errors) + self.assertIsNotNone(duration) + + self.assertEqual(errors.instrument.name, "grpcio/client/errors") + self.assertEqual( + errors.labels, + ( + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + self.assertEqual(errors.aggregator.checkpoint, 1) + + self.assertEqual( + duration.labels, + ( + ("error", True), + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/SimpleMethod") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/ClientStreamingMethod") + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + self._verify_error_records("/GRPCTestServer/ServerStreamingMethod") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + self._verify_error_records( + "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + +class TestClientNoMetrics(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument() + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py new file mode 100644 index 000000000..a41da47ae --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -0,0 +1,297 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=unused-argument +# pylint:disable=no-self-use + +import threading +from concurrent import futures + +import grpc + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import ( + GrpcInstrumentorServer, + server_interceptor, +) +from opentelemetry.instrumentation.grpc.grpcext import intercept_server +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.test.test_base import TestBase + + +class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): + def __init__(self, handler): + self.request_streaming = False + self.response_streaming = False + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = handler + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class UnaryUnaryRpcHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._unary_unary_handler = handler + + def service(self, handler_call_details): + return UnaryUnaryMethodHandler(self._unary_unary_handler) + + +class TestOpenTelemetryServerInterceptor(TestBase): + def test_instrumentor(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "test") + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + grpc_server_instrumentor.uninstrument() + + def test_uninstrument(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + grpc_server_instrumentor.uninstrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_create_span(self): + """Check that the interceptor wraps calls with spans server-side.""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # No-op RPC handler + def handler(request, context): + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + # FIXME: grpcext interceptor doesn't apply to handlers passed to server + # init, should use intercept_service API instead. + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, "") + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + def test_span_lifetime(self): + """Check that the span is active for the duration of the call.""" + + interceptor = server_interceptor() + + # To capture the current span at the time the handler is called + active_span_in_handler = None + + def handler(request, context): + nonlocal active_span_in_handler + active_span_in_handler = trace.get_current_span() + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + active_span_before_call = trace.get_current_span() + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + active_span_after_call = trace.get_current_span() + + self.assertEqual(active_span_before_call, trace.INVALID_SPAN) + self.assertEqual(active_span_after_call, trace.INVALID_SPAN) + self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsNone(active_span_in_handler.parent) + + def test_sequential_server_spans(self): + """Check that sequential RPCs get separate server spans.""" + + interceptor = server_interceptor() + + # Capture the currently active span in each thread + active_spans_in_handler = [] + + def handler(request, context): + active_spans_in_handler.append(trace.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + channel.unary_unary("")(b"") + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + def test_concurrent_server_spans(self): + """Check that concurrent RPC calls don't interfere with each other. + + This is the same check as test_sequential_server_spans except that the + RPCs are concurrent. Two handlers are invoked at the same time on two + separate threads. Each one should see a different active span and + context. + """ + + interceptor = server_interceptor() + + # Capture the currently active span in each thread + active_spans_in_handler = [] + latch = get_latch(2) + + def handler(request, context): + latch() + active_spans_in_handler.append(trace.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=2), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + # Interleave calls so spans are active on each thread at the same + # time + with futures.ThreadPoolExecutor(max_workers=2) as tpe: + f1 = tpe.submit(channel.unary_unary(""), b"") + f2 = tpe.submit(channel.unary_unary(""), b"") + futures.wait((f1, f2)) + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + +def get_latch(num): + """Get a countdown latch function for use in n threads.""" + cv = threading.Condition() + count = 0 + + def countdown_latch(): + """Block until n-1 other threads have called.""" + nonlocal count + cv.acquire() + count += 1 + cv.notify() + cv.release() + cv.acquire() + while count < num: + cv.wait() + cv.release() + + return countdown_latch