diff --git a/docker/client.py b/docker/client.py index 678d9a34..5202484b 100644 --- a/docker/client.py +++ b/docker/client.py @@ -17,7 +17,6 @@ import os import re import shlex import struct -from socket import socket as socket_obj import warnings import requests @@ -283,31 +282,15 @@ class Client(requests.Session): def _stream_helper(self, response): """Generator for data coming from a chunked-encoded HTTP response.""" - if six.PY3: - socket_fp = self._get_raw_response_socket(response) - else: - socket_fp = socket_obj( - _sock=self._get_raw_response_socket(response) - ) - socket_fp.setblocking(1) - socket = socket_fp.makefile() - while True: - # Because Docker introduced newlines at the end of chunks in v0.9, - # and only on some API endpoints, we have to cater for both cases. - size_line = socket.readline() - if size_line == '\r\n' or size_line == '\n': - size_line = socket.readline() - - if len(size_line.strip()) > 0: - size = int(size_line, 16) - else: - break - - if size <= 0: - break - data = socket.readline() + reader = response.raw + assert reader._fp.chunked + while not reader.closed: + # this read call will block until we get a chunk + data = reader.read(1) if not data: break + if reader._fp.chunk_left: + data += reader.read(reader._fp.chunk_left) yield data def _multiplexed_buffer_helper(self, response): diff --git a/tests/test.py b/tests/test.py index fc14977e..5a5c30e5 100644 --- a/tests/test.py +++ b/tests/test.py @@ -24,6 +24,10 @@ import tarfile import tempfile import unittest import gzip +import re +import socket +import threading +import time import docker import requests @@ -1819,5 +1823,92 @@ class DockerClientTest(Cleanup, unittest.TestCase): tar = tarfile.open(fileobj=archive) self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo']) + +class StreamTest(Cleanup, unittest.TestCase): + + def setUp(self): + folder = tempfile.mkdtemp() + self.build_context = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + self.addCleanup(shutil.rmtree, self.build_context) + self.socket_file = os.path.join(folder, 'test_sock.sock') + self.server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.server_sock.bind(self.socket_file) + server_thread = threading.Thread(target=self.run_server) + server_thread.setDaemon(True) + self.stop_server = False + server_thread.start() + self.response = None + self.request_handler = None + self.addCleanup(server_thread.join) + self.addCleanup(self.stop) + + def stop(self): + self.stop_server = True + + def run_server(self): + self.server_sock.setblocking(0) + + self.server_sock.listen(5) + while not self.stop_server: + try: + connection, client_address = self.server_sock.accept() + except socket.error: + time.sleep(0.01) + continue + + connection.setblocking(1) + try: + self.request_handler(connection) + finally: + connection.close() + + self.server_sock.close() + + def early_response_sending_handler(self, connection): + data = b'' + headers = None + + connection.sendall(self.response) + while not headers: + data += connection.recv(2048) + parts = data.split(b'\r\n\r\n', 1) + if len(parts) == 2: + headers, data = parts + + mo = re.search(r'Content-Length: ([0-9]+)', headers.decode()) + assert mo + content_length = int(mo.group(1)) + + while True: + if len(data) >= content_length: + break + + data += connection.recv(2048) + + def test_early_stream_response(self): + self.request_handler = self.early_response_sending_handler + lines = [] + for i in range(0, 50): + line = str(i).encode() + lines += [('%x' % len(line)).encode(), line] + lines.append(b'0') + lines.append(b'') + + self.response = ( + b'HTTP/1.1 200 OK\r\n' + b'Transfer-Encoding: chunked\r\n' + b'\r\n' + ) + b'\r\n'.join(lines) + + client = docker.Client(base_url="http+unix:/" + self.socket_file) + stream = client.build( + path=os.path.dirname(self.build_context), + stream=True) + + self.assertEqual(list(stream), [ + str(i).encode() for i in range(50)]) + + if __name__ == '__main__': unittest.main()