Refactor so there's only one queue

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
Aanand Prasad 2016-04-08 18:54:02 +01:00
parent ffab27c049
commit 54b6fc4219
1 changed file with 34 additions and 45 deletions

@@ -69,24 +69,33 @@ def parallel_execute_stream(objects, func, get_deps):
         get_deps = _no_deps
 
     results = Queue()
-    output = Queue()
 
-    t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output))
-    t.daemon = True
-    t.start()
+    started = set()   # objects being processed
+    finished = set()  # objects which have been processed
+    failed = set()    # objects which either failed or whose dependencies failed
 
-    done = 0
-
-    while done < len(objects):
+    while len(finished) + len(failed) < len(objects):
+        for event in feed_queue(objects, func, get_deps, results, started, finished, failed):
+            yield event
+
         try:
-            yield output.get(timeout=1)
-            done += 1
+            event = results.get(timeout=1)
         except Empty:
             continue
         # See https://github.com/docker/compose/issues/189
         except thread.error:
             raise ShutdownException()
 
+        obj, _, exception = event
+        if exception is None:
+            log.debug('Finished processing: {}'.format(obj))
+            finished.add(obj)
+        else:
+            log.debug('Failed: {}'.format(obj))
+            failed.add(obj)
+
+        yield event
+
 
 def queue_producer(obj, func, results):
     try:
@@ -96,46 +105,26 @@ def queue_producer(obj, func, results):
         results.put((obj, None, e))
 
 
-def queue_consumer(objects, func, get_deps, results, output):
-    started = set()   # objects being processed
-    finished = set()  # objects which have been processed
-    failed = set()    # objects which either failed or whose dependencies failed
+def feed_queue(objects, func, get_deps, results, started, finished, failed):
+    pending = set(objects) - started - finished - failed
+    log.debug('Pending: {}'.format(pending))
 
-    while len(finished) + len(failed) < len(objects):
-        pending = set(objects) - started - finished - failed
-        log.debug('Pending: {}'.format(pending))
+    for obj in pending:
+        deps = get_deps(obj)
 
-        for obj in pending:
-            deps = get_deps(obj)
-
-            if any(dep in failed for dep in deps):
-                log.debug('{} has upstream errors - not processing'.format(obj))
-                output.put((obj, None, UpstreamError()))
-                failed.add(obj)
-            elif all(
-                dep not in objects or dep in finished
-                for dep in deps
-            ):
-                log.debug('Starting producer thread for {}'.format(obj))
-                t = Thread(target=queue_producer, args=(obj, func, results))
-                t.daemon = True
-                t.start()
-                started.add(obj)
-
-        try:
-            event = results.get(timeout=1)
-        except Empty:
-            continue
-
-        obj, _, exception = event
-        if exception is None:
-            log.debug('Finished processing: {}'.format(obj))
-            finished.add(obj)
-        else:
-            log.debug('Failed: {}'.format(obj))
-            failed.add(obj)
-
-        output.put(event)
+        if any(dep in failed for dep in deps):
+            log.debug('{} has upstream errors - not processing'.format(obj))
+            yield (obj, None, UpstreamError())
+            failed.add(obj)
+        elif all(
+            dep not in objects or dep in finished
+            for dep in deps
+        ):
+            log.debug('Starting producer thread for {}'.format(obj))
+            t = Thread(target=queue_producer, args=(obj, func, results))
+            t.daemon = True
+            t.start()
+            started.add(obj)
 
 
 class UpstreamError(Exception):
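
For context, here is a minimal, self-contained Python 3 sketch of the single-queue pattern this commit arrives at: producer threads put (obj, result, exception) events onto one results queue, and the main generator both schedules objects whose dependencies are ready (feed_queue) and drains that queue inline. The demo process function and the two-service dependency map are hypothetical stand-ins, not code from this repository, and the sketch drops the logging calls and the Python 2 thread.error workaround referenced in the diff.

from queue import Empty, Queue
from threading import Thread


class UpstreamError(Exception):
    pass


def _no_deps(obj):
    return []


def queue_producer(obj, func, results):
    # Runs in a worker thread; reports success or failure as a single
    # event on the one shared results queue.
    try:
        results.put((obj, func(obj), None))
    except Exception as e:
        results.put((obj, None, e))


def feed_queue(objects, func, get_deps, results, started, finished, failed):
    pending = set(objects) - started - finished - failed
    for obj in pending:
        deps = get_deps(obj)
        if any(dep in failed for dep in deps):
            # A dependency failed: fail this object without running it.
            yield (obj, None, UpstreamError())
            failed.add(obj)
        elif all(dep not in objects or dep in finished for dep in deps):
            # Every dependency is done: start a producer thread for it.
            t = Thread(target=queue_producer, args=(obj, func, results))
            t.daemon = True
            t.start()
            started.add(obj)


def parallel_execute_stream(objects, func, get_deps=None):
    if get_deps is None:
        get_deps = _no_deps

    results = Queue()  # the single queue: producers put, this loop gets
    started, finished, failed = set(), set(), set()

    while len(finished) + len(failed) < len(objects):
        # Schedule whatever has become runnable, passing through any
        # upstream-error events feed_queue produces along the way.
        for event in feed_queue(objects, func, get_deps, results,
                                started, finished, failed):
            yield event

        try:
            event = results.get(timeout=1)
        except Empty:
            continue

        obj, _, exception = event
        if exception is None:
            finished.add(obj)
        else:
            failed.add(obj)
        yield event


if __name__ == '__main__':
    deps = {'web': ['db'], 'db': []}  # hypothetical dependency graph
    for obj, result, exc in parallel_execute_stream(
            set(deps), lambda name: 'started {}'.format(name),
            lambda name: deps[name]):
        print(obj, result, exc)

The point of the refactor is visible here: because results are drained in the same generator that schedules work, the old output queue, and the consumer thread that shuttled events from results to output, are no longer needed.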