Allow cancelling the streams from other threads

Signed-off-by: Viktor Adam <rycus86@gmail.com>
This commit is contained in:
Viktor Adam 2018-02-21 22:16:21 +00:00 committed by Joffrey F
parent a4e642b015
commit 719d4e9e20
7 changed files with 181 additions and 10 deletions

View File

@ -5,7 +5,8 @@ from .. import errors
from .. import utils from .. import utils
from ..constants import DEFAULT_DATA_CHUNK_SIZE from ..constants import DEFAULT_DATA_CHUNK_SIZE
from ..types import ( from ..types import (
ContainerConfig, EndpointConfig, HostConfig, NetworkingConfig CancellableStream, ContainerConfig, EndpointConfig, HostConfig,
NetworkingConfig
) )
@ -52,10 +53,15 @@ class ContainerApiMixin(object):
u = self._url("/containers/{0}/attach", container) u = self._url("/containers/{0}/attach", container)
response = self._post(u, headers=headers, params=params, stream=True) response = self._post(u, headers=headers, params=params, stream=True)
return self._read_from_socket( output = self._read_from_socket(
response, stream, self._check_is_tty(container) response, stream, self._check_is_tty(container)
) )
if stream:
return CancellableStream(output, response)
else:
return output
@utils.check_resource('container') @utils.check_resource('container')
def attach_socket(self, container, params=None, ws=False): def attach_socket(self, container, params=None, ws=False):
""" """
@ -815,7 +821,12 @@ class ContainerApiMixin(object):
url = self._url("/containers/{0}/logs", container) url = self._url("/containers/{0}/logs", container)
res = self._get(url, params=params, stream=stream) res = self._get(url, params=params, stream=stream)
return self._get_result(container, stream, res) output = self._get_result(container, stream, res)
if stream:
return CancellableStream(output, res)
else:
return output
@utils.check_resource('container') @utils.check_resource('container')
def pause(self, container): def pause(self, container):

View File

@ -1,7 +1,7 @@
import os import os
from datetime import datetime from datetime import datetime
from .. import auth, utils from .. import auth, types, utils
class DaemonApiMixin(object): class DaemonApiMixin(object):
@ -34,8 +34,7 @@ class DaemonApiMixin(object):
the fly. False by default. the fly. False by default.
Returns: Returns:
(generator): A blocking generator you can iterate over to retrieve A :py:class:`docker.types.daemon.CancellableStream` generator
events as they happen.
Raises: Raises:
:py:class:`docker.errors.APIError` :py:class:`docker.errors.APIError`
@ -50,6 +49,14 @@ class DaemonApiMixin(object):
u'status': u'start', u'status': u'start',
u'time': 1423339459} u'time': 1423339459}
... ...
or
>>> events = client.events()
>>> for event in events:
... print event
>>> # and cancel from another thread
>>> events.close()
""" """
if isinstance(since, datetime): if isinstance(since, datetime):
@ -68,10 +75,10 @@ class DaemonApiMixin(object):
} }
url = self._url('/events') url = self._url('/events')
return self._stream_helper( response = self._get(url, params=params, stream=True, timeout=None)
self._get(url, params=params, stream=True, timeout=None), stream = self._stream_helper(response, decode=decode)
decode=decode
) return types.CancellableStream(stream, response)
def info(self): def info(self):
""" """

View File

@ -1,5 +1,6 @@
# flake8: noqa # flake8: noqa
from .containers import ContainerConfig, HostConfig, LogConfig, Ulimit from .containers import ContainerConfig, HostConfig, LogConfig, Ulimit
from .daemon import CancellableStream
from .healthcheck import Healthcheck from .healthcheck import Healthcheck
from .networks import EndpointConfig, IPAMConfig, IPAMPool, NetworkingConfig from .networks import EndpointConfig, IPAMConfig, IPAMPool, NetworkingConfig
from .services import ( from .services import (

63
docker/types/daemon.py Normal file
View File

@ -0,0 +1,63 @@
import socket
try:
import requests.packages.urllib3 as urllib3
except ImportError:
import urllib3
class CancellableStream(object):
"""
Stream wrapper for real-time events, logs, etc. from the server.
Example:
>>> events = client.events()
>>> for event in events:
... print event
>>> # and cancel from another thread
>>> events.close()
"""
def __init__(self, stream, response):
self._stream = stream
self._response = response
def __iter__(self):
return self
def __next__(self):
try:
return next(self._stream)
except urllib3.exceptions.ProtocolError:
raise StopIteration
except socket.error:
raise StopIteration
next = __next__
def close(self):
"""
Closes the event streaming.
"""
if not self._response.raw.closed:
# find the underlying socket object
# based on api.client._get_raw_response_socket
sock_fp = self._response.raw._fp.fp
if hasattr(sock_fp, 'raw'):
sock_raw = sock_fp.raw
if hasattr(sock_raw, 'sock'):
sock = sock_raw.sock
elif hasattr(sock_raw, '_sock'):
sock = sock_raw._sock
else:
sock = sock_fp._sock
sock.shutdown(socket.SHUT_RDWR)
sock.makefile().close()
sock.close()

View File

@ -2,6 +2,7 @@ import os
import re import re
import signal import signal
import tempfile import tempfile
import threading
from datetime import datetime from datetime import datetime
import docker import docker
@ -880,6 +881,30 @@ Line2'''
assert logs == (snippet + '\n').encode(encoding='ascii') assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_streaming_and_follow_and_cancel(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'sh -c "echo \\"{0}\\" && sleep 3"'.format(snippet)
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
logs = six.binary_type()
generator = self.client.logs(id, stream=True, follow=True)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, generator.close).start()
for chunk in generator:
logs += chunk
exit_timer.cancel()
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_with_dict_instead_of_id(self): def test_logs_with_dict_instead_of_id(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)' snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container( container = self.client.create_container(
@ -1226,6 +1251,29 @@ class AttachContainerTest(BaseAPIIntegrationTest):
output = self.client.attach(container, stream=False, logs=True) output = self.client.attach(container, stream=False, logs=True)
assert output == 'hello\n'.encode(encoding='ascii') assert output == 'hello\n'.encode(encoding='ascii')
def test_attach_stream_and_cancel(self):
container = self.client.create_container(
BUSYBOX, 'sh -c "echo hello && sleep 60"',
tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
output = self.client.attach(container, stream=True, logs=True)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, output.close).start()
lines = []
for line in output:
lines.append(line)
exit_timer.cancel()
assert len(lines) == 1
assert lines[0] == 'hello\r\n'.encode(encoding='ascii')
def test_detach_with_default(self): def test_detach_with_default(self):
container = self.client.create_container( container = self.client.create_container(
BUSYBOX, 'cat', BUSYBOX, 'cat',

View File

@ -1,7 +1,10 @@
import threading
import unittest import unittest
import docker import docker
from datetime import datetime, timedelta
from ..helpers import requires_api_version from ..helpers import requires_api_version
from .base import TEST_API_VERSION from .base import TEST_API_VERSION
@ -27,3 +30,20 @@ class ClientTest(unittest.TestCase):
assert 'Containers' in data assert 'Containers' in data
assert 'Volumes' in data assert 'Volumes' in data
assert 'Images' in data assert 'Images' in data
class CancellableEventsTest(unittest.TestCase):
client = docker.from_env(version=TEST_API_VERSION)
def test_cancel_events(self):
start = datetime.now()
events = self.client.events(until=start + timedelta(seconds=5))
cancel_thread = threading.Timer(2, events.close)
cancel_thread.start()
for _ in events:
pass
self.assertLess(datetime.now() - start, timedelta(seconds=3))

View File

@ -1,4 +1,6 @@
import os
import tempfile import tempfile
import threading
import docker import docker
import pytest import pytest
@ -141,6 +143,25 @@ class ContainerCollectionTest(BaseIntegrationTest):
assert logs[0] == b'hello\n' assert logs[0] == b'hello\n'
assert logs[1] == b'world\n' assert logs[1] == b'world\n'
def test_run_with_streamed_logs_and_cancel(self):
client = docker.from_env(version=TEST_API_VERSION)
out = client.containers.run(
'alpine', 'sh -c "echo hello && echo world"', stream=True
)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, out.close).start()
logs = [line for line in out]
exit_timer.cancel()
assert len(logs) == 2
assert logs[0] == b'hello\n'
assert logs[1] == b'world\n'
def test_get(self): def test_get(self):
client = docker.from_env(version=TEST_API_VERSION) client = docker.from_env(version=TEST_API_VERSION)
container = client.containers.run("alpine", "sleep 300", detach=True) container = client.containers.run("alpine", "sleep 300", detach=True)