Log streaming and correct decoding of multiplexed log streams

Implement log streaming with the stream parameter on logs(), returning a
generator of log lines based on the selected streams (stdout/stderr).
Also correctly decode the multiplexed log streams (current version was
buggy).

Signed-off-by: Maxime Petazzoni <max@signalfuse.com>
This commit is contained in:
Maxime Petazzoni 2013-11-19 16:58:43 -08:00
parent 5e68ed1df8
commit 4bc4ee3cf0
2 changed files with 77 additions and 32 deletions

View File

@ -29,6 +29,7 @@ if not six.PY3:
import websocket
DEFAULT_TIMEOUT_SECONDS = 60
STREAM_HEADER_SIZE_BYTES = 8
class APIError(requests.exceptions.HTTPError):
@ -104,22 +105,14 @@ class Client(requests.Session):
except requests.exceptions.HTTPError as e:
raise APIError(e, response, explanation=explanation)
def _stream_result(self, response):
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1):
# filter out keep-alive new lines
if line:
yield line + '\n'
def _stream_result_socket(self, response):
self._raise_for_status(response)
return response.raw._fp.fp._sock
def _result(self, response, json=False):
def _result(self, response, json=False, binary=False):
assert not (json and binary)
self._raise_for_status(response)
if json:
return response.json()
if binary:
return response.content
return response.text
def _container_config(self, image, command, hostname=None, user=None,
@ -219,7 +212,20 @@ class Client(requests.Session):
def _create_websocket_connection(self, url):
return websocket.create_connection(url)
def _stream_result(self, response):
"""Generator for straight-out, non chunked-encoded HTTP responses."""
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1):
# filter out keep-alive new lines
if line:
yield line + '\n'
def _stream_result_socket(self, response):
self._raise_for_status(response)
return response.raw._fp.fp._sock
def _stream_helper(self, response):
"""Generator for data coming from a chunked-encoded HTTP response."""
socket = self._stream_result_socket(response).makefile()
while True:
size = int(socket.readline(), 16)
@ -230,6 +236,34 @@ class Client(requests.Session):
break
yield data
def _multiplexed_buffer_helper(self, response):
"""A generator of multiplexed data blocks read from a buffered
response."""
buf = self._result(response, binary=True)
walker = 0
while True:
if len(buf[walker:]) < 8:
break
_, length = struct.unpack_from('>BxxxL', buf[walker:])
start = walker + STREAM_HEADER_SIZE_BYTES
end = start + length
walker = end
yield str(buf[start:end])
def _multiplexed_socket_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response
socket."""
socket = self._stream_result_socket(response)
while True:
socket.settimeout(None)
header = socket.recv(8)
if not header:
break
_, length = struct.unpack('>BxxxL', header)
if not length:
break
yield socket.recv(length).strip()
def attach(self, container):
socket = self.attach_socket(container)
@ -481,29 +515,25 @@ class Client(requests.Session):
self._cfg['Configs'][registry] = req_data
return res
def logs(self, container):
def logs(self, container, stdout=True, stderr=True, stream=False):
if isinstance(container, dict):
container = container.get('Id')
params = {
'logs': 1,
'stdout': 1,
'stderr': 1
'stdout': stdout and 1 or 0,
'stderr': stderr and 1 or 0,
'stream': stream and 1 or 0,
}
u = self._url("/containers/{0}/attach".format(container))
response = self._post(u, params=params, stream=stream)
# Stream multi-plexing was introduced in API v1.6.
if utils.compare_version('1.6', self._version) < 0:
return self._result(self._post(u, params=params))
res = ''
response = self._result(self._post(u, params=params))
walker = 0
while walker < len(response):
header = response[walker:walker+8]
walker += 8
# we don't care about the type of stream since we want both
# stdout and stderr
length = struct.unpack(">L", header[4:].encode())[0]
res += response[walker:walker+length]
walker += length
return res
return stream and self._stream_result(response) or \
self._result(response, binary=True)
return stream and self._multiplexed_socket_stream_helper(response) or \
''.join([x for x in self._multiplexed_buffer_helper(response)])
def port(self, container, private_port):
if isinstance(container, dict):

View File

@ -492,8 +492,9 @@ class DockerClientTest(unittest.TestCase):
fake_request.assert_called_with(
'unix://var/run/docker.sock/v1.6/containers/3cc2351ab11b/attach',
params={'logs': 1, 'stderr': 1, 'stdout': 1},
timeout=docker.client.DEFAULT_TIMEOUT_SECONDS
params={'stream': 0, 'logs': 1, 'stderr': 1, 'stdout': 1},
timeout=docker.client.DEFAULT_TIMEOUT_SECONDS,
stream=False
)
def test_logs_with_dict_instead_of_id(self):
@ -504,8 +505,22 @@ class DockerClientTest(unittest.TestCase):
fake_request.assert_called_with(
'unix://var/run/docker.sock/v1.6/containers/3cc2351ab11b/attach',
params={'logs': 1, 'stderr': 1, 'stdout': 1},
timeout=docker.client.DEFAULT_TIMEOUT_SECONDS
params={'stream': 0, 'logs': 1, 'stderr': 1, 'stdout': 1},
timeout=docker.client.DEFAULT_TIMEOUT_SECONDS,
stream=False
)
def test_log_streaming(self):
try:
self.client.logs(fake_api.FAKE_CONTAINER_ID, stream=True)
except Exception as e:
self.fail('Command should not raise exception: {0}'.format(e))
fake_request.assert_called_with(
'unix://var/run/docker.sock/v1.6/containers/3cc2351ab11b/attach',
params={'stream': 1, 'logs': 1, 'stderr': 1, 'stdout': 1},
timeout=docker.client.DEFAULT_TIMEOUT_SECONDS,
stream=True
)
def test_diff(self):