Merge branch 'little-dude-master'

This commit is contained in:
Joffrey F 2018-11-30 15:27:49 -08:00
commit b72fb1e571
13 changed files with 368 additions and 66 deletions

View File

@ -32,7 +32,7 @@ from ..errors import (
from ..tls import TLSConfig from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config from ..utils import utils, check_resource, update_headers, config
from ..utils.socket import frames_iter, socket_raw_iter from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream from ..utils.json_stream import json_stream
try: try:
from ..transport import NpipeAdapter from ..transport import NpipeAdapter
@ -381,19 +381,23 @@ class APIClient(
for out in response.iter_content(chunk_size, decode): for out in response.iter_content(chunk_size, decode):
yield out yield out
def _read_from_socket(self, response, stream, tty=False): def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response) socket = self._get_raw_response_socket(response)
gen = None gen = frames_iter(socket, tty)
if tty is False:
gen = frames_iter(socket) if demux:
# The generator will output tuples (stdout, stderr)
gen = (demux_adaptor(*frame) for frame in gen)
else: else:
gen = socket_raw_iter(socket) # The generator will output strings
gen = (data for (_, data) in gen)
if stream: if stream:
return gen return gen
else: else:
return six.binary_type().join(gen) # Wait for all the frames, concatenate them, and return the result
return consume_socket_output(gen, demux=demux)
def _disable_socket_timeout(self, socket): def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're """ Depending on the combination of python version and whether we're

View File

@ -13,7 +13,7 @@ from ..types import (
class ContainerApiMixin(object): class ContainerApiMixin(object):
@utils.check_resource('container') @utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True, def attach(self, container, stdout=True, stderr=True,
stream=False, logs=False): stream=False, logs=False, demux=False):
""" """
Attach to a container. Attach to a container.
@ -28,11 +28,15 @@ class ContainerApiMixin(object):
stream (bool): Return container output progressively as an iterator stream (bool): Return container output progressively as an iterator
of strings, rather than a single string. of strings, rather than a single string.
logs (bool): Include the container's previous output. logs (bool): Include the container's previous output.
demux (bool): Keep stdout and stderr separate.
Returns: Returns:
By default, the container's output as a single string. By default, the container's output as a single string (two if
``demux=True``: one for stdout and one for stderr).
If ``stream=True``, an iterator of output strings. If ``stream=True``, an iterator of output strings. If
``demux=True``, two iterators are returned: one for stdout and one
for stderr.
Raises: Raises:
:py:class:`docker.errors.APIError` :py:class:`docker.errors.APIError`
@ -54,8 +58,7 @@ class ContainerApiMixin(object):
response = self._post(u, headers=headers, params=params, stream=True) response = self._post(u, headers=headers, params=params, stream=True)
output = self._read_from_socket( output = self._read_from_socket(
response, stream, self._check_is_tty(container) response, stream, self._check_is_tty(container), demux=demux)
)
if stream: if stream:
return CancellableStream(output, response) return CancellableStream(output, response)

View File

@ -118,7 +118,7 @@ class ExecApiMixin(object):
@utils.check_resource('exec_id') @utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False, def exec_start(self, exec_id, detach=False, tty=False, stream=False,
socket=False): socket=False, demux=False):
""" """
Start a previously set up exec instance. Start a previously set up exec instance.
@ -130,11 +130,14 @@ class ExecApiMixin(object):
stream (bool): Stream response data. Default: False stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom socket (bool): Return the connection socket to allow custom
read/write operations. read/write operations.
demux (bool): Return stdout and stderr separately
Returns: Returns:
(generator or str): If ``stream=True``, a generator yielding
response chunks. If ``socket=True``, a socket object for the (generator or str or tuple): If ``stream=True``, a generator
connection. A string containing response data otherwise. yielding response chunks. If ``socket=True``, a socket object for
the connection. A string containing response data otherwise. If
``demux=True``, stdout and stderr are separated.
Raises: Raises:
:py:class:`docker.errors.APIError` :py:class:`docker.errors.APIError`
@ -162,4 +165,4 @@ class ExecApiMixin(object):
return self._result(res) return self._result(res)
if socket: if socket:
return self._get_raw_response_socket(res) return self._get_raw_response_socket(res)
return self._read_from_socket(res, stream, tty) return self._read_from_socket(res, stream, tty=tty, demux=demux)

View File

@ -144,7 +144,7 @@ class Container(Model):
def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False, def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False, privileged=False, user='', detach=False, stream=False,
socket=False, environment=None, workdir=None): socket=False, environment=None, workdir=None, demux=False):
""" """
Run a command inside this container. Similar to Run a command inside this container. Similar to
``docker exec``. ``docker exec``.
@ -166,6 +166,7 @@ class Container(Model):
the following format ``["PASSWORD=xxx"]`` or the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``. ``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session workdir (str): Path to working directory for this exec session
demux (bool): Return stdout and stderr separately
Returns: Returns:
(ExecResult): A tuple of (exit_code, output) (ExecResult): A tuple of (exit_code, output)
@ -187,7 +188,8 @@ class Container(Model):
workdir=workdir workdir=workdir
) )
exec_output = self.client.api.exec_start( exec_output = self.client.api.exec_start(
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
demux=demux
) )
if socket or stream: if socket or stream:
return ExecResult(None, exec_output) return ExecResult(None, exec_output)

View File

@ -12,6 +12,10 @@ except ImportError:
NpipeSocket = type(None) NpipeSocket = type(None)
STDOUT = 1
STDERR = 2
class SocketError(Exception): class SocketError(Exception):
pass pass
@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data return data
def next_frame_size(socket): def next_frame_header(socket):
""" """
Returns the size of the next frame of data waiting to be read from socket, Returns the stream and size of the next frame of data waiting to be read
according to the protocol defined here: from socket, according to the protocol defined here:
https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
""" """
try: try:
data = read_exactly(socket, 8) data = read_exactly(socket, 8)
except SocketError: except SocketError:
return -1 return (-1, -1)
_, actual = struct.unpack('>BxxxL', data) stream, actual = struct.unpack('>BxxxL', data)
return actual return (stream, actual)
def frames_iter(socket): def frames_iter(socket, tty):
""" """
Returns a generator of frames read from socket Return a generator of frames read from socket. A frame is a tuple where
the first item is the stream number and the second item is a chunk of data.
If the tty setting is enabled, the streams are multiplexed into the stdout
stream.
"""
if tty:
return ((STDOUT, frame) for frame in frames_iter_tty(socket))
else:
return frames_iter_no_tty(socket)
def frames_iter_no_tty(socket):
"""
Returns a generator of data read from the socket when the tty setting is
not enabled.
""" """
while True: while True:
n = next_frame_size(socket) (stream, n) = next_frame_header(socket)
if n < 0: if n < 0:
break break
while n > 0: while n > 0:
@ -84,13 +103,13 @@ def frames_iter(socket):
# We have reached EOF # We have reached EOF
return return
n -= data_length n -= data_length
yield result yield (stream, result)
def socket_raw_iter(socket): def frames_iter_tty(socket):
""" """
Returns a generator of data read from the socket. Return a generator of data read from the socket when the tty setting is
This is used for non-multiplexed streams. enabled.
""" """
while True: while True:
result = read(socket) result = read(socket)
@ -98,3 +117,53 @@ def socket_raw_iter(socket):
# We have reached EOF # We have reached EOF
return return
yield result yield result
def consume_socket_output(frames, demux=False):
"""
Iterate through frames read from the socket and return the result.
Args:
demux (bool):
If False, stdout and stderr are multiplexed, and the result is the
concatenation of all the frames. If True, the streams are
demultiplexed, and the result is a 2-tuple where each item is the
concatenation of frames belonging to the same stream.
"""
if demux is False:
# If the streams are multiplexed, the generator returns strings, that
# we just need to concatenate.
return six.binary_type().join(frames)
# If the streams are demultiplexed, the generator yields tuples
# (stdout, stderr)
out = [None, None]
for frame in frames:
# It is guaranteed that for each frame, one and only one stream
# is not None.
assert frame != (None, None)
if frame[0] is not None:
if out[0] is None:
out[0] = frame[0]
else:
out[0] += frame[0]
else:
if out[1] is None:
out[1] = frame[1]
else:
out[1] += frame[1]
return tuple(out)
def demux_adaptor(stream_id, data):
"""
Utility to demultiplex stdout and stderr when reading frames from the
socket.
"""
if stream_id == STDOUT:
return (data, None)
elif stream_id == STDERR:
return (None, data)
else:
raise ValueError('{0} is not a valid stream'.format(stream_id))

View File

@ -92,4 +92,5 @@ That's just a taste of what you can do with the Docker SDK for Python. For more,
volumes volumes
api api
tls tls
user_guides/index
change-log change-log

View File

@ -0,0 +1,8 @@
User guides and tutorials
=========================
.. toctree::
:maxdepth: 2
multiplex
swarm_services

View File

@ -0,0 +1,66 @@
Handling multiplexed streams
============================
.. note::
The following instruction assume you're interested in getting output from
an ``exec`` command. These instruction are similarly applicable to the
output of ``attach``.
First create a container that runs in the background:
>>> client = docker.from_env()
>>> container = client.containers.run(
... 'bfirsh/reticulate-splines', detach=True)
Prepare the command we are going to use. It prints "hello stdout"
in `stdout`, followed by "hello stderr" in `stderr`:
>>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'
We'll run this command with all four the combinations of ``stream``
and ``demux``.
With ``stream=False`` and ``demux=False``, the output is a string
that contains both the `stdout` and the `stderr` output:
>>> res = container.exec_run(cmd, stream=False, demux=False)
>>> res.output
b'hello stderr\nhello stdout\n'
With ``stream=True``, and ``demux=False``, the output is a
generator that yields strings containing the output of both
`stdout` and `stderr`:
>>> res = container.exec_run(cmd, stream=True, demux=False)
>>> next(res.output)
b'hello stdout\n'
>>> next(res.output)
b'hello stderr\n'
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
With ``stream=True`` and ``demux=True``, the generator now
separates the streams, and yield tuples
``(stdout, stderr)``:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
Finally, with ``stream=False`` and ``demux=True``, the whole output
is returned, but the streams are still separated:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

View File

@ -1,5 +1,9 @@
# Swarm services # Swarm services
> Warning:
> This is a stale document and may contain outdated information.
> Refer to the API docs for updated classes and method signatures.
Starting with Engine version 1.12 (API 1.24), it is possible to manage services Starting with Engine version 1.12 (API 1.24), it is possible to manage services
using the Docker Engine API. Note that the engine needs to be part of a using the Docker Engine API. Note that the engine needs to be part of a
[Swarm cluster](../swarm.rst) before you can use the service-related methods. [Swarm cluster](../swarm.rst) before you can use the service-related methods.

View File

@ -7,7 +7,7 @@ from datetime import datetime
import docker import docker
from docker.constants import IS_WINDOWS_PLATFORM from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_size from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly from docker.utils.socket import read_exactly
import pytest import pytest
@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest):
self.client.start(container) self.client.start(container)
next_size = next_frame_size(pty_stdout) (stream, next_size) = next_frame_header(pty_stdout)
assert stream == 1 # correspond to stdout
assert next_size == len(line) assert next_size == len(line)
data = read_exactly(pty_stdout, next_size) data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line assert data.decode('utf-8') == line

View File

@ -1,4 +1,4 @@
from docker.utils.socket import next_frame_size from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly from docker.utils.socket import read_exactly
from .base import BaseAPIIntegrationTest, BUSYBOX from .base import BaseAPIIntegrationTest, BUSYBOX
@ -75,6 +75,75 @@ class ExecTest(BaseAPIIntegrationTest):
res += chunk res += chunk
assert res == b'hello\nworld\n' assert res == b'hello\nworld\n'
def test_exec_command_demux(self):
container = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
script = ' ; '.join([
# Write something on stdout
'echo hello out',
# Busybox's sleep does not handle sub-second times.
# This loops takes ~0.3 second to execute on my machine.
'for i in $(seq 1 50000); do echo $i>/dev/null; done',
# Write something on stderr
'echo hello err >&2'])
cmd = 'sh -c "{}"'.format(script)
# tty=False, stream=False, demux=False
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res)
assert exec_log == b'hello out\nhello err\n'
# tty=False, stream=True, demux=False
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, stream=True)
assert next(exec_log) == b'hello out\n'
assert next(exec_log) == b'hello err\n'
with self.assertRaises(StopIteration):
next(exec_log)
# tty=False, stream=False, demux=True
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, demux=True)
assert exec_log == (b'hello out\n', b'hello err\n')
# tty=False, stream=True, demux=True
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, demux=True, stream=True)
assert next(exec_log) == (b'hello out\n', None)
assert next(exec_log) == (None, b'hello err\n')
with self.assertRaises(StopIteration):
next(exec_log)
# tty=True, stream=False, demux=False
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res)
assert exec_log == b'hello out\r\nhello err\r\n'
# tty=True, stream=True, demux=False
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, stream=True)
assert next(exec_log) == b'hello out\r\n'
assert next(exec_log) == b'hello err\r\n'
with self.assertRaises(StopIteration):
next(exec_log)
# tty=True, stream=False, demux=True
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, demux=True)
assert exec_log == (b'hello out\r\nhello err\r\n', None)
# tty=True, stream=True, demux=True
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, demux=True, stream=True)
assert next(exec_log) == (b'hello out\r\n', None)
assert next(exec_log) == (b'hello err\r\n', None)
with self.assertRaises(StopIteration):
next(exec_log)
def test_exec_start_socket(self): def test_exec_start_socket(self):
container = self.client.create_container(BUSYBOX, 'cat', container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True) detach=True, stdin_open=True)
@ -91,7 +160,8 @@ class ExecTest(BaseAPIIntegrationTest):
socket = self.client.exec_start(exec_id, socket=True) socket = self.client.exec_start(exec_id, socket=True)
self.addCleanup(socket.close) self.addCleanup(socket.close)
next_size = next_frame_size(socket) (stream, next_size) = next_frame_header(socket)
assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr)
assert next_size == len(line) assert next_size == len(line)
data = read_exactly(socket, next_size) data = read_exactly(socket, next_size)
assert data.decode('utf-8') == line assert data.decode('utf-8') == line

View File

@ -15,6 +15,7 @@ from docker.api import APIClient
import requests import requests
from requests.packages import urllib3 from requests.packages import urllib3
import six import six
import struct
from . import fake_api from . import fake_api
@ -83,7 +84,7 @@ def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs) return fake_request('DELETE', url, *args, **kwargs)
def fake_read_from_socket(self, response, stream, tty=False): def fake_read_from_socket(self, response, stream, tty=False, demux=False):
return six.binary_type() return six.binary_type()
@ -467,56 +468,124 @@ class UnixSocketStreamTest(unittest.TestCase):
class TCPSocketStreamTest(unittest.TestCase): class TCPSocketStreamTest(unittest.TestCase):
text_data = b''' stdout_data = b'''
Now, those children out there, they're jumping through the Now, those children out there, they're jumping through the
flames in the hope that the god of the fire will make them fruitful. flames in the hope that the god of the fire will make them fruitful.
Really, you can't blame them. After all, what girl would not prefer the Really, you can't blame them. After all, what girl would not prefer the
child of a god to that of some acne-scarred artisan? child of a god to that of some acne-scarred artisan?
''' '''
stderr_data = b'''
And what of the true God? To whose glory churches and monasteries have been
built on these islands for generations past? Now shall what of Him?
'''
def setUp(self): @classmethod
def setup_class(cls):
cls.server = six.moves.socketserver.ThreadingTCPServer(
('', 0), cls.get_handler_class())
cls.thread = threading.Thread(target=cls.server.serve_forever)
cls.thread.setDaemon(True)
cls.thread.start()
cls.address = 'http://{}:{}'.format(
socket.gethostname(), cls.server.server_address[1])
self.server = six.moves.socketserver.ThreadingTCPServer( @classmethod
('', 0), self.get_handler_class() def teardown_class(cls):
) cls.server.shutdown()
self.thread = threading.Thread(target=self.server.serve_forever) cls.server.server_close()
self.thread.setDaemon(True) cls.thread.join()
self.thread.start()
self.address = 'http://{}:{}'.format(
socket.gethostname(), self.server.server_address[1]
)
def tearDown(self): @classmethod
self.server.shutdown() def get_handler_class(cls):
self.server.server_close() stdout_data = cls.stdout_data
self.thread.join() stderr_data = cls.stderr_data
def get_handler_class(self):
text_data = self.text_data
class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object): class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object):
def do_POST(self): def do_POST(self):
resp_data = self.get_resp_data()
self.send_response(101) self.send_response(101)
self.send_header( self.send_header(
'Content-Type', 'application/vnd.docker.raw-stream' 'Content-Type', 'application/vnd.docker.raw-stream')
)
self.send_header('Connection', 'Upgrade') self.send_header('Connection', 'Upgrade')
self.send_header('Upgrade', 'tcp') self.send_header('Upgrade', 'tcp')
self.end_headers() self.end_headers()
self.wfile.flush() self.wfile.flush()
time.sleep(0.2) time.sleep(0.2)
self.wfile.write(text_data) self.wfile.write(resp_data)
self.wfile.flush() self.wfile.flush()
def get_resp_data(self):
path = self.path.split('/')[-1]
if path == 'tty':
return stdout_data + stderr_data
elif path == 'no-tty':
data = b''
data += self.frame_header(1, stdout_data)
data += stdout_data
data += self.frame_header(2, stderr_data)
data += stderr_data
return data
else:
raise Exception('Unknown path {0}'.format(path))
@staticmethod
def frame_header(stream, data):
return struct.pack('>BxxxL', stream, len(data))
return Handler return Handler
def test_read_from_socket(self): def request(self, stream=None, tty=None, demux=None):
assert stream is not None and tty is not None and demux is not None
with APIClient(base_url=self.address) as client: with APIClient(base_url=self.address) as client:
resp = client._post(client._url('/dummy'), stream=True) if tty:
data = client._read_from_socket(resp, stream=True, tty=True) url = client._url('/tty')
results = b''.join(data) else:
url = client._url('/no-tty')
resp = client._post(url, stream=True)
return client._read_from_socket(
resp, stream=stream, tty=tty, demux=demux)
assert results == self.text_data def test_read_from_socket_tty(self):
res = self.request(stream=True, tty=True, demux=False)
assert next(res) == self.stdout_data + self.stderr_data
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_tty_demux(self):
res = self.request(stream=True, tty=True, demux=True)
assert next(res) == (self.stdout_data + self.stderr_data, None)
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_no_tty(self):
res = self.request(stream=True, tty=False, demux=False)
assert next(res) == self.stdout_data
assert next(res) == self.stderr_data
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_no_tty_demux(self):
res = self.request(stream=True, tty=False, demux=True)
assert (self.stdout_data, None) == next(res)
assert (None, self.stderr_data) == next(res)
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_no_stream_tty(self):
res = self.request(stream=False, tty=True, demux=False)
assert res == self.stdout_data + self.stderr_data
def test_read_from_socket_no_stream_tty_demux(self):
res = self.request(stream=False, tty=True, demux=True)
assert res == (self.stdout_data + self.stderr_data, None)
def test_read_from_socket_no_stream_no_tty(self):
res = self.request(stream=False, tty=False, demux=False)
res == self.stdout_data + self.stderr_data
def test_read_from_socket_no_stream_no_tty_demux(self):
res = self.request(stream=False, tty=False, demux=True)
assert res == (self.stdout_data, self.stderr_data)
class UserAgentTest(unittest.TestCase): class UserAgentTest(unittest.TestCase):

View File

@ -419,7 +419,8 @@ class ContainerTest(unittest.TestCase):
workdir=None workdir=None
) )
client.api.exec_start.assert_called_with( client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False,
demux=False,
) )
def test_exec_run_failure(self): def test_exec_run_failure(self):
@ -432,7 +433,8 @@ class ContainerTest(unittest.TestCase):
workdir=None workdir=None
) )
client.api.exec_start.assert_called_with( client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False,
demux=False,
) )
def test_export(self): def test_export(self):