diff --git a/docker/api/container.py b/docker/api/container.py index f8d52de4..cb97b794 100644 --- a/docker/api/container.py +++ b/docker/api/container.py @@ -5,7 +5,8 @@ from .. import errors from .. import utils from ..constants import DEFAULT_DATA_CHUNK_SIZE from ..types import ( - ContainerConfig, EndpointConfig, HostConfig, NetworkingConfig + CancellableStream, ContainerConfig, EndpointConfig, HostConfig, + NetworkingConfig ) @@ -52,10 +53,15 @@ class ContainerApiMixin(object): u = self._url("/containers/{0}/attach", container) response = self._post(u, headers=headers, params=params, stream=True) - return self._read_from_socket( + output = self._read_from_socket( response, stream, self._check_is_tty(container) ) + if stream: + return CancellableStream(output, response) + else: + return output + @utils.check_resource('container') def attach_socket(self, container, params=None, ws=False): """ @@ -815,7 +821,12 @@ class ContainerApiMixin(object): url = self._url("/containers/{0}/logs", container) res = self._get(url, params=params, stream=stream) - return self._get_result(container, stream, res) + output = self._get_result(container, stream, res) + + if stream: + return CancellableStream(output, res) + else: + return output @utils.check_resource('container') def pause(self, container): diff --git a/docker/api/daemon.py b/docker/api/daemon.py index 0e1c7538..fc3692c2 100644 --- a/docker/api/daemon.py +++ b/docker/api/daemon.py @@ -1,7 +1,7 @@ import os from datetime import datetime -from .. import auth, utils +from .. import auth, types, utils class DaemonApiMixin(object): @@ -34,8 +34,7 @@ class DaemonApiMixin(object): the fly. False by default. Returns: - (generator): A blocking generator you can iterate over to retrieve - events as they happen. + A :py:class:`docker.types.daemon.CancellableStream` generator Raises: :py:class:`docker.errors.APIError` @@ -50,6 +49,14 @@ class DaemonApiMixin(object): u'status': u'start', u'time': 1423339459} ... + + or + + >>> events = client.events() + >>> for event in events: + ... print event + >>> # and cancel from another thread + >>> events.close() """ if isinstance(since, datetime): @@ -68,10 +75,10 @@ class DaemonApiMixin(object): } url = self._url('/events') - return self._stream_helper( - self._get(url, params=params, stream=True, timeout=None), - decode=decode - ) + response = self._get(url, params=params, stream=True, timeout=None) + stream = self._stream_helper(response, decode=decode) + + return types.CancellableStream(stream, response) def info(self): """ diff --git a/docker/types/__init__.py b/docker/types/__init__.py index 39c93e34..0b0d847f 100644 --- a/docker/types/__init__.py +++ b/docker/types/__init__.py @@ -1,5 +1,6 @@ # flake8: noqa from .containers import ContainerConfig, HostConfig, LogConfig, Ulimit +from .daemon import CancellableStream from .healthcheck import Healthcheck from .networks import EndpointConfig, IPAMConfig, IPAMPool, NetworkingConfig from .services import ( diff --git a/docker/types/daemon.py b/docker/types/daemon.py new file mode 100644 index 00000000..ba0334d0 --- /dev/null +++ b/docker/types/daemon.py @@ -0,0 +1,63 @@ +import socket + +try: + import requests.packages.urllib3 as urllib3 +except ImportError: + import urllib3 + + +class CancellableStream(object): + """ + Stream wrapper for real-time events, logs, etc. from the server. + + Example: + >>> events = client.events() + >>> for event in events: + ... print event + >>> # and cancel from another thread + >>> events.close() + """ + + def __init__(self, stream, response): + self._stream = stream + self._response = response + + def __iter__(self): + return self + + def __next__(self): + try: + return next(self._stream) + except urllib3.exceptions.ProtocolError: + raise StopIteration + except socket.error: + raise StopIteration + + next = __next__ + + def close(self): + """ + Closes the event streaming. + """ + + if not self._response.raw.closed: + # find the underlying socket object + # based on api.client._get_raw_response_socket + + sock_fp = self._response.raw._fp.fp + + if hasattr(sock_fp, 'raw'): + sock_raw = sock_fp.raw + + if hasattr(sock_raw, 'sock'): + sock = sock_raw.sock + + elif hasattr(sock_raw, '_sock'): + sock = sock_raw._sock + + else: + sock = sock_fp._sock + + sock.shutdown(socket.SHUT_RDWR) + sock.makefile().close() + sock.close() diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py index 8447aa5f..cc2c0719 100644 --- a/tests/integration/api_container_test.py +++ b/tests/integration/api_container_test.py @@ -2,6 +2,7 @@ import os import re import signal import tempfile +import threading from datetime import datetime import docker @@ -880,6 +881,30 @@ Line2''' assert logs == (snippet + '\n').encode(encoding='ascii') + def test_logs_streaming_and_follow_and_cancel(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'sh -c "echo \\"{0}\\" && sleep 3"'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + logs = six.binary_type() + + generator = self.client.logs(id, stream=True, follow=True) + + exit_timer = threading.Timer(3, os._exit, args=[1]) + exit_timer.start() + + threading.Timer(1, generator.close).start() + + for chunk in generator: + logs += chunk + + exit_timer.cancel() + + assert logs == (snippet + '\n').encode(encoding='ascii') + def test_logs_with_dict_instead_of_id(self): snippet = 'Flowering Nights (Sakuya Iyazoi)' container = self.client.create_container( @@ -1226,6 +1251,29 @@ class AttachContainerTest(BaseAPIIntegrationTest): output = self.client.attach(container, stream=False, logs=True) assert output == 'hello\n'.encode(encoding='ascii') + def test_attach_stream_and_cancel(self): + container = self.client.create_container( + BUSYBOX, 'sh -c "echo hello && sleep 60"', + tty=True + ) + self.tmp_containers.append(container) + self.client.start(container) + output = self.client.attach(container, stream=True, logs=True) + + exit_timer = threading.Timer(3, os._exit, args=[1]) + exit_timer.start() + + threading.Timer(1, output.close).start() + + lines = [] + for line in output: + lines.append(line) + + exit_timer.cancel() + + assert len(lines) == 1 + assert lines[0] == 'hello\r\n'.encode(encoding='ascii') + def test_detach_with_default(self): container = self.client.create_container( BUSYBOX, 'cat', diff --git a/tests/integration/client_test.py b/tests/integration/client_test.py index 8f6bd86b..7df172c8 100644 --- a/tests/integration/client_test.py +++ b/tests/integration/client_test.py @@ -1,7 +1,10 @@ +import threading import unittest import docker +from datetime import datetime, timedelta + from ..helpers import requires_api_version from .base import TEST_API_VERSION @@ -27,3 +30,20 @@ class ClientTest(unittest.TestCase): assert 'Containers' in data assert 'Volumes' in data assert 'Images' in data + + +class CancellableEventsTest(unittest.TestCase): + client = docker.from_env(version=TEST_API_VERSION) + + def test_cancel_events(self): + start = datetime.now() + + events = self.client.events(until=start + timedelta(seconds=5)) + + cancel_thread = threading.Timer(2, events.close) + cancel_thread.start() + + for _ in events: + pass + + self.assertLess(datetime.now() - start, timedelta(seconds=3)) diff --git a/tests/integration/models_containers_test.py b/tests/integration/models_containers_test.py index 38aae4d2..41faff35 100644 --- a/tests/integration/models_containers_test.py +++ b/tests/integration/models_containers_test.py @@ -1,4 +1,6 @@ +import os import tempfile +import threading import docker import pytest @@ -141,6 +143,25 @@ class ContainerCollectionTest(BaseIntegrationTest): assert logs[0] == b'hello\n' assert logs[1] == b'world\n' + def test_run_with_streamed_logs_and_cancel(self): + client = docker.from_env(version=TEST_API_VERSION) + out = client.containers.run( + 'alpine', 'sh -c "echo hello && echo world"', stream=True + ) + + exit_timer = threading.Timer(3, os._exit, args=[1]) + exit_timer.start() + + threading.Timer(1, out.close).start() + + logs = [line for line in out] + + exit_timer.cancel() + + assert len(logs) == 2 + assert logs[0] == b'hello\n' + assert logs[1] == b'world\n' + def test_get(self): client = docker.from_env(version=TEST_API_VERSION) container = client.containers.run("alpine", "sleep 300", detach=True)