from __future__ import absolute_import from __future__ import unicode_literals from threading import Thread from six.moves import _thread as thread try: from Queue import Queue, Empty except ImportError: from queue import Queue, Empty # Python 3.x from compose.cli.signals import ShutdownException STOP = object() class Multiplexer(object): """ Create a single iterator from several iterators by running all of them in parallel and yielding results as they come in. """ def __init__(self, iterators, cascade_stop=False): self.iterators = iterators self.cascade_stop = cascade_stop self._num_running = len(iterators) self.queue = Queue() def loop(self): self._init_readers() while self._num_running > 0: try: item, exception = self.queue.get(timeout=0.1) if exception: raise exception if item is STOP: if self.cascade_stop is True: break else: self._num_running -= 1 else: yield item except Empty: pass # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() def _init_readers(self): for iterator in self.iterators: t = Thread(target=_enqueue_output, args=(iterator, self.queue)) t.daemon = True t.start() def _enqueue_output(iterator, queue): try: for item in iterator: queue.put((item, None)) queue.put((STOP, None)) except Exception as e: queue.put((None, e))