diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py
index 845f799b79..6e1499e1d5 100644
--- a/compose/cli/log_printer.py
+++ b/compose/cli/log_printer.py
@@ -6,8 +6,8 @@ from itertools import cycle
 
 from . import colors
 from .multiplexer import Multiplexer
-from .utils import split_buffer
 from compose import utils
+from compose.utils import split_buffer
 
 
 class LogPrinter(object):
@@ -75,7 +75,7 @@ def build_no_log_generator(container, prefix, color_func):
 def build_log_generator(container, prefix, color_func):
     # Attach to container before log printer starts running
     stream = container.attach(stdout=True, stderr=True, stream=True, logs=True)
-    line_generator = split_buffer(stream, u'\n')
+    line_generator = split_buffer(stream)
     for line in line_generator:
         yield prefix + line
 
diff --git a/compose/cli/utils.py b/compose/cli/utils.py
index 5840f0a8ce..07510e2f31 100644
--- a/compose/cli/utils.py
+++ b/compose/cli/utils.py
@@ -7,7 +7,6 @@ import platform
 import ssl
 import subprocess
 
-import six
 from docker import version as docker_py_version
 from six.moves import input
 
@@ -36,31 +35,6 @@ def yesno(prompt, default=None):
     return None
 
 
-def split_buffer(reader, separator):
-    """
-    Given a generator which yields strings and a separator string,
-    joins all input, splits on the separator and yields each chunk.
-
-    Unlike string.split(), each chunk includes the trailing
-    separator, except for the last one if none was found on the end
-    of the input.
-    """
-    buffered = six.text_type('')
-    separator = six.text_type(separator)
-
-    for data in reader:
-        buffered += data.decode('utf-8')
-        while True:
-            index = buffered.find(separator)
-            if index == -1:
-                break
-            yield buffered[:index + 1]
-            buffered = buffered[index + 1:]
-
-    if len(buffered) > 0:
-        yield buffered
-
-
 def call_silently(*args, **kwargs):
     """
     Like subprocess.call(), but redirects stdout and stderr to /dev/null.
diff --git a/compose/container.py b/compose/container.py
index 28af093d76..a03acf56fd 100644
--- a/compose/container.py
+++ b/compose/container.py
@@ -212,9 +212,6 @@ class Container(object):
     def attach(self, *args, **kwargs):
         return self.client.attach(self.id, *args, **kwargs)
 
-    def attach_socket(self, **kwargs):
-        return self.client.attach_socket(self.id, **kwargs)
-
     def __repr__(self):
         return '<Container: %s (%s)>' % (self.name, self.id[:6])
 
diff --git a/compose/progress_stream.py b/compose/progress_stream.py
index c44b33e561..ac8e4b410f 100644
--- a/compose/progress_stream.py
+++ b/compose/progress_stream.py
@@ -1,7 +1,3 @@
-import json
-
-import six
-
 from compose import utils
 
 
@@ -16,10 +12,7 @@ def stream_output(output, stream):
     lines = {}
     diff = 0
 
-    for chunk in output:
-        if six.PY3:
-            chunk = chunk.decode('utf-8')
-        event = json.loads(chunk)
+    for event in utils.json_stream(output):
         all_events.append(event)
 
         if 'progress' in event or 'progressDetail' in event:
diff --git a/compose/service.py b/compose/service.py
index c9ca00ae41..698ab4844f 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -34,6 +34,7 @@ from .progress_stream import StreamOutputError
 from .utils import json_hash
 from .utils import parallel_execute
 
+
 log = logging.getLogger(__name__)
 
 
diff --git a/compose/utils.py b/compose/utils.py
index e0304ba506..c8fddc5f16 100644
--- a/compose/utils.py
+++ b/compose/utils.py
@@ -1,6 +1,7 @@
 import codecs
 import hashlib
 import json
+import json.decoder
 import logging
 import sys
 from threading import Thread
@@ -13,6 +14,8 @@ from six.moves.queue import Queue
 
 log = logging.getLogger(__name__)
 
+json_decoder = json.JSONDecoder()
+
 
 def parallel_execute(objects, obj_callable, msg_index, msg):
     """
@@ -83,6 +86,71 @@ def get_output_stream(stream):
     return codecs.getwriter('utf-8')(stream)
 
 
+def stream_as_text(stream):
+    """Given a stream of bytes or text, if any of the items in the stream
+    are bytes convert them to text.
+
+    This function can be removed once docker-py returns text streams instead
+    of byte streams.
+    """
+    for data in stream:
+        if not isinstance(data, six.text_type):
+            data = data.decode('utf-8')
+        yield data
+
+
+def line_splitter(buffer, separator=u'\n'):
+    index = buffer.find(six.text_type(separator))
+    if index == -1:
+        return None, None
+    return buffer[:index + 1], buffer[index + 1:]
+
+
+def split_buffer(stream, splitter=None, decoder=lambda a: a):
+    """Given a generator which yields strings and a splitter function,
+    joins all input, splits on the separator and yields each chunk.
+
+    Unlike string.split(), each chunk includes the trailing
+    separator, except for the last one if none was found on the end
+    of the input.
+    """
+    splitter = splitter or line_splitter
+    buffered = six.text_type('')
+
+    for data in stream_as_text(stream):
+        buffered += data
+        while True:
+            item, rest = splitter(buffered)
+            if not item:
+                break
+
+            buffered = rest
+            yield item
+
+    if buffered:
+        yield decoder(buffered)
+
+
+def json_splitter(buffer):
+    """Attempt to parse a json object from a buffer. If there is at least one
+    object, return it and the rest of the buffer, otherwise return None.
+    """
+    try:
+        obj, index = json_decoder.raw_decode(buffer)
+        rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
+        return obj, rest
+    except ValueError:
+        return None, None
+
+
+def json_stream(stream):
+    """Given a stream of text, return a stream of json objects.
+    This handles streams which are inconsistently buffered (some entries may
+    be newline delimited, and others are not).
+ """ + return split_buffer(stream_as_text(stream), json_splitter, json_decoder.decode) + + def write_out_msg(stream, lines, msg_index, msg, status="done"): """ Using special ANSI code characters we can write out the msg over the top of diff --git a/tests/unit/split_buffer_test.py b/tests/unit/split_buffer_test.py index 47c72f0865..c41ea27d40 100644 --- a/tests/unit/split_buffer_test.py +++ b/tests/unit/split_buffer_test.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from __future__ import unicode_literals from .. import unittest -from compose.cli.utils import split_buffer +from compose.utils import split_buffer class SplitBufferTest(unittest.TestCase): @@ -47,7 +47,7 @@ class SplitBufferTest(unittest.TestCase): self.assert_produces(reader, [string]) def assert_produces(self, reader, expectations): - split = split_buffer(reader(), u'\n') + split = split_buffer(reader()) for (actual, expected) in zip(split, expectations): self.assertEqual(type(actual), type(expected)) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py new file mode 100644 index 0000000000..b272c7349a --- /dev/null +++ b/tests/unit/utils_test.py @@ -0,0 +1,16 @@ +from .. import unittest +from compose import utils + + +class JsonSplitterTestCase(unittest.TestCase): + + def test_json_splitter_no_object(self): + data = '{"foo": "bar' + self.assertEqual(utils.json_splitter(data), (None, None)) + + def test_json_splitter_with_object(self): + data = '{"foo": "bar"}\n \n{"next": "obj"}' + self.assertEqual( + utils.json_splitter(data), + ({'foo': 'bar'}, '{"next": "obj"}') + )