opentelemetry-python-contrib/reference/tests/contrib/tornado/web/compat.py

36 lines
1.1 KiB
Python

from tornado.concurrent import Future
import tornado.gen
from tornado.ioloop import IOLoop
try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
from tornado.concurrent import DummyExecutor
class ThreadPoolExecutor(DummyExecutor):
"""
Fake executor class used to test our tracer when Python 2 is used
without the `futures` backport. This is not a real use case, but
it's required to be defensive when we have different `Executor`
implementations.
"""
def __init__(self, *args, **kwargs):
# we accept any kind of interface
super(ThreadPoolExecutor, self).__init__()
if hasattr(tornado.gen, 'sleep'):
sleep = tornado.gen.sleep
else:
# Tornado <= 4.0
def sleep(duration):
"""
Compatibility helper that return a Future() that can be yielded.
This is used because Tornado 4.0 doesn't have a ``gen.sleep()``
function, that we require to test the ``TracerStackContext``.
"""
f = Future()
IOLoop.current().call_later(duration, lambda: f.set_result(None))
return f