Merge branch 'core-instrumentation-grpc-v0.15b0'

This commit is contained in:
Nathaniel Ruiz Nowell 2020-11-02 11:22:21 -08:00
commit aa37611d89
21 changed files with 2819 additions and 0 deletions

View File

@ -0,0 +1,43 @@
# Changelog
## Unreleased
## Version 0.13b0
Released 2020-09-17
- Drop support for Python 3.4
([#1099](https://github.com/open-telemetry/opentelemetry-python/pull/1099))
## Version 0.12b0
Released 2020-08-14
- Change package name to opentelemetry-instrumentation-grpc
([#969](https://github.com/open-telemetry/opentelemetry-python/pull/969))
## Version 0.11b0
Released 2020-07-28
- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))
- Add gRPC client and server instrumentors
([788](https://github.com/open-telemetry/opentelemetry-python/pull/788))
- Add metric recording (bytes in/out, errors, latency) to gRPC client
## 0.8b0
Released 2020-05-27
- lint: version of grpc causes lint issues
([#696](https://github.com/open-telemetry/opentelemetry-python/pull/696))
## 0.6b0
Released 2020-03-30
- Add gRPC integration
([#476](https://github.com/open-telemetry/opentelemetry-python/pull/476))
- Initial release

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,9 @@
graft src
graft tests
global-exclude *.pyc
global-exclude *.pyo
global-exclude __pycache__/*
include CHANGELOG.md
include MANIFEST.in
include README.rst
include LICENSE

View File

@ -0,0 +1,18 @@
OpenTelemetry gRPC Integration
==============================
|pypi|
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-grpc.svg
:target: https://pypi.org/project/opentelemetry-instrumentation-grpc/
Client and server interceptors for `gRPC Python`_.
.. _gRPC Python: https://grpc.github.io/grpc/python/grpc.html
Installation
------------
::
pip install opentelemetry-instrumentation-grpc

View File

@ -0,0 +1,58 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[metadata]
name = opentelemetry-instrumentation-grpc
description = OpenTelemetry gRPC instrumentation
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = cncf-opentelemetry-contributors@lists.cncf.io
url = https://github.com/open-telemetry/opentelemetry-python/tree/master/instrumentation/opentelemetry-instrumentation-grpc
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
[options]
python_requires = >=3.5
package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api == 0.15b0
opentelemetry-sdk == 0.15b0
grpcio ~= 1.27
[options.extras_require]
test =
opentelemetry-test == 0.15b0
opentelemetry-sdk == 0.15b0
protobuf == 3.12.2
[options.packages.find]
where = src
[options.entry_points]
opentelemetry_instrumentor =
grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient
grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer

View File

@ -0,0 +1,27 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import setuptools
BASE_DIR = os.path.dirname(__file__)
VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "grpc", "version.py"
)
PACKAGE_INFO = {}
with open(VERSION_FILENAME) as f:
exec(f.read(), PACKAGE_INFO)
setuptools.setup(version=PACKAGE_INFO["__version__"])

View File

@ -0,0 +1,248 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=no-name-in-module
# pylint:disable=relative-beyond-top-level
# pylint:disable=import-error
# pylint:disable=no-self-use
"""
Usage Client
------------
.. code-block:: python
import logging
import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, client_interceptor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
# Set meter provider to opentelemetry-sdk's MeterProvider
metrics.set_meter_provider(MeterProvider())
# Optional - export GRPC specific metrics (latency, bytes in/out, errors) by passing an exporter
instrumentor = GrpcInstrumentorClient(exporter=ConsoleMetricsExporter(), interval=10)
instrumentor.instrument()
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
print("Greeter client received: " + response.message)
if __name__ == "__main__":
logging.basicConfig()
run()
Usage Server
------------
.. code-block:: python
import logging
from concurrent import futures
import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor())
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
serve()
You can also add the instrumentor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:
.. code-block:: python
from opentelemetry.instrumentation.grpc import server_interceptor
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])
"""
from functools import partial
import grpc
from wrapt import wrap_function_wrapper as _wrap
from opentelemetry import trace
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
from opentelemetry.instrumentation.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=unused-argument
# isort:skip
class GrpcInstrumentorServer(BaseInstrumentor):
"""
Globally instrument the grpc server.
Usage::
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
"""
# pylint:disable=attribute-defined-outside-init
def _instrument(self, **kwargs):
self._original_func = grpc.server
def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(0, server_interceptor())
else:
kwargs["interceptors"] = [server_interceptor()]
return self._original_func(*args, **kwargs)
grpc.server = server
def _uninstrument(self, **kwargs):
grpc.server = self._original_func
class GrpcInstrumentorClient(BaseInstrumentor):
def _instrument(self, **kwargs):
exporter = kwargs.get("exporter", None)
interval = kwargs.get("interval", 30)
if kwargs.get("channel_type") == "secure":
_wrap(
"grpc",
"secure_channel",
partial(self.wrapper_fn, exporter, interval),
)
else:
_wrap(
"grpc",
"insecure_channel",
partial(self.wrapper_fn, exporter, interval),
)
def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
unwrap(grpc, "secure_channel")
else:
unwrap(grpc, "insecure_channel")
def wrapper_fn(
self, exporter, interval, original_func, instance, args, kwargs
):
channel = original_func(*args, **kwargs)
tracer_provider = kwargs.get("tracer_provider")
return intercept_channel(
channel,
client_interceptor(
tracer_provider=tracer_provider,
exporter=exporter,
interval=interval,
),
)
def client_interceptor(tracer_provider=None, exporter=None, interval=30):
"""Create a gRPC client channel interceptor.
Args:
tracer: The tracer to use to create client-side spans.
exporter: The exporter that will receive client metrics
interval: Time between every export call
Returns:
An invocation-side interceptor object.
"""
from . import _client
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
return _client.OpenTelemetryClientInterceptor(tracer, exporter, interval)
def server_interceptor(tracer_provider=None):
"""Create a gRPC server interceptor.
Args:
tracer: The tracer to use to create server-side spans.
Returns:
A service-side interceptor object.
"""
from . import _server
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
return _server.OpenTelemetryServerInterceptor(tracer)

View File

@ -0,0 +1,273 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=relative-beyond-top-level
# pylint:disable=arguments-differ
# pylint:disable=no-member
# pylint:disable=signature-differs
"""Implementation of the invocation-side open-telemetry interceptor."""
from collections import OrderedDict
from typing import MutableMapping
import grpc
from opentelemetry import metrics, propagators, trace
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.trace.status import Status, StatusCode
from . import grpcext
from ._utilities import RpcInfo, TimedMetricRecorder
class _GuardedSpan:
def __init__(self, span):
self.span = span
self.generated_span = None
self._engaged = True
def __enter__(self):
self.generated_span = self.span.__enter__()
return self
def __exit__(self, *args, **kwargs):
if self._engaged:
self.generated_span = None
return self.span.__exit__(*args, **kwargs)
return False
def release(self):
self._engaged = False
return self.span
def _inject_span_context(metadata: MutableMapping[str, str]) -> None:
# pylint:disable=unused-argument
def append_metadata(
carrier: MutableMapping[str, str], key: str, value: str
):
metadata[key] = value
# Inject current active span from the context
propagators.inject(append_metadata, metadata)
def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder):
def callback(response_future):
with span:
code = response_future.code()
if code != grpc.StatusCode.OK:
rpc_info.error = code
return
response = response_future.result()
rpc_info.response = response
if "ByteSize" in dir(response):
metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
return callback
class OpenTelemetryClientInterceptor(
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
):
def __init__(self, tracer, exporter, interval):
self._tracer = tracer
self._meter = None
if exporter and interval:
self._meter = metrics.get_meter(__name__)
self.controller = PushController(
meter=self._meter, exporter=exporter, interval=interval
)
self._metrics_recorder = TimedMetricRecorder(self._meter, "client")
def _start_span(self, method):
return self._tracer.start_as_current_span(
name=method, kind=trace.SpanKind.CLIENT
)
# pylint:disable=no-self-use
def _trace_result(self, guarded_span, rpc_info, result, client_info):
# If the RPC is called asynchronously, release the guard and add a
# callback so that the span can be finished once the future is done.
if isinstance(result, grpc.Future):
result.add_done_callback(
_make_future_done_callback(
guarded_span.release(),
rpc_info,
client_info,
self._metrics_recorder,
)
)
return result
response = result
# Handle the case when the RPC is initiated via the with_call
# method and the result is a tuple with the first element as the
# response.
# http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call
if isinstance(result, tuple):
response = result[0]
rpc_info.response = response
if "ByteSize" in dir(response):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
return result
def _start_guarded_span(self, *args, **kwargs):
return _GuardedSpan(self._start_span(*args, **kwargs))
def _bytes_out_iterator_wrapper(self, iterator, client_info):
for request in iterator:
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)
yield request
def intercept_unary(self, request, metadata, client_info, invoker):
if not metadata:
mutable_metadata = OrderedDict()
else:
mutable_metadata = OrderedDict(metadata)
with self._start_guarded_span(client_info.full_method) as guarded_span:
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
# If protobuf is used, we can record the bytes in/out. Otherwise, we have no way
# to get the size of the request/response properly, so don't record anything
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request,
)
try:
result = invoker(request, metadata)
except grpc.RpcError:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
return self._trace_result(
guarded_span, rpc_info, result, client_info
)
# For RPCs that stream responses, the result can be a generator. To record
# the span across the generated responses and detect any errors, we wrap
# the result in a new generator that yields the response values.
def _intercept_server_stream(
self, request_or_iterator, metadata, client_info, invoker
):
if not metadata:
mutable_metadata = OrderedDict()
else:
mutable_metadata = OrderedDict(metadata)
with self._start_span(client_info.full_method) as span:
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
)
if client_info.is_client_stream:
rpc_info.request = request_or_iterator
request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)
else:
if "ByteSize" in dir(request_or_iterator):
self._metrics_recorder.record_bytes_out(
request_or_iterator.ByteSize(),
client_info.full_method,
)
try:
result = invoker(request_or_iterator, metadata)
# Rewrap the result stream into a generator, and record the bytes received
for response in result:
if "ByteSize" in dir(response):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
yield response
except grpc.RpcError:
span.set_status(Status(StatusCode.ERROR))
raise
def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
):
if client_info.is_server_stream:
return self._intercept_server_stream(
request_or_iterator, metadata, client_info, invoker
)
if not metadata:
mutable_metadata = OrderedDict()
else:
mutable_metadata = OrderedDict(metadata)
with self._start_guarded_span(client_info.full_method) as guarded_span:
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request_or_iterator,
)
rpc_info.request = request_or_iterator
request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)
try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
return self._trace_result(
guarded_span, rpc_info, result, client_info
)

View File

@ -0,0 +1,246 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=relative-beyond-top-level
# pylint:disable=arguments-differ
# pylint:disable=no-member
# pylint:disable=signature-differs
"""
Implementation of the service-side open-telemetry interceptor.
"""
import logging
from contextlib import contextmanager
import grpc
from opentelemetry import propagators, trace
from opentelemetry.context import attach, detach
from opentelemetry.trace.propagation.textmap import DictGetter
from opentelemetry.trace.status import Status, StatusCode
logger = logging.getLogger(__name__)
# wrap an RPC call
# see https://github.com/grpc/grpc/issues/18191
def _wrap_rpc_behavior(handler, continuation):
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
behavior_fn = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
behavior_fn = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
behavior_fn = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
behavior_fn = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(
continuation(
behavior_fn, handler.request_streaming, handler.response_streaming
),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
# pylint:disable=abstract-method
class _OpenTelemetryServicerContext(grpc.ServicerContext):
def __init__(self, servicer_context, active_span):
self._servicer_context = servicer_context
self._active_span = active_span
self.code = grpc.StatusCode.OK
self.details = None
super().__init__()
def is_active(self, *args, **kwargs):
return self._servicer_context.is_active(*args, **kwargs)
def time_remaining(self, *args, **kwargs):
return self._servicer_context.time_remaining(*args, **kwargs)
def cancel(self, *args, **kwargs):
return self._servicer_context.cancel(*args, **kwargs)
def add_callback(self, *args, **kwargs):
return self._servicer_context.add_callback(*args, **kwargs)
def disable_next_message_compression(self):
return self._service_context.disable_next_message_compression()
def invocation_metadata(self, *args, **kwargs):
return self._servicer_context.invocation_metadata(*args, **kwargs)
def peer(self):
return self._servicer_context.peer()
def peer_identities(self):
return self._servicer_context.peer_identities()
def peer_identity_key(self):
return self._servicer_context.peer_identity_key()
def auth_context(self):
return self._servicer_context.auth_context()
def set_compression(self, compression):
return self._servicer_context.set_compression(compression)
def send_initial_metadata(self, *args, **kwargs):
return self._servicer_context.send_initial_metadata(*args, **kwargs)
def set_trailing_metadata(self, *args, **kwargs):
return self._servicer_context.set_trailing_metadata(*args, **kwargs)
def abort(self, code, details):
self.code = code
self.details = details
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
return self._servicer_context.abort(code, details)
def abort_with_status(self, status):
return self._servicer_context.abort_with_status(status)
def set_code(self, code):
self.code = code
# use details if we already have it, otherwise the status description
details = self.details or code.value[1]
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
return self._servicer_context.set_code(code)
def set_details(self, details):
self.details = details
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
return self._servicer_context.set_details(details)
# pylint:disable=abstract-method
# pylint:disable=no-self-use
# pylint:disable=unused-argument
class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
"""
A gRPC server interceptor, to add OpenTelemetry.
Usage::
tracer = some OpenTelemetry tracer
interceptors = [
OpenTelemetryServerInterceptor(tracer),
]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=concurrency),
interceptors = interceptors)
"""
def __init__(self, tracer):
self._tracer = tracer
self._carrier_getter = DictGetter()
@contextmanager
def _set_remote_context(self, servicer_context):
metadata = servicer_context.invocation_metadata()
if metadata:
md_dict = {md.key: md.value for md in metadata}
ctx = propagators.extract(self._carrier_getter, md_dict)
token = attach(ctx)
try:
yield
finally:
detach(token)
else:
yield
def _start_span(self, handler_call_details, context):
attributes = {
"rpc.method": handler_call_details.method,
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK,
}
metadata = dict(context.invocation_metadata())
if "user-agent" in metadata:
attributes["rpc.user_agent"] = metadata["user-agent"]
# Split up the peer to keep with how other telemetry sources
# do it. This looks like:
# * ipv6:[::1]:57284
# * ipv4:127.0.0.1:57284
# * ipv4:10.2.1.1:57284,127.0.0.1:57284
#
try:
host, port = (
context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1)
)
# other telemetry sources convert this, so we will too
if host in ("[::1]", "127.0.0.1"):
host = "localhost"
attributes.update({"net.peer.name": host, "net.peer.port": port})
except IndexError:
logger.warning("Failed to parse peer address '%s'", context.peer())
return self._tracer.start_as_current_span(
name=handler_call_details.method,
kind=trace.SpanKind.SERVER,
attributes=attributes,
)
def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):
with self._set_remote_context(context):
with self._start_span(
handler_call_details, context
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)
# And now we run the actual RPC.
try:
return behavior(request_or_iterator, context)
except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
# Here, we're interested in uncaught exceptions.
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error
return telemetry_interceptor
return _wrap_rpc_behavior(
continuation(handler_call_details), telemetry_wrapper
)

View File

@ -0,0 +1,101 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Internal utilities."""
from contextlib import contextmanager
from time import time
import grpc
class RpcInfo:
def __init__(
self,
full_method=None,
metadata=None,
timeout=None,
request=None,
response=None,
error=None,
):
self.full_method = full_method
self.metadata = metadata
self.timeout = timeout
self.request = request
self.response = response
self.error = error
class TimedMetricRecorder:
def __init__(self, meter, span_kind):
self._meter = meter
service_name = "grpcio"
self._span_kind = span_kind
if self._meter:
self._duration = self._meter.create_valuerecorder(
name="{}/{}/duration".format(service_name, span_kind),
description="Duration of grpc requests to the server",
unit="ms",
value_type=float,
)
self._error_count = self._meter.create_counter(
name="{}/{}/errors".format(service_name, span_kind),
description="Number of errors that were returned from the server",
unit="1",
value_type=int,
)
self._bytes_in = self._meter.create_counter(
name="{}/{}/bytes_in".format(service_name, span_kind),
description="Number of bytes received from the server",
unit="by",
value_type=int,
)
self._bytes_out = self._meter.create_counter(
name="{}/{}/bytes_out".format(service_name, span_kind),
description="Number of bytes sent out through gRPC",
unit="by",
value_type=int,
)
def record_bytes_in(self, bytes_in, method):
if self._meter:
labels = {"method": method}
self._bytes_in.add(bytes_in, labels)
def record_bytes_out(self, bytes_out, method):
if self._meter:
labels = {"method": method}
self._bytes_out.add(bytes_out, labels)
@contextmanager
def record_latency(self, method):
start_time = time()
labels = {"method": method, "status_code": grpc.StatusCode.OK}
try:
yield labels
except grpc.RpcError as exc:
if self._meter:
# pylint: disable=no-member
labels["status_code"] = exc.code()
self._error_count.add(1, labels)
labels["error"] = True
raise
finally:
if self._meter:
if "error" not in labels:
labels["error"] = False
elapsed_time = (time() - start_time) * 1000
self._duration.record(elapsed_time, labels)

View File

@ -0,0 +1,125 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=no-name-in-module
import abc
class UnaryClientInfo(abc.ABC):
"""Consists of various information about a unary RPC on the
invocation-side.
Attributes:
full_method: A string of the full RPC method, i.e.,
/package.service/method.
timeout: The length of time in seconds to wait for the computation to
terminate or be cancelled, or None if this method should block until
the computation is terminated or is cancelled no matter how long that
takes.
"""
class StreamClientInfo(abc.ABC):
"""Consists of various information about a stream RPC on the
invocation-side.
Attributes:
full_method: A string of the full RPC method, i.e.,
/package.service/method.
is_client_stream: Indicates whether the RPC is client-streaming.
is_server_stream: Indicates whether the RPC is server-streaming.
timeout: The length of time in seconds to wait for the computation to
terminate or be cancelled, or None if this method should block until
the computation is terminated or is cancelled no matter how long that
takes.
"""
class UnaryClientInterceptor(abc.ABC):
"""Affords intercepting unary-unary RPCs on the invocation-side."""
@abc.abstractmethod
def intercept_unary(self, request, metadata, client_info, invoker):
"""Intercepts unary-unary RPCs on the invocation-side.
Args:
request: The request value for the RPC.
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
client_info: A UnaryClientInfo containing various information about
the RPC.
invoker: The handler to complete the RPC on the client. It is the
interceptor's responsibility to call it.
Returns:
The result from calling invoker(request, metadata).
"""
raise NotImplementedError()
class StreamClientInterceptor(abc.ABC):
"""Affords intercepting stream RPCs on the invocation-side."""
@abc.abstractmethod
def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
):
"""Intercepts stream RPCs on the invocation-side.
Args:
request_or_iterator: The request value for the RPC if
`client_info.is_client_stream` is `false`; otherwise, an iterator of
request values.
metadata: Optional :term:`metadata` to be transmitted to the service-side
of the RPC.
client_info: A StreamClientInfo containing various information about
the RPC.
invoker: The handler to complete the RPC on the client. It is the
interceptor's responsibility to call it.
Returns:
The result from calling invoker(metadata).
"""
raise NotImplementedError()
def intercept_channel(channel, *interceptors):
"""Creates an intercepted channel.
Args:
channel: A Channel.
interceptors: Zero or more UnaryClientInterceptors or
StreamClientInterceptors
Returns:
A Channel.
Raises:
TypeError: If an interceptor derives from neither UnaryClientInterceptor
nor StreamClientInterceptor.
"""
from . import _interceptor
return _interceptor.intercept_channel(channel, *interceptors)
__all__ = (
"UnaryClientInterceptor",
"StreamClientInfo",
"StreamClientInterceptor",
"intercept_channel",
)

View File

@ -0,0 +1,254 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=relative-beyond-top-level
# pylint:disable=arguments-differ
# pylint:disable=no-member
# pylint:disable=signature-differs
"""Implementation of gRPC Python interceptors."""
import collections
import grpc
from .. import grpcext
class _UnaryClientInfo(
collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout"))
):
pass
class _StreamClientInfo(
collections.namedtuple(
"_StreamClientInfo",
("full_method", "is_client_stream", "is_server_stream", "timeout"),
)
):
pass
class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, method, base_callable, interceptor):
self._method = method
self._base_callable = base_callable
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)
def with_call(
self, request, timeout=None, metadata=None, credentials=None
):
def invoker(request, metadata):
return self._base_callable.with_call(
request, timeout, metadata, credentials
)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)
def future(self, request, timeout=None, metadata=None, credentials=None):
def invoker(request, metadata):
return self._base_callable.future(
request, timeout, metadata, credentials
)
client_info = _UnaryClientInfo(self._method, timeout)
return self._interceptor.intercept_unary(
request, metadata, client_info, invoker
)
class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
def __init__(self, method, base_callable, interceptor):
self._method = method
self._base_callable = base_callable
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
def invoker(request, metadata):
return self._base_callable(request, timeout, metadata, credentials)
client_info = _StreamClientInfo(self._method, False, True, timeout)
return self._interceptor.intercept_stream(
request, metadata, client_info, invoker
)
class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def __init__(self, method, base_callable, interceptor):
self._method = method
self._base_callable = base_callable
self._interceptor = interceptor
def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
):
def invoker(request_iterator, metadata):
return self._base_callable(
request_iterator, timeout, metadata, credentials
)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)
def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None
):
def invoker(request_iterator, metadata):
return self._base_callable.with_call(
request_iterator, timeout, metadata, credentials
)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None
):
def invoker(request_iterator, metadata):
return self._base_callable.future(
request_iterator, timeout, metadata, credentials
)
client_info = _StreamClientInfo(self._method, True, False, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)
class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
def __init__(self, method, base_callable, interceptor):
self._method = method
self._base_callable = base_callable
self._interceptor = interceptor
def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None
):
def invoker(request_iterator, metadata):
return self._base_callable(
request_iterator, timeout, metadata, credentials
)
client_info = _StreamClientInfo(self._method, True, True, timeout)
return self._interceptor.intercept_stream(
request_iterator, metadata, client_info, invoker
)
class _InterceptorChannel(grpc.Channel):
def __init__(self, channel, interceptor):
self._channel = channel
self._interceptor = interceptor
def subscribe(self, *args, **kwargs):
self._channel.subscribe(*args, **kwargs)
def unsubscribe(self, *args, **kwargs):
self._channel.unsubscribe(*args, **kwargs)
def unary_unary(
self, method, request_serializer=None, response_deserializer=None
):
base_callable = self._channel.unary_unary(
method, request_serializer, response_deserializer
)
if isinstance(self._interceptor, grpcext.UnaryClientInterceptor):
return _InterceptorUnaryUnaryMultiCallable(
method, base_callable, self._interceptor
)
return base_callable
def unary_stream(
self, method, request_serializer=None, response_deserializer=None
):
base_callable = self._channel.unary_stream(
method, request_serializer, response_deserializer
)
if isinstance(self._interceptor, grpcext.StreamClientInterceptor):
return _InterceptorUnaryStreamMultiCallable(
method, base_callable, self._interceptor
)
return base_callable
def stream_unary(
self, method, request_serializer=None, response_deserializer=None
):
base_callable = self._channel.stream_unary(
method, request_serializer, response_deserializer
)
if isinstance(self._interceptor, grpcext.StreamClientInterceptor):
return _InterceptorStreamUnaryMultiCallable(
method, base_callable, self._interceptor
)
return base_callable
def stream_stream(
self, method, request_serializer=None, response_deserializer=None
):
base_callable = self._channel.stream_stream(
method, request_serializer, response_deserializer
)
if isinstance(self._interceptor, grpcext.StreamClientInterceptor):
return _InterceptorStreamStreamMultiCallable(
method, base_callable, self._interceptor
)
return base_callable
def close(self):
if not hasattr(self._channel, "close"):
raise RuntimeError(
"close() is not supported with the installed version of grpcio"
)
self._channel.close()
def __enter__(self):
"""Enters the runtime context related to the channel object."""
raise NotImplementedError()
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exits the runtime context related to the channel object."""
raise NotImplementedError()
def intercept_channel(channel, *interceptors):
result = channel
for interceptor in interceptors:
if not isinstance(
interceptor, grpcext.UnaryClientInterceptor
) and not isinstance(interceptor, grpcext.StreamClientInterceptor):
raise TypeError(
"interceptor must be either a "
"grpcext.UnaryClientInterceptor or a "
"grpcext.StreamClientInterceptor"
)
result = _InterceptorChannel(result, interceptor)
return result

View File

@ -0,0 +1,15 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.15b0"

View File

@ -0,0 +1,13 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .protobuf.test_server_pb2 import Request
CLIENT_ID = 1
def simple_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
stub.SimpleMethod(request)
def client_streaming_method(stub, error=False):
# create a generator
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request
stub.ClientStreamingMethod(request_messages())
def server_streaming_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
response_iterator = stub.ServerStreamingMethod(request)
list(response_iterator)
def bidirectional_streaming_method(stub, error=False):
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request
response_iterator = stub.BidirectionalStreamingMethod(request_messages())
list(response_iterator)

View File

@ -0,0 +1,87 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
import grpc
from .protobuf import test_server_pb2, test_server_pb2_grpc
SERVER_ID = 1
class TestServer(test_server_pb2_grpc.GRPCTestServerServicer):
def SimpleMethod(self, request, context):
if request.request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response
def ClientStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response
def ServerStreamingMethod(self, request, context):
if request.request_data == "error":
context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="server stream error",
)
return test_server_pb2.Response()
# create a generator
def response_messages():
for _ in range(5):
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
yield response
return response_messages()
def BidirectionalStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="bidirectional error",
)
return
for _ in range(5):
yield test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
def create_test_server(port):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
test_server_pb2_grpc.add_GRPCTestServerServicer_to_server(
TestServer(), server
)
server.add_insecure_port("localhost:{}".format(port))
return server

View File

@ -0,0 +1,34 @@
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
message Request {
int64 client_id = 1;
string request_data = 2;
}
message Response {
int64 server_id = 1;
string response_data = 2;
}
service GRPCTestServer {
rpc SimpleMethod (Request) returns (Response);
rpc ClientStreamingMethod (stream Request) returns (Response);
rpc ServerStreamingMethod (Request) returns (stream Response);
rpc BidirectionalStreamingMethod (stream Request) returns (stream Response);
}

View File

@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: test_server.proto
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name="test_server.proto",
package="",
syntax="proto3",
serialized_options=None,
serialized_pb=b'\n\x11test_server.proto"2\n\x07Request\x12\x11\n\tclient_id\x18\x01 \x01(\x03\x12\x14\n\x0crequest_data\x18\x02 \x01(\t"4\n\x08Response\x12\x11\n\tserver_id\x18\x01 \x01(\x03\x12\x15\n\rresponse_data\x18\x02 \x01(\t2\xce\x01\n\x0eGRPCTestServer\x12#\n\x0cSimpleMethod\x12\x08.Request\x1a\t.Response\x12.\n\x15\x43lientStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x12.\n\x15ServerStreamingMethod\x12\x08.Request\x1a\t.Response0\x01\x12\x37\n\x1c\x42idirectionalStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x30\x01\x62\x06proto3',
)
_REQUEST = _descriptor.Descriptor(
name="Request",
full_name="Request",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="client_id",
full_name="Request.client_id",
index=0,
number=1,
type=3,
cpp_type=2,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="request_data",
full_name="Request.request_data",
index=1,
number=2,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=b"".decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=21,
serialized_end=71,
)
_RESPONSE = _descriptor.Descriptor(
name="Response",
full_name="Response",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="server_id",
full_name="Response.server_id",
index=0,
number=1,
type=3,
cpp_type=2,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="response_data",
full_name="Response.response_data",
index=1,
number=2,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=b"".decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=73,
serialized_end=125,
)
DESCRIPTOR.message_types_by_name["Request"] = _REQUEST
DESCRIPTOR.message_types_by_name["Response"] = _RESPONSE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Request = _reflection.GeneratedProtocolMessageType(
"Request",
(_message.Message,),
{
"DESCRIPTOR": _REQUEST,
"__module__": "test_server_pb2"
# @@protoc_insertion_point(class_scope:Request)
},
)
_sym_db.RegisterMessage(Request)
Response = _reflection.GeneratedProtocolMessageType(
"Response",
(_message.Message,),
{
"DESCRIPTOR": _RESPONSE,
"__module__": "test_server_pb2"
# @@protoc_insertion_point(class_scope:Response)
},
)
_sym_db.RegisterMessage(Response)
_GRPCTESTSERVER = _descriptor.ServiceDescriptor(
name="GRPCTestServer",
full_name="GRPCTestServer",
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=128,
serialized_end=334,
methods=[
_descriptor.MethodDescriptor(
name="SimpleMethod",
full_name="GRPCTestServer.SimpleMethod",
index=0,
containing_service=None,
input_type=_REQUEST,
output_type=_RESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name="ClientStreamingMethod",
full_name="GRPCTestServer.ClientStreamingMethod",
index=1,
containing_service=None,
input_type=_REQUEST,
output_type=_RESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name="ServerStreamingMethod",
full_name="GRPCTestServer.ServerStreamingMethod",
index=2,
containing_service=None,
input_type=_REQUEST,
output_type=_RESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name="BidirectionalStreamingMethod",
full_name="GRPCTestServer.BidirectionalStreamingMethod",
index=3,
containing_service=None,
input_type=_REQUEST,
output_type=_RESPONSE,
serialized_options=None,
),
],
)
_sym_db.RegisterServiceDescriptor(_GRPCTESTSERVER)
DESCRIPTOR.services_by_name["GRPCTestServer"] = _GRPCTESTSERVER
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,205 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from tests.protobuf import test_server_pb2 as test__server__pb2
class GRPCTestServerStub(object):
"""Missing associated documentation comment in .proto file"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SimpleMethod = channel.unary_unary(
"/GRPCTestServer/SimpleMethod",
request_serializer=test__server__pb2.Request.SerializeToString,
response_deserializer=test__server__pb2.Response.FromString,
)
self.ClientStreamingMethod = channel.stream_unary(
"/GRPCTestServer/ClientStreamingMethod",
request_serializer=test__server__pb2.Request.SerializeToString,
response_deserializer=test__server__pb2.Response.FromString,
)
self.ServerStreamingMethod = channel.unary_stream(
"/GRPCTestServer/ServerStreamingMethod",
request_serializer=test__server__pb2.Request.SerializeToString,
response_deserializer=test__server__pb2.Response.FromString,
)
self.BidirectionalStreamingMethod = channel.stream_stream(
"/GRPCTestServer/BidirectionalStreamingMethod",
request_serializer=test__server__pb2.Request.SerializeToString,
response_deserializer=test__server__pb2.Response.FromString,
)
class GRPCTestServerServicer(object):
"""Missing associated documentation comment in .proto file"""
def SimpleMethod(self, request, context):
"""Missing associated documentation comment in .proto file"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def ClientStreamingMethod(self, request_iterator, context):
"""Missing associated documentation comment in .proto file"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def ServerStreamingMethod(self, request, context):
"""Missing associated documentation comment in .proto file"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def BidirectionalStreamingMethod(self, request_iterator, context):
"""Missing associated documentation comment in .proto file"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def add_GRPCTestServerServicer_to_server(servicer, server):
rpc_method_handlers = {
"SimpleMethod": grpc.unary_unary_rpc_method_handler(
servicer.SimpleMethod,
request_deserializer=test__server__pb2.Request.FromString,
response_serializer=test__server__pb2.Response.SerializeToString,
),
"ClientStreamingMethod": grpc.stream_unary_rpc_method_handler(
servicer.ClientStreamingMethod,
request_deserializer=test__server__pb2.Request.FromString,
response_serializer=test__server__pb2.Response.SerializeToString,
),
"ServerStreamingMethod": grpc.unary_stream_rpc_method_handler(
servicer.ServerStreamingMethod,
request_deserializer=test__server__pb2.Request.FromString,
response_serializer=test__server__pb2.Response.SerializeToString,
),
"BidirectionalStreamingMethod": grpc.stream_stream_rpc_method_handler(
servicer.BidirectionalStreamingMethod,
request_deserializer=test__server__pb2.Request.FromString,
response_serializer=test__server__pb2.Response.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
"GRPCTestServer", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class GRPCTestServer(object):
"""Missing associated documentation comment in .proto file"""
@staticmethod
def SimpleMethod(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/GRPCTestServer/SimpleMethod",
test__server__pb2.Request.SerializeToString,
test__server__pb2.Response.FromString,
options,
channel_credentials,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def ClientStreamingMethod(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_unary(
request_iterator,
target,
"/GRPCTestServer/ClientStreamingMethod",
test__server__pb2.Request.SerializeToString,
test__server__pb2.Response.FromString,
options,
channel_credentials,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def ServerStreamingMethod(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_stream(
request,
target,
"/GRPCTestServer/ServerStreamingMethod",
test__server__pb2.Request.SerializeToString,
test__server__pb2.Response.FromString,
options,
channel_credentials,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def BidirectionalStreamingMethod(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
"/GRPCTestServer/BidirectionalStreamingMethod",
test__server__pb2.Request.SerializeToString,
test__server__pb2.Response.FromString,
options,
channel_credentials,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)

View File

@ -0,0 +1,295 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import opentelemetry.instrumentation.grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.sdk.metrics.export.aggregate import (
MinMaxSumCountAggregator,
SumAggregator,
)
from opentelemetry.test.test_base import TestBase
from tests.protobuf import test_server_pb2_grpc
from ._client import (
bidirectional_streaming_method,
client_streaming_method,
server_streaming_method,
simple_method,
)
from ._server import create_test_server
class TestClientProto(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient().instrument(
exporter=self.memory_metrics_exporter
)
self.server = create_test_server(25565)
self.server.start()
self.channel = grpc.insecure_channel("localhost:25565")
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.memory_metrics_exporter.clear()
self.server.stop(None)
self.channel.close()
def _verify_success_records(self, num_bytes_out, num_bytes_in, method):
# pylint: disable=protected-access,no-member
self.channel._interceptor.controller.tick()
records = self.memory_metrics_exporter.get_exported_metrics()
self.assertEqual(len(records), 3)
bytes_out = None
bytes_in = None
duration = None
for record in records:
if record.instrument.name == "grpcio/client/duration":
duration = record
elif record.instrument.name == "grpcio/client/bytes_out":
bytes_out = record
elif record.instrument.name == "grpcio/client/bytes_in":
bytes_in = record
self.assertIsNotNone(bytes_out)
self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out")
self.assertEqual(bytes_out.labels, (("method", method),))
self.assertIsNotNone(bytes_in)
self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in")
self.assertEqual(bytes_in.labels, (("method", method),))
self.assertIsNotNone(duration)
self.assertEqual(duration.instrument.name, "grpcio/client/duration")
self.assertEqual(
duration.labels,
(
("error", False),
("method", method),
("status_code", grpc.StatusCode.OK),
),
)
self.assertEqual(type(bytes_out.aggregator), SumAggregator)
self.assertEqual(type(bytes_in.aggregator), SumAggregator)
self.assertEqual(type(duration.aggregator), MinMaxSumCountAggregator)
self.assertEqual(bytes_out.aggregator.checkpoint, num_bytes_out)
self.assertEqual(bytes_in.aggregator.checkpoint, num_bytes_in)
self.assertEqual(duration.aggregator.checkpoint.count, 1)
self.assertGreaterEqual(duration.aggregator.checkpoint.sum, 0)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod")
def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
self._verify_success_records(
8, 40, "/GRPCTestServer/ServerStreamingMethod"
)
def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
self._verify_success_records(
40, 8, "/GRPCTestServer/ClientStreamingMethod"
)
def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
)
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
self._verify_success_records(
40, 40, "/GRPCTestServer/BidirectionalStreamingMethod"
)
def _verify_error_records(self, method):
# pylint: disable=protected-access,no-member
self.channel._interceptor.controller.tick()
records = self.memory_metrics_exporter.get_exported_metrics()
self.assertEqual(len(records), 3)
bytes_out = None
errors = None
duration = None
for record in records:
if record.instrument.name == "grpcio/client/duration":
duration = record
elif record.instrument.name == "grpcio/client/bytes_out":
bytes_out = record
elif record.instrument.name == "grpcio/client/errors":
errors = record
self.assertIsNotNone(bytes_out)
self.assertIsNotNone(errors)
self.assertIsNotNone(duration)
self.assertEqual(errors.instrument.name, "grpcio/client/errors")
self.assertEqual(
errors.labels,
(
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
),
)
self.assertEqual(errors.aggregator.checkpoint, 1)
self.assertEqual(
duration.labels,
(
("error", True),
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
),
)
def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
self._verify_error_records("/GRPCTestServer/SimpleMethod")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code, trace.status.StatusCode.ERROR,
)
def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
client_streaming_method(self._stub, error=True)
self._verify_error_records("/GRPCTestServer/ClientStreamingMethod")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code, trace.status.StatusCode.ERROR,
)
def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
server_streaming_method(self._stub, error=True)
self._verify_error_records("/GRPCTestServer/ServerStreamingMethod")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code, trace.status.StatusCode.ERROR,
)
def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
bidirectional_streaming_method(self._stub, error=True)
self._verify_error_records(
"/GRPCTestServer/BidirectionalStreamingMethod"
)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code, trace.status.StatusCode.ERROR,
)
class TestClientNoMetrics(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient().instrument()
self.server = create_test_server(25565)
self.server.start()
self.channel = grpc.insecure_channel("localhost:25565")
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.memory_metrics_exporter.clear()
self.server.stop(None)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

View File

@ -0,0 +1,295 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=unused-argument
# pylint:disable=no-self-use
import threading
from concurrent import futures
import grpc
import opentelemetry.instrumentation.grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import (
GrpcInstrumentorServer,
server_interceptor,
)
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.test.test_base import TestBase
class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
def __init__(self, handler):
self.request_streaming = False
self.response_streaming = False
self.request_deserializer = None
self.response_serializer = None
self.unary_unary = handler
self.unary_stream = None
self.stream_unary = None
self.stream_stream = None
class UnaryUnaryRpcHandler(grpc.GenericRpcHandler):
def __init__(self, handler):
self._unary_unary_handler = handler
def service(self, handler_call_details):
return UnaryUnaryMethodHandler(self._unary_unary_handler)
class TestOpenTelemetryServerInterceptor(TestBase):
def test_instrumentor(self):
def handler(request, context):
return b""
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "test")
self.assertIs(span.kind, trace.SpanKind.SERVER)
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
grpc_server_instrumentor.uninstrument()
def test_uninstrument(self):
def handler(request, context):
return b""
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
grpc_server_instrumentor.uninstrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)
def test_create_span(self):
"""Check that the interceptor wraps calls with spans server-side."""
# Intercept gRPC calls...
interceptor = server_interceptor()
# No-op RPC handler
def handler(request, context):
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
try:
server.start()
channel.unary_unary("")(b"")
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "")
self.assertIs(span.kind, trace.SpanKind.SERVER)
# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)
def test_span_lifetime(self):
"""Check that the span is active for the duration of the call."""
interceptor = server_interceptor()
# To capture the current span at the time the handler is called
active_span_in_handler = None
def handler(request, context):
nonlocal active_span_in_handler
active_span_in_handler = trace.get_current_span()
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
active_span_before_call = trace.get_current_span()
try:
server.start()
channel.unary_unary("")(b"")
finally:
server.stop(None)
active_span_after_call = trace.get_current_span()
self.assertEqual(active_span_before_call, trace.INVALID_SPAN)
self.assertEqual(active_span_after_call, trace.INVALID_SPAN)
self.assertIsInstance(active_span_in_handler, trace_sdk.Span)
self.assertIsNone(active_span_in_handler.parent)
def test_sequential_server_spans(self):
"""Check that sequential RPCs get separate server spans."""
interceptor = server_interceptor()
# Capture the currently active span in each thread
active_spans_in_handler = []
def handler(request, context):
active_spans_in_handler.append(trace.get_current_span())
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary("")(b"")
finally:
server.stop(None)
self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)
def test_concurrent_server_spans(self):
"""Check that concurrent RPC calls don't interfere with each other.
This is the same check as test_sequential_server_spans except that the
RPCs are concurrent. Two handlers are invoked at the same time on two
separate threads. Each one should see a different active span and
context.
"""
interceptor = server_interceptor()
# Capture the currently active span in each thread
active_spans_in_handler = []
latch = get_latch(2)
def handler(request, context):
latch()
active_spans_in_handler.append(trace.get_current_span())
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=2),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))
try:
server.start()
# Interleave calls so spans are active on each thread at the same
# time
with futures.ThreadPoolExecutor(max_workers=2) as tpe:
f1 = tpe.submit(channel.unary_unary(""), b"")
f2 = tpe.submit(channel.unary_unary(""), b"")
futures.wait((f1, f2))
finally:
server.stop(None)
self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)
def get_latch(num):
"""Get a countdown latch function for use in n threads."""
cv = threading.Condition()
count = 0
def countdown_latch():
"""Block until n-1 other threads have called."""
nonlocal count
cv.acquire()
count += 1
cv.notify()
cv.release()
cv.acquire()
while count < num:
cv.wait()
cv.release()
return countdown_latch