diff --git a/compose/const.py b/compose/const.py index 709c3a10d7..9c39d5f899 100644 --- a/compose/const.py +++ b/compose/const.py @@ -1,4 +1,5 @@ +DEFAULT_MAX_WORKERS = 5 DEFAULT_TIMEOUT = 10 LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number' LABEL_ONE_OFF = 'com.docker.compose.oneoff' diff --git a/compose/project.py b/compose/project.py index 11c1e1ce9d..7928316a66 100644 --- a/compose/project.py +++ b/compose/project.py @@ -1,15 +1,16 @@ from __future__ import unicode_literals from __future__ import absolute_import -import logging from functools import reduce +import logging from docker.errors import APIError from .config import get_service_name_from_net, ConfigurationError -from .const import LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF, DEFAULT_TIMEOUT -from .service import Service +from .const import DEFAULT_TIMEOUT, LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF from .container import Container from .legacy import check_for_legacy_containers +from .service import Service +from .utils import parallel_execute log = logging.getLogger(__name__) @@ -197,12 +198,15 @@ class Project(object): service.start(**options) def stop(self, service_names=None, **options): - for service in reversed(self.get_services(service_names)): - service.stop(**options) + parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options) def kill(self, service_names=None, **options): - for service in reversed(self.get_services(service_names)): - service.kill(**options) + parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options) + + def remove_stopped(self, service_names=None, **options): + all_containers = self.containers(service_names, stopped=True) + stopped_containers = [c for c in all_containers if not c.is_running] + parallel_execute("remove", stopped_containers, "Removing", "Removed", **options) def restart(self, service_names=None, **options): for service in self.get_services(service_names): @@ -284,10 +288,6 @@ class Project(object): for service in self.get_services(service_names, include_deps=True): service.pull(insecure_registry=insecure_registry) - def remove_stopped(self, service_names=None, **options): - for service in self.get_services(service_names): - service.remove_stopped(**options) - def containers(self, service_names=None, stopped=False, one_off=False): if service_names: self.validate_service_names(service_names) diff --git a/compose/service.py b/compose/service.py index 9a03192e66..213f54fadb 100644 --- a/compose/service.py +++ b/compose/service.py @@ -129,6 +129,7 @@ class Service(object): for c in self.containers(stopped=True): self.start_container_if_stopped(c, **options) + # TODO: remove these functions, project takes care of starting/stopping, def stop(self, **options): for c in self.containers(): log.info("Stopping %s..." % c.name) @@ -144,6 +145,8 @@ class Service(object): log.info("Restarting %s..." % c.name) c.restart(**options) + # end TODO + def scale(self, desired_num): """ Adjusts the number of containers to the specified number and ensures diff --git a/compose/utils.py b/compose/utils.py index 76a4c6b93a..cc7bd5dd03 100644 --- a/compose/utils.py +++ b/compose/utils.py @@ -1,5 +1,39 @@ -import json import hashlib +import json +import logging +import os + +import concurrent.futures + +from .const import DEFAULT_MAX_WORKERS + + +log = logging.getLogger(__name__) + + +def parallel_execute(command, containers, doing_msg, done_msg, **options): + """ + Execute a given command upon a list of containers in parallel. + """ + max_workers = os.environ.get('MAX_WORKERS', DEFAULT_MAX_WORKERS) + + def container_command_execute(container, command, **options): + log.info("{} {}...".format(doing_msg, container.name)) + return getattr(container, command)(**options) + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_container = { + executor.submit( + container_command_execute, + container, + command, + **options + ): container for container in containers + } + + for future in concurrent.futures.as_completed(future_container): + container = future_container[future] + log.info("{} {}".format(done_msg, container.name)) def json_hash(obj): diff --git a/requirements.txt b/requirements.txt index 69bd4c5f95..4a0c5be532 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ PyYAML==3.10 docker-py==1.2.3 dockerpty==0.3.4 docopt==0.6.1 +futures==3.0.3 requests==2.6.1 six==1.7.3 texttable==0.8.2 diff --git a/setup.py b/setup.py index d2e81e175b..ebd5311926 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ install_requires = [ 'docker-py >= 1.2.3, < 1.3', 'dockerpty >= 0.3.4, < 0.4', 'six >= 1.3.0, < 2', + 'futures >= 3.0.3', ]