import codecs import hashlib import json import logging import sys from docker.errors import APIError from Queue import Queue, Empty from threading import Thread log = logging.getLogger(__name__) def parallel_execute(command, containers, doing_msg, done_msg, **options): """ Execute a given command upon a list of containers in parallel. """ stream = codecs.getwriter('utf-8')(sys.stdout) lines = [] errors = {} for container in containers: write_out_msg(stream, lines, container.name, doing_msg) q = Queue() def container_command_execute(container, command, **options): try: getattr(container, command)(**options) except APIError as e: errors[container.name] = e.explanation q.put(container) for container in containers: t = Thread( target=container_command_execute, args=(container, command), kwargs=options, ) t.daemon = True t.start() done = 0 while done < len(containers): try: container = q.get(timeout=1) write_out_msg(stream, lines, container.name, done_msg) done += 1 except Empty: pass if errors: for container in errors: stream.write("ERROR: for {} {} \n".format(container, errors[container])) def write_out_msg(stream, lines, container_name, msg): """ Using special ANSI code characters we can write out the msg over the top of a previous status message, if it exists. """ if container_name in lines: position = lines.index(container_name) diff = len(lines) - position # move up stream.write("%c[%dA" % (27, diff)) # erase stream.write("%c[2K\r" % 27) stream.write("{}: {} \n".format(container_name, msg)) # move back down stream.write("%c[%dB" % (27, diff)) else: diff = 0 lines.append(container_name) stream.write("{}: {}... \r\n".format(container_name, msg)) stream.flush() def json_hash(obj): dump = json.dumps(obj, sort_keys=True, separators=(',', ':')) h = hashlib.sha256() h.update(dump) return h.hexdigest()