From fe65c0258d2f3412a18c07e0f701be6b292c2286 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 2 Oct 2015 19:26:45 -0400
Subject: [PATCH 1/3] Remove unused attach_socket function from Container.

Signed-off-by: Daniel Nephin
---
 compose/container.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/compose/container.py b/compose/container.py
index 28af093d76..a03acf56fd 100644
--- a/compose/container.py
+++ b/compose/container.py
@@ -212,9 +212,6 @@ class Container(object):
     def attach(self, *args, **kwargs):
         return self.client.attach(self.id, *args, **kwargs)

-    def attach_socket(self, **kwargs):
-        return self.client.attach_socket(self.id, **kwargs)
-
     def __repr__(self):
         return '<Container: %s (%s)>' % (self.name, self.id[:6])


From 3661e8bc7419ae34e4639edec91df2e1db707312 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 2 Oct 2015 19:47:27 -0400
Subject: [PATCH 2/3] Fix build against the swarm cluster by joining buffered output before parsing json.

Signed-off-by: Daniel Nephin
---
 compose/cli/log_printer.py      |  4 ++--
 compose/cli/utils.py            | 26 ----------------------
 compose/progress_stream.py      |  6 +-----
 compose/service.py              |  4 +++-
 compose/utils.py                | 38 +++++++++++++++++++++++++++++
 tests/integration/testcases.py  |  6 ++++--
 tests/unit/split_buffer_test.py |  2 +-
 7 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py
index 845f799b79..6e1499e1d5 100644
--- a/compose/cli/log_printer.py
+++ b/compose/cli/log_printer.py
@@ -6,8 +6,8 @@ from itertools import cycle

 from . import colors
 from .multiplexer import Multiplexer
-from .utils import split_buffer
 from compose import utils
+from compose.utils import split_buffer


 class LogPrinter(object):
@@ -75,7 +75,7 @@ def build_no_log_generator(container, prefix, color_func):
 def build_log_generator(container, prefix, color_func):
     # Attach to container before log printer starts running
     stream = container.attach(stdout=True, stderr=True, stream=True, logs=True)
-    line_generator = split_buffer(stream, u'\n')
+    line_generator = split_buffer(stream)

     for line in line_generator:
         yield prefix + line
diff --git a/compose/cli/utils.py b/compose/cli/utils.py
index 5840f0a8ce..07510e2f31 100644
--- a/compose/cli/utils.py
+++ b/compose/cli/utils.py
@@ -7,7 +7,6 @@ import platform
 import ssl
 import subprocess

-import six
 from docker import version as docker_py_version
 from six.moves import input

@@ -36,31 +35,6 @@ def yesno(prompt, default=None):
         return None


-def split_buffer(reader, separator):
-    """
-    Given a generator which yields strings and a separator string,
-    joins all input, splits on the separator and yields each chunk.
-
-    Unlike string.split(), each chunk includes the trailing
-    separator, except for the last one if none was found on the end
-    of the input.
-    """
-    buffered = six.text_type('')
-    separator = six.text_type(separator)
-
-    for data in reader:
-        buffered += data.decode('utf-8')
-        while True:
-            index = buffered.find(separator)
-            if index == -1:
-                break
-            yield buffered[:index + 1]
-            buffered = buffered[index + 1:]
-
-    if len(buffered) > 0:
-        yield buffered
-
-
 def call_silently(*args, **kwargs):
     """
     Like subprocess.call(), but redirects stdout and stderr to /dev/null.
diff --git a/compose/progress_stream.py b/compose/progress_stream.py
index c44b33e561..ca8f351350 100644
--- a/compose/progress_stream.py
+++ b/compose/progress_stream.py
@@ -1,7 +1,5 @@
 import json

-import six
-
 from compose import utils


@@ -16,9 +14,7 @@ def stream_output(output, stream):
     lines = {}
     diff = 0

-    for chunk in output:
-        if six.PY3:
-            chunk = chunk.decode('utf-8')
+    for chunk in utils.stream_as_text(output):
         event = json.loads(chunk)
         all_events.append(event)

diff --git a/compose/service.py b/compose/service.py
index c9ca00ae41..bce2e534c9 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -33,6 +33,8 @@ from .progress_stream import stream_output
 from .progress_stream import StreamOutputError
 from .utils import json_hash
 from .utils import parallel_execute
+from .utils import split_buffer
+

 log = logging.getLogger(__name__)

@@ -722,7 +724,7 @@ class Service(object):
         )

         try:
-            all_events = stream_output(build_output, sys.stdout)
+            all_events = stream_output(split_buffer(build_output), sys.stdout)
         except StreamOutputError as e:
             raise BuildError(self, six.text_type(e))

diff --git a/compose/utils.py b/compose/utils.py
index e0304ba506..f201e2d6cf 100644
--- a/compose/utils.py
+++ b/compose/utils.py
@@ -83,6 +83,44 @@ def get_output_stream(stream):
     return codecs.getwriter('utf-8')(stream)


+def stream_as_text(stream):
+    """Given a stream of bytes or text, if any of the items in the stream
+    are bytes convert them to text.
+
+    This function can be removed once docker-py returns text streams instead
+    of byte streams.
+    """
+    for data in stream:
+        if not isinstance(data, six.text_type):
+            data = data.decode('utf-8')
+        yield data
+
+
+def split_buffer(reader, separator=u'\n'):
+    """
+    Given a generator which yields strings and a separator string,
+    joins all input, splits on the separator and yields each chunk.
+
+    Unlike string.split(), each chunk includes the trailing
+    separator, except for the last one if none was found on the end
+    of the input.
+    """
+    buffered = six.text_type('')
+    separator = six.text_type(separator)
+
+    for data in stream_as_text(reader):
+        buffered += data
+        while True:
+            index = buffered.find(separator)
+            if index == -1:
+                break
+            yield buffered[:index + 1]
+            buffered = buffered[index + 1:]
+
+    if len(buffered) > 0:
+        yield buffered
+
+
 def write_out_msg(stream, lines, msg_index, msg, status="done"):
     """
     Using special ANSI code characters we can write out the msg over the top of
diff --git a/tests/integration/testcases.py b/tests/integration/testcases.py
index 26a0a108a1..7dec3728b8 100644
--- a/tests/integration/testcases.py
+++ b/tests/integration/testcases.py
@@ -9,6 +9,8 @@ from compose.config.config import ServiceLoader
 from compose.const import LABEL_PROJECT
 from compose.progress_stream import stream_output
 from compose.service import Service
+from compose.utils import split_buffer
+from compose.utils import stream_as_text


 def pull_busybox(client):
@@ -71,5 +73,5 @@ class DockerClientTestCase(unittest.TestCase):
     def check_build(self, *args, **kwargs):
         kwargs.setdefault('rm', True)

-        build_output = self.client.build(*args, **kwargs)
-        stream_output(build_output, open('/dev/null', 'w'))
+        build_output = stream_as_text(self.client.build(*args, **kwargs))
+        stream_output(split_buffer(build_output), open('/dev/null', 'w'))
diff --git a/tests/unit/split_buffer_test.py b/tests/unit/split_buffer_test.py
index 47c72f0865..1775e4cb15 100644
--- a/tests/unit/split_buffer_test.py
+++ b/tests/unit/split_buffer_test.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
 from __future__ import unicode_literals

 from .. import unittest
-from compose.cli.utils import split_buffer
+from compose.utils import split_buffer


 class SplitBufferTest(unittest.TestCase):


From 15d0c60a73bf700400de826bd122f3f1c30bd0c0 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Mon, 5 Oct 2015 12:56:10 -0400
Subject: [PATCH 3/3] Fix split buffer with inconsistently delimited json objects.

Signed-off-by: Daniel Nephin
---
 compose/progress_stream.py      |  5 +---
 compose/service.py              |  3 +-
 compose/utils.py                | 52 ++++++++++++++++++++++++++++-------
 tests/integration/testcases.py  |  6 ++--
 tests/unit/split_buffer_test.py |  2 +-
 tests/unit/utils_test.py        | 16 ++++++++++
 6 files changed, 62 insertions(+), 22 deletions(-)
 create mode 100644 tests/unit/utils_test.py

diff --git a/compose/progress_stream.py b/compose/progress_stream.py
index ca8f351350..ac8e4b410f 100644
--- a/compose/progress_stream.py
+++ b/compose/progress_stream.py
@@ -1,5 +1,3 @@
-import json
-
 from compose import utils


@@ -14,8 +12,7 @@ def stream_output(output, stream):
     lines = {}
     diff = 0

-    for chunk in utils.stream_as_text(output):
-        event = json.loads(chunk)
+    for event in utils.json_stream(output):
         all_events.append(event)

         if 'progress' in event or 'progressDetail' in event:
diff --git a/compose/service.py b/compose/service.py
index bce2e534c9..698ab4844f 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -33,7 +33,6 @@ from .progress_stream import stream_output
 from .progress_stream import StreamOutputError
 from .utils import json_hash
 from .utils import parallel_execute
-from .utils import split_buffer


 log = logging.getLogger(__name__)
@@ -724,7 +723,7 @@ class Service(object):
         )

         try:
-            all_events = stream_output(split_buffer(build_output), sys.stdout)
+            all_events = stream_output(build_output, sys.stdout)
         except StreamOutputError as e:
             raise BuildError(self, six.text_type(e))

diff --git a/compose/utils.py b/compose/utils.py
index f201e2d6cf..c8fddc5f16 100644
--- a/compose/utils.py
+++ b/compose/utils.py
@@ -1,6 +1,7 @@
 import codecs
 import hashlib
 import json
+import json.decoder
 import logging
 import sys
 from threading import Thread
@@ -13,6 +14,8 @@ from six.moves.queue import Queue

 log = logging.getLogger(__name__)

+json_decoder = json.JSONDecoder()
+

 def parallel_execute(objects, obj_callable, msg_index, msg):
     """
@@ -96,29 +99,56 @@ def stream_as_text(stream):
         yield data


-def split_buffer(reader, separator=u'\n'):
-    """
-    Given a generator which yields strings and a separator string,
+def line_splitter(buffer, separator=u'\n'):
+    index = buffer.find(six.text_type(separator))
+    if index == -1:
+        return None, None
+    return buffer[:index + 1], buffer[index + 1:]
+
+
+def split_buffer(stream, splitter=None, decoder=lambda a: a):
+    """Given a generator which yields strings and a splitter function,
     joins all input, splits on the separator and yields each chunk.

     Unlike string.split(), each chunk includes the trailing
     separator, except for the last one if none was found on the end
     of the input.
     """
+    splitter = splitter or line_splitter
     buffered = six.text_type('')
-    separator = six.text_type(separator)

-    for data in stream_as_text(reader):
+    for data in stream_as_text(stream):
         buffered += data
         while True:
-            index = buffered.find(separator)
-            if index == -1:
+            item, rest = splitter(buffered)
+            if not item:
                 break
-            yield buffered[:index + 1]
-            buffered = buffered[index + 1:]
-
-    if len(buffered) > 0:
-        yield buffered
+
+            buffered = rest
+            yield item
+
+    if buffered:
+        yield decoder(buffered)
+
+
+def json_splitter(buffer):
+    """Attempt to parse a json object from a buffer. If there is at least one
+    object, return it and the rest of the buffer, otherwise return None.
+    """
+    try:
+        obj, index = json_decoder.raw_decode(buffer)
+        rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
+        return obj, rest
+    except ValueError:
+        return None, None
+
+
+def json_stream(stream):
+    """Given a stream of text, return a stream of json objects.
+    This handles streams which are inconsistently buffered (some entries may
+    be newline delimited, and others are not).
+    """
+    return split_buffer(stream_as_text(stream), json_splitter, json_decoder.decode)


 def write_out_msg(stream, lines, msg_index, msg, status="done"):
diff --git a/tests/integration/testcases.py b/tests/integration/testcases.py
index 7dec3728b8..26a0a108a1 100644
--- a/tests/integration/testcases.py
+++ b/tests/integration/testcases.py
@@ -9,8 +9,6 @@ from compose.config.config import ServiceLoader
 from compose.const import LABEL_PROJECT
 from compose.progress_stream import stream_output
 from compose.service import Service
-from compose.utils import split_buffer
-from compose.utils import stream_as_text


 def pull_busybox(client):
@@ -73,5 +71,5 @@ class DockerClientTestCase(unittest.TestCase):
     def check_build(self, *args, **kwargs):
         kwargs.setdefault('rm', True)

-        build_output = stream_as_text(self.client.build(*args, **kwargs))
-        stream_output(split_buffer(build_output), open('/dev/null', 'w'))
+        build_output = self.client.build(*args, **kwargs)
+        stream_output(build_output, open('/dev/null', 'w'))
diff --git a/tests/unit/split_buffer_test.py b/tests/unit/split_buffer_test.py
index 1775e4cb15..c41ea27d40 100644
--- a/tests/unit/split_buffer_test.py
+++ b/tests/unit/split_buffer_test.py
@@ -47,7 +47,7 @@ class SplitBufferTest(unittest.TestCase):
         self.assert_produces(reader, [string])

     def assert_produces(self, reader, expectations):
-        split = split_buffer(reader(), u'\n')
+        split = split_buffer(reader())

         for (actual, expected) in zip(split, expectations):
             self.assertEqual(type(actual), type(expected))
diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py
new file mode 100644
index 0000000000..b272c7349a
--- /dev/null
+++ b/tests/unit/utils_test.py
@@ -0,0 +1,16 @@
+from .. import unittest
+from compose import utils
+
+
+class JsonSplitterTestCase(unittest.TestCase):
+
+    def test_json_splitter_no_object(self):
+        data = '{"foo": "bar'
+        self.assertEqual(utils.json_splitter(data), (None, None))
+
+    def test_json_splitter_with_object(self):
+        data = '{"foo": "bar"}\n \n{"next": "obj"}'
+        self.assertEqual(
+            utils.json_splitter(data),
+            ({'foo': 'bar'}, '{"next": "obj"}')
+        )
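
The core idea in PATCH 3/3 is that the JSON events streamed back from the daemon are not consistently delimited (some arrive newline-terminated, others do not, per the json_stream docstring), so the code stops splitting on '\n' and instead decodes one JSON object at a time from a growing text buffer, keeping whatever is left over for the next chunk. The following is a minimal, self-contained sketch of that buffering idea, not the patched compose/utils.py itself: it simplifies the whitespace handling to lstrip(), and the sample chunks are made up.

    import json

    decoder = json.JSONDecoder()


    def json_stream(chunks):
        """Yield JSON objects from text chunks whose boundaries may fall
        anywhere: one chunk can carry several objects, or only half of one."""
        buffered = ''
        for data in chunks:
            if isinstance(data, bytes):   # docker-py yields bytes
                data = data.decode('utf-8')
            buffered += data
            while True:
                stripped = buffered.lstrip()
                try:
                    obj, index = decoder.raw_decode(stripped)
                except ValueError:
                    break                 # incomplete object; wait for more data
                yield obj
                buffered = stripped[index:]


    # Hypothetical input: the first chunk ends in the middle of an object.
    chunks = [b'{"stream": "Step 1"}\n{"stream":', b' "Step 2"}']
    for event in json_stream(chunks):
        print(event)

In the patch itself the same plumbing is shared with the line-based log streams: split_buffer() accepts a splitter and an optional decoder, line_splitter() preserves the old newline behaviour for container logs, and json_splitter()/json_stream() provide the JSON variant used by stream_output().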