Consume from chunked streams without data loss

Data already in local python buffers was lost when stream socket
fileobject was re-created. We now use http.client to handle the chunk
transfer encoding, and we read only the data from each chunk.

This adds a test harness for stream responses which tries to trigger the
lost-buffer behaviour by responding to the HTTP request in full, as
early as possible.

NB: Python's http.client will close the socket fileobj early if there is
no content length or chunked transfer encoding header. If this happens,
requests/urllib3 will reopen it, but we lose some data which was stored
in buffers.
This commit is contained in:
Marcus Cobden 2014-10-03 21:41:46 +01:00
parent a87dfcf267
commit 578dda64c9
2 changed files with 98 additions and 24 deletions

View File

@ -17,7 +17,6 @@ import os
import re
import shlex
import struct
from socket import socket as socket_obj
import warnings
import requests
@ -283,31 +282,15 @@ class Client(requests.Session):
def _stream_helper(self, response):
"""Generator for data coming from a chunked-encoded HTTP response."""
if six.PY3:
socket_fp = self._get_raw_response_socket(response)
else:
socket_fp = socket_obj(
_sock=self._get_raw_response_socket(response)
)
socket_fp.setblocking(1)
socket = socket_fp.makefile()
while True:
# Because Docker introduced newlines at the end of chunks in v0.9,
# and only on some API endpoints, we have to cater for both cases.
size_line = socket.readline()
if size_line == '\r\n' or size_line == '\n':
size_line = socket.readline()
if len(size_line.strip()) > 0:
size = int(size_line, 16)
else:
break
if size <= 0:
break
data = socket.readline()
reader = response.raw
assert reader._fp.chunked
while not reader.closed:
# this read call will block until we get a chunk
data = reader.read(1)
if not data:
break
if reader._fp.chunk_left:
data += reader.read(reader._fp.chunk_left)
yield data
def _multiplexed_buffer_helper(self, response):

View File

@ -24,6 +24,10 @@ import tarfile
import tempfile
import unittest
import gzip
import re
import socket
import threading
import time
import docker
import requests
@ -1819,5 +1823,92 @@ class DockerClientTest(Cleanup, unittest.TestCase):
tar = tarfile.open(fileobj=archive)
self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo'])
class StreamTest(Cleanup, unittest.TestCase):
def setUp(self):
folder = tempfile.mkdtemp()
self.build_context = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
self.addCleanup(shutil.rmtree, self.build_context)
self.socket_file = os.path.join(folder, 'test_sock.sock')
self.server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.server_sock.bind(self.socket_file)
server_thread = threading.Thread(target=self.run_server)
server_thread.setDaemon(True)
self.stop_server = False
server_thread.start()
self.response = None
self.request_handler = None
self.addCleanup(server_thread.join)
self.addCleanup(self.stop)
def stop(self):
self.stop_server = True
def run_server(self):
self.server_sock.setblocking(0)
self.server_sock.listen(5)
while not self.stop_server:
try:
connection, client_address = self.server_sock.accept()
except socket.error:
time.sleep(0.01)
continue
connection.setblocking(1)
try:
self.request_handler(connection)
finally:
connection.close()
self.server_sock.close()
def early_response_sending_handler(self, connection):
data = b''
headers = None
connection.sendall(self.response)
while not headers:
data += connection.recv(2048)
parts = data.split(b'\r\n\r\n', 1)
if len(parts) == 2:
headers, data = parts
mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
assert mo
content_length = int(mo.group(1))
while True:
if len(data) >= content_length:
break
data += connection.recv(2048)
def test_early_stream_response(self):
self.request_handler = self.early_response_sending_handler
lines = []
for i in range(0, 50):
line = str(i).encode()
lines += [('%x' % len(line)).encode(), line]
lines.append(b'0')
lines.append(b'')
self.response = (
b'HTTP/1.1 200 OK\r\n'
b'Transfer-Encoding: chunked\r\n'
b'\r\n'
) + b'\r\n'.join(lines)
client = docker.Client(base_url="http+unix:/" + self.socket_file)
stream = client.build(
path=os.path.dirname(self.build_context),
stream=True)
self.assertEqual(list(stream), [
str(i).encode() for i in range(50)])
if __name__ == '__main__':
unittest.main()