Merge pull request #441 from dims/fix-log-streams

Fix to enable streaming container logs reliably
This commit is contained in:
Joffrey F 2015-02-01 14:47:59 -08:00
commit 3d6d5e1012
1 changed files with 14 additions and 28 deletions

View File

@ -321,40 +321,26 @@ class Client(requests.Session):
walker = end
yield buf[start:end]
def _multiplexed_socket_stream_helper(self, response):
def _multiplexed_response_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response
socket."""
stream."""
# Disable timeout on the underlying socket to prevent
# Read timed out(s) for long running processes
socket = self._get_raw_response_socket(response)
def recvall(socket, size):
blocks = []
while size > 0:
if six.PY3:
block = socket._sock.recv(size)
else:
block = socket.recv(size)
if not block:
return None
blocks.append(block)
size -= len(block)
sep = bytes() if six.PY3 else str()
data = sep.join(blocks)
return data
if six.PY3:
socket._sock.settimeout(None)
else:
socket.settimeout(None)
while True:
if six.PY3:
socket._sock.settimeout(None)
else:
socket.settimeout(None)
header = recvall(socket, STREAM_HEADER_SIZE_BYTES)
header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
if not header:
break
_, length = struct.unpack('>BxxxL', header)
if not length:
break
data = recvall(socket, length)
data = response.raw.read(length)
if not data:
break
yield data
@ -388,7 +374,7 @@ class Client(requests.Session):
sep = bytes() if six.PY3 else str()
return stream and self._multiplexed_socket_stream_helper(response) or \
return stream and self._multiplexed_response_stream_helper(response) or \
sep.join([x for x in self._multiplexed_buffer_helper(response)])
def attach_socket(self, container, params=None, ws=False):
@ -606,7 +592,7 @@ class Client(requests.Session):
data=data, stream=stream)
self._raise_for_status(res)
if stream:
return self._multiplexed_socket_stream_helper(res)
return self._multiplexed_response_stream_helper(res)
elif six.PY3:
return bytes().join(
[x for x in self._multiplexed_buffer_helper(res)]
@ -776,7 +762,7 @@ class Client(requests.Session):
url = self._url("/containers/{0}/logs".format(container))
res = self._get(url, params=params, stream=stream)
if stream:
return self._multiplexed_socket_stream_helper(res)
return self._multiplexed_response_stream_helper(res)
elif six.PY3:
return bytes().join(
[x for x in self._multiplexed_buffer_helper(res)]