From 2db0a377e29d221a66d3889d5a62f919bc4f15dc Mon Sep 17 00:00:00 2001
From: Mazz Mosley
Date: Fri, 10 Jul 2015 14:18:27 +0100
Subject: [PATCH 1/3] Minor test refactor

Rather than creating a docker client within each test, create one at
setup and make it accessible to the whole class.

Signed-off-by: Mazz Mosley
---
 tests/unit/project_test.py | 26 ++++++++++++--------------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/tests/unit/project_test.py b/tests/unit/project_test.py
index e8aecae33f..39ad30a152 100644
--- a/tests/unit/project_test.py
+++ b/tests/unit/project_test.py
@@ -9,6 +9,9 @@ import docker
 
 
 class ProjectTest(unittest.TestCase):
+    def setUp(self):
+        self.mock_client = mock.create_autospec(docker.Client)
+
     def test_from_dict(self):
         project = Project.from_dicts('composetest', [
             {
@@ -155,21 +158,19 @@ class ProjectTest(unittest.TestCase):
     def test_use_volumes_from_container(self):
         container_id = 'aabbccddee'
         container_dict = dict(Name='aaa', Id=container_id)
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.inspect_container.return_value = container_dict
+        self.mock_client.inspect_container.return_value = container_dict
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
                 'volumes_from': ['aaa']
             }
-        ], mock_client)
+        ], self.mock_client)
         self.assertEqual(project.get_service('test')._get_volumes_from(), [container_id])
 
     def test_use_volumes_from_service_no_container(self):
         container_name = 'test_vol_1'
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.containers.return_value = [
+        self.mock_client.containers.return_value = [
             {
                 "Name": container_name,
                 "Names": [container_name],
@@ -187,7 +188,7 @@ class ProjectTest(unittest.TestCase):
                 'image': 'busybox:latest',
                 'volumes_from': ['vol']
             }
-        ], mock_client)
+        ], self.mock_client)
         self.assertEqual(project.get_service('test')._get_volumes_from(), [container_name])
 
     @mock.patch.object(Service, 'containers')
@@ -211,13 +212,12 @@ class ProjectTest(unittest.TestCase):
         self.assertEqual(project.get_service('test')._get_volumes_from(), container_ids)
 
     def test_net_unset(self):
-        mock_client = mock.create_autospec(docker.Client)
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
             }
-        ], mock_client)
+        ], self.mock_client)
         service = project.get_service('test')
         self.assertEqual(service._get_net(), None)
         self.assertNotIn('NetworkMode', service._get_container_host_config({}))
@@ -225,22 +225,20 @@ class ProjectTest(unittest.TestCase):
     def test_use_net_from_container(self):
         container_id = 'aabbccddee'
         container_dict = dict(Name='aaa', Id=container_id)
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.inspect_container.return_value = container_dict
+        self.mock_client.inspect_container.return_value = container_dict
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
                 'net': 'container:aaa'
             }
-        ], mock_client)
+        ], self.mock_client)
         service = project.get_service('test')
         self.assertEqual(service._get_net(), 'container:' + container_id)
 
     def test_use_net_from_service(self):
         container_name = 'test_aaa_1'
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.containers.return_value = [
+        self.mock_client.containers.return_value = [
             {
                 "Name": container_name,
                 "Names": [container_name],
@@ -258,7 +256,7 @@ class ProjectTest(unittest.TestCase):
                 'image': 'busybox:latest',
                 'net': 'container:aaa'
             }
-        ], mock_client)
+        ], self.mock_client)
         service = project.get_service('test')
         self.assertEqual(service._get_net(), 'container:' + container_name)
 
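The refactor leans on mock's autospeccing: `mock.create_autospec(docker.Client)`
returns a mock whose attributes and call signatures are checked against the real
`docker.Client`, so a misspelled method name or a wrong argument count fails the
test instead of silently passing. A minimal standalone sketch of the pattern
(not part of the patch; assumes docker-py 1.x and the `mock` backport are
installed):

    import unittest

    import docker
    import mock


    class ExampleTest(unittest.TestCase):
        def setUp(self):
            # One autospec'd client shared by every test method.
            self.mock_client = mock.create_autospec(docker.Client)

        def test_inspect_container(self):
            self.mock_client.inspect_container.return_value = {'Id': 'aabbccddee'}
            self.assertEqual(
                self.mock_client.inspect_container('aaa')['Id'], 'aabbccddee')
            # Accessing a method docker.Client doesn't define would raise
            # AttributeError here -- that is the point of create_autospec.
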
From a68ca199a2f1258d96b09cf2ea62a819aecfcbdb Mon Sep 17 00:00:00 2001
From: Mazz Mosley
Date: Mon, 13 Jul 2015 14:03:44 +0100
Subject: [PATCH 2/3] Execute container commands in parallel

Commands able to use this parallelisation are `stop`, `kill` and `rm`.

We're using `concurrent.futures`, backported from Python 3, to make the
most of a pool of threads without having to write the low-level thread
management code ourselves.

The default number of worker threads is low enough that it shouldn't
cause performance problems, but anyone who knows the capability of
their system and wants to increase it can do so via the
DEFAULT_MAX_WORKERS environment variable.

Signed-off-by: Mazz Mosley
---
 compose/const.py   |  1 +
 compose/project.py | 22 +++++++++++-----------
 compose/service.py |  3 +++
 compose/utils.py   | 36 +++++++++++++++++++++++++++++++++++-
 requirements.txt   |  1 +
 setup.py           |  1 +
 6 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/compose/const.py b/compose/const.py
index 709c3a10d7..9c39d5f899 100644
--- a/compose/const.py
+++ b/compose/const.py
@@ -1,4 +1,5 @@
+DEFAULT_MAX_WORKERS = 5
 DEFAULT_TIMEOUT = 10
 LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number'
 LABEL_ONE_OFF = 'com.docker.compose.oneoff'
 LABEL_PROJECT = 'com.docker.compose.project'
diff --git a/compose/project.py b/compose/project.py
index 11c1e1ce9d..7928316a66 100644
--- a/compose/project.py
+++ b/compose/project.py
@@ -1,15 +1,16 @@
 from __future__ import unicode_literals
 from __future__ import absolute_import
-import logging
 from functools import reduce
+import logging
 
 from docker.errors import APIError
 
 from .config import get_service_name_from_net, ConfigurationError
-from .const import LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF, DEFAULT_TIMEOUT
-from .service import Service
+from .const import DEFAULT_TIMEOUT, LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF
 from .container import Container
 from .legacy import check_for_legacy_containers
+from .service import Service
+from .utils import parallel_execute
 
 log = logging.getLogger(__name__)
 
@@ -197,12 +198,15 @@ class Project(object):
             service.start(**options)
 
     def stop(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.stop(**options)
+        parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options)
 
     def kill(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.kill(**options)
+        parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options)
+
+    def remove_stopped(self, service_names=None, **options):
+        all_containers = self.containers(service_names, stopped=True)
+        stopped_containers = [c for c in all_containers if not c.is_running]
+        parallel_execute("remove", stopped_containers, "Removing", "Removed", **options)
 
     def restart(self, service_names=None, **options):
         for service in self.get_services(service_names):
@@ -284,10 +288,6 @@ class Project(object):
         for service in self.get_services(service_names, include_deps=True):
             service.pull(insecure_registry=insecure_registry)
 
-    def remove_stopped(self, service_names=None, **options):
-        for service in self.get_services(service_names):
-            service.remove_stopped(**options)
-
     def containers(self, service_names=None, stopped=False, one_off=False):
         if service_names:
             self.validate_service_names(service_names)
diff --git a/compose/service.py b/compose/service.py
index 9a03192e66..213f54fadb 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -129,6 +129,7 @@ class Service(object):
         for c in self.containers(stopped=True):
             self.start_container_if_stopped(c, **options)
 
+    # TODO: remove these functions, project takes care of starting/stopping.
     def stop(self, **options):
         for c in self.containers():
             log.info("Stopping %s..." % c.name)
@@ -144,6 +145,8 @@ class Service(object):
             log.info("Restarting %s..." % c.name)
             c.restart(**options)
 
+    # end TODO
+
     def scale(self, desired_num):
         """
         Adjusts the number of containers to the specified number and ensures
diff --git a/compose/utils.py b/compose/utils.py
index 76a4c6b93a..cc7bd5dd03 100644
--- a/compose/utils.py
+++ b/compose/utils.py
@@ -1,5 +1,39 @@
-import json
 import hashlib
+import json
+import logging
+import os
+
+import concurrent.futures
+
+from .const import DEFAULT_MAX_WORKERS
+
+
+log = logging.getLogger(__name__)
+
+
+def parallel_execute(command, containers, doing_msg, done_msg, **options):
+    """
+    Execute a given command upon a list of containers in parallel.
+    """
+    max_workers = int(os.environ.get('DEFAULT_MAX_WORKERS', DEFAULT_MAX_WORKERS))
+
+    def container_command_execute(container, command, **options):
+        log.info("{} {}...".format(doing_msg, container.name))
+        return getattr(container, command)(**options)
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_container = {
+            executor.submit(
+                container_command_execute,
+                container,
+                command,
+                **options
+            ): container for container in containers
+        }
+
+        for future in concurrent.futures.as_completed(future_container):
+            container = future_container[future]
+            log.info("{} {}".format(done_msg, container.name))
 
 
 def json_hash(obj):
diff --git a/requirements.txt b/requirements.txt
index 69bd4c5f95..4a0c5be532 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@ PyYAML==3.10
 docker-py==1.2.3
 dockerpty==0.3.4
 docopt==0.6.1
+futures==3.0.3
 requests==2.6.1
 six==1.7.3
 texttable==0.8.2
diff --git a/setup.py b/setup.py
index d2e81e175b..ebd5311926 100644
--- a/setup.py
+++ b/setup.py
@@ -33,6 +33,7 @@ install_requires = [
     'docker-py >= 1.2.3, < 1.3',
     'dockerpty >= 0.3.4, < 0.4',
     'six >= 1.3.0, < 2',
+    'futures >= 3.0.3',
 ]
 
 
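The executor pattern in `parallel_execute` is easiest to see in isolation. Below
is a self-contained sketch of the same approach, with a hypothetical
`FakeContainer` standing in for compose.container.Container (illustration only,
not the patch's code):

    from __future__ import print_function

    import concurrent.futures  # stdlib on Python 3; the `futures` backport on Python 2


    class FakeContainer(object):
        """Hypothetical stand-in for compose.container.Container."""

        def __init__(self, name):
            self.name = name

        def stop(self, timeout=10):
            print('(stopping {0} with timeout={1})'.format(self.name, timeout))


    def parallel_execute(command, containers, doing_msg, done_msg, **options):
        def run_one(container):
            print('{0} {1}...'.format(doing_msg, container.name))
            return getattr(container, command)(**options)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            # Map each future back to its container so completion can be
            # reported per container, in whatever order the threads finish.
            future_container = {
                executor.submit(run_one, c): c for c in containers
            }
            for future in concurrent.futures.as_completed(future_container):
                future.result()  # surfaces any exception raised in the worker
                print('{0} {1}'.format(done_msg, future_container[future].name))


    parallel_execute('stop', [FakeContainer('db_1'), FakeContainer('web_1')],
                     'Stopping', 'Stopped', timeout=2)

One behavioural difference from the patch: the sketch calls `future.result()`,
which re-raises errors from the worker threads. The patch logs completion
without inspecting each future, so an exception raised by `stop`, `kill` or
`rm` inside a worker is silently discarded.
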
From 03d34336a85e189c8e40bfcd7dbbfe801f0c76dc Mon Sep 17 00:00:00 2001
From: Mazz Mosley
Date: Wed, 15 Jul 2015 11:56:06 +0100
Subject: [PATCH 3/3] Document DEFAULT_MAX_WORKERS

Signed-off-by: Mazz Mosley
---
 docs/cli.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/cli.md b/docs/cli.md
index c178fc7882..ce0f0447e1 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -193,6 +193,12 @@ the daemon.
 Configures the path to the `ca.pem`, `cert.pem`, and `key.pem` files used for
 TLS verification. Defaults to `~/.docker`.
 
+### DEFAULT\_MAX\_WORKERS
+
+Configures the maximum number of worker threads used when executing commands
+in parallel. Defaults to 5. Only a subset of commands execute in parallel:
+`stop`, `kill` and `rm`.
+
 ## Compose documentation
 
 - [User guide](/)
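For reference, the lookup the docs describe comes down to a single line in
compose/utils.py above; since environment values are always strings, the
`int()` cast matters:

    import os

    DEFAULT_MAX_WORKERS = 5  # fallback, from compose/const.py

    # The environment variable, when set, overrides the built-in default.
    max_workers = int(os.environ.get('DEFAULT_MAX_WORKERS', DEFAULT_MAX_WORKERS))

So a run such as `DEFAULT_MAX_WORKERS=10 docker-compose stop` stops up to ten
containers concurrently instead of the default five.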