Support `global-job` and `replicated-job` modes in Docker Swarm (#3016)

Add `global-job` and `replicated-job` modes

Fixes #2829.

Signed-off-by: Leonard Kinday <leonard@kinday.ru>
This commit is contained in:
Leonard Kinday 2022-08-11 22:20:31 +02:00 committed by GitHub
parent 42789818be
commit 66402435d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 28 deletions

View File

@ -224,10 +224,10 @@ class ImageCollection(Collection):
Build an image and return it. Similar to the ``docker build`` Build an image and return it. Similar to the ``docker build``
command. Either ``path`` or ``fileobj`` must be set. command. Either ``path`` or ``fileobj`` must be set.
If you already have a tar file for the Docker build context (including a If you already have a tar file for the Docker build context (including
Dockerfile), pass a readable file-like object to ``fileobj`` a Dockerfile), pass a readable file-like object to ``fileobj``
and also pass ``custom_context=True``. If the stream is also compressed, and also pass ``custom_context=True``. If the stream is also
set ``encoding`` to the correct value (e.g ``gzip``). compressed, set ``encoding`` to the correct value (e.g ``gzip``).
If you want to get the raw output of the build, use the If you want to get the raw output of the build, use the
:py:meth:`~docker.api.build.BuildApiMixin.build` method in the :py:meth:`~docker.api.build.BuildApiMixin.build` method in the

View File

@ -29,6 +29,7 @@ class TaskTemplate(dict):
force_update (int): A counter that triggers an update even if no force_update (int): A counter that triggers an update even if no
relevant parameters have been changed. relevant parameters have been changed.
""" """
def __init__(self, container_spec, resources=None, restart_policy=None, def __init__(self, container_spec, resources=None, restart_policy=None,
placement=None, log_driver=None, networks=None, placement=None, log_driver=None, networks=None,
force_update=None): force_update=None):
@ -115,6 +116,7 @@ class ContainerSpec(dict):
cap_drop (:py:class:`list`): A list of kernel capabilities to drop from cap_drop (:py:class:`list`): A list of kernel capabilities to drop from
the default set for the container. the default set for the container.
""" """
def __init__(self, image, command=None, args=None, hostname=None, env=None, def __init__(self, image, command=None, args=None, hostname=None, env=None,
workdir=None, user=None, labels=None, mounts=None, workdir=None, user=None, labels=None, mounts=None,
stop_grace_period=None, secrets=None, tty=None, groups=None, stop_grace_period=None, secrets=None, tty=None, groups=None,
@ -231,6 +233,7 @@ class Mount(dict):
tmpfs_size (int or string): The size for the tmpfs mount in bytes. tmpfs_size (int or string): The size for the tmpfs mount in bytes.
tmpfs_mode (int): The permission mode for the tmpfs mount. tmpfs_mode (int): The permission mode for the tmpfs mount.
""" """
def __init__(self, target, source, type='volume', read_only=False, def __init__(self, target, source, type='volume', read_only=False,
consistency=None, propagation=None, no_copy=False, consistency=None, propagation=None, no_copy=False,
labels=None, driver_config=None, tmpfs_size=None, labels=None, driver_config=None, tmpfs_size=None,
@ -331,6 +334,7 @@ class Resources(dict):
``{ resource_name: resource_value }``. Alternatively, a list of ``{ resource_name: resource_value }``. Alternatively, a list of
of resource specifications as defined by the Engine API. of resource specifications as defined by the Engine API.
""" """
def __init__(self, cpu_limit=None, mem_limit=None, cpu_reservation=None, def __init__(self, cpu_limit=None, mem_limit=None, cpu_reservation=None,
mem_reservation=None, generic_resources=None): mem_reservation=None, generic_resources=None):
limits = {} limits = {}
@ -401,6 +405,7 @@ class UpdateConfig(dict):
order (string): Specifies the order of operations when rolling out an order (string): Specifies the order of operations when rolling out an
updated task. Either ``start-first`` or ``stop-first`` are accepted. updated task. Either ``start-first`` or ``stop-first`` are accepted.
""" """
def __init__(self, parallelism=0, delay=None, failure_action='continue', def __init__(self, parallelism=0, delay=None, failure_action='continue',
monitor=None, max_failure_ratio=None, order=None): monitor=None, max_failure_ratio=None, order=None):
self['Parallelism'] = parallelism self['Parallelism'] = parallelism
@ -512,6 +517,7 @@ class DriverConfig(dict):
name (string): Name of the driver to use. name (string): Name of the driver to use.
options (dict): Driver-specific options. Default: ``None``. options (dict): Driver-specific options. Default: ``None``.
""" """
def __init__(self, name, options=None): def __init__(self, name, options=None):
self['Name'] = name self['Name'] = name
if options: if options:
@ -533,6 +539,7 @@ class EndpointSpec(dict):
is ``(target_port [, protocol [, publish_mode]])``. is ``(target_port [, protocol [, publish_mode]])``.
Ports can only be provided if the ``vip`` resolution mode is used. Ports can only be provided if the ``vip`` resolution mode is used.
""" """
def __init__(self, mode=None, ports=None): def __init__(self, mode=None, ports=None):
if ports: if ports:
self['Ports'] = convert_service_ports(ports) self['Ports'] = convert_service_ports(ports)
@ -575,37 +582,70 @@ def convert_service_ports(ports):
class ServiceMode(dict): class ServiceMode(dict):
""" """
Indicate whether a service should be deployed as a replicated or global Indicate whether a service or a job should be deployed as a replicated
service, and associated parameters or global service, and associated parameters
Args: Args:
mode (string): Can be either ``replicated`` or ``global`` mode (string): Can be either ``replicated``, ``global``,
``replicated-job`` or ``global-job``
replicas (int): Number of replicas. For replicated services only. replicas (int): Number of replicas. For replicated services only.
concurrency (int): Number of concurrent jobs. For replicated job
services only.
""" """
def __init__(self, mode, replicas=None):
if mode not in ('replicated', 'global'):
raise errors.InvalidArgument(
'mode must be either "replicated" or "global"'
)
if mode != 'replicated' and replicas is not None:
raise errors.InvalidArgument(
'replicas can only be used for replicated mode'
)
self[mode] = {}
if replicas is not None:
self[mode]['Replicas'] = replicas
@property def __init__(self, mode, replicas=None, concurrency=None):
def mode(self): replicated_modes = ('replicated', 'replicated-job')
if 'global' in self: supported_modes = replicated_modes + ('global', 'global-job')
return 'global'
return 'replicated' if mode not in supported_modes:
raise errors.InvalidArgument(
'mode must be either "replicated", "global", "replicated-job"'
' or "global-job"'
)
if mode not in replicated_modes:
if replicas is not None:
raise errors.InvalidArgument(
'replicas can only be used for "replicated" or'
' "replicated-job" mode'
)
if concurrency is not None:
raise errors.InvalidArgument(
'concurrency can only be used for "replicated-job" mode'
)
service_mode = self._convert_mode(mode)
self.mode = service_mode
self[service_mode] = {}
if replicas is not None:
if mode == 'replicated':
self[service_mode]['Replicas'] = replicas
if mode == 'replicated-job':
self[service_mode]['MaxConcurrent'] = concurrency or 1
self[service_mode]['TotalCompletions'] = replicas
@staticmethod
def _convert_mode(original_mode):
if original_mode == 'global-job':
return 'GlobalJob'
if original_mode == 'replicated-job':
return 'ReplicatedJob'
return original_mode
@property @property
def replicas(self): def replicas(self):
if self.mode != 'replicated': if 'replicated' in self:
return None return self['replicated'].get('Replicas')
return self['replicated'].get('Replicas')
if 'ReplicatedJob' in self:
return self['ReplicatedJob'].get('TotalCompletions')
return None
class SecretReference(dict): class SecretReference(dict):
@ -679,6 +719,7 @@ class Placement(dict):
platforms (:py:class:`list` of tuple): A list of platforms platforms (:py:class:`list` of tuple): A list of platforms
expressed as ``(arch, os)`` tuples expressed as ``(arch, os)`` tuples
""" """
def __init__(self, constraints=None, preferences=None, platforms=None, def __init__(self, constraints=None, preferences=None, platforms=None,
maxreplicas=None): maxreplicas=None):
if constraints is not None: if constraints is not None:
@ -711,6 +752,7 @@ class PlacementPreference(dict):
the scheduler will try to spread tasks evenly over groups of the scheduler will try to spread tasks evenly over groups of
nodes identified by this label. nodes identified by this label.
""" """
def __init__(self, strategy, descriptor): def __init__(self, strategy, descriptor):
if strategy != 'spread': if strategy != 'spread':
raise errors.InvalidArgument( raise errors.InvalidArgument(
@ -732,6 +774,7 @@ class DNSConfig(dict):
options (:py:class:`list`): A list of internal resolver variables options (:py:class:`list`): A list of internal resolver variables
to be modified (e.g., ``debug``, ``ndots:3``, etc.). to be modified (e.g., ``debug``, ``ndots:3``, etc.).
""" """
def __init__(self, nameservers=None, search=None, options=None): def __init__(self, nameservers=None, search=None, options=None):
self['Nameservers'] = nameservers self['Nameservers'] = nameservers
self['Search'] = search self['Search'] = search
@ -762,6 +805,7 @@ class Privileges(dict):
selinux_type (string): SELinux type label selinux_type (string): SELinux type label
selinux_level (string): SELinux level label selinux_level (string): SELinux level label
""" """
def __init__(self, credentialspec_file=None, credentialspec_registry=None, def __init__(self, credentialspec_file=None, credentialspec_registry=None,
selinux_disable=None, selinux_user=None, selinux_role=None, selinux_disable=None, selinux_user=None, selinux_role=None,
selinux_type=None, selinux_level=None): selinux_type=None, selinux_level=None):
@ -804,6 +848,7 @@ class NetworkAttachmentConfig(dict):
options (:py:class:`dict`): Driver attachment options for the options (:py:class:`dict`): Driver attachment options for the
network target. network target.
""" """
def __init__(self, target, aliases=None, options=None): def __init__(self, target, aliases=None, options=None):
self['Target'] = target self['Target'] = target
self['Aliases'] = aliases self['Aliases'] = aliases

View File

@ -143,4 +143,4 @@ def ctrl_with(char):
if re.match('[a-z]', char): if re.match('[a-z]', char):
return chr(ord(char) - ord('a') + 1).encode('ascii') return chr(ord(char) - ord('a') + 1).encode('ascii')
else: else:
raise(Exception('char must be [a-z]')) raise Exception('char must be [a-z]')

View File

@ -626,6 +626,39 @@ class ServiceTest(BaseAPIIntegrationTest):
assert 'Replicated' in svc_info['Spec']['Mode'] assert 'Replicated' in svc_info['Spec']['Mode']
assert svc_info['Spec']['Mode']['Replicated'] == {'Replicas': 5} assert svc_info['Spec']['Mode']['Replicated'] == {'Replicas': 5}
@requires_api_version('1.41')
def test_create_service_global_job_mode(self):
container_spec = docker.types.ContainerSpec(
TEST_IMG, ['echo', 'hello']
)
task_tmpl = docker.types.TaskTemplate(container_spec)
name = self.get_service_name()
svc_id = self.client.create_service(
task_tmpl, name=name, mode='global-job'
)
svc_info = self.client.inspect_service(svc_id)
assert 'Mode' in svc_info['Spec']
assert 'GlobalJob' in svc_info['Spec']['Mode']
@requires_api_version('1.41')
def test_create_service_replicated_job_mode(self):
container_spec = docker.types.ContainerSpec(
TEST_IMG, ['echo', 'hello']
)
task_tmpl = docker.types.TaskTemplate(container_spec)
name = self.get_service_name()
svc_id = self.client.create_service(
task_tmpl, name=name,
mode=docker.types.ServiceMode('replicated-job', 5)
)
svc_info = self.client.inspect_service(svc_id)
assert 'Mode' in svc_info['Spec']
assert 'ReplicatedJob' in svc_info['Spec']['Mode']
assert svc_info['Spec']['Mode']['ReplicatedJob'] == {
'MaxConcurrent': 1,
'TotalCompletions': 5
}
@requires_api_version('1.25') @requires_api_version('1.25')
def test_update_service_force_update(self): def test_update_service_force_update(self):
container_spec = docker.types.ContainerSpec( container_spec = docker.types.ContainerSpec(

View File

@ -325,10 +325,26 @@ class ServiceModeTest(unittest.TestCase):
assert mode.mode == 'global' assert mode.mode == 'global'
assert mode.replicas is None assert mode.replicas is None
def test_replicated_job_simple(self):
mode = ServiceMode('replicated-job')
assert mode == {'ReplicatedJob': {}}
assert mode.mode == 'ReplicatedJob'
assert mode.replicas is None
def test_global_job_simple(self):
mode = ServiceMode('global-job')
assert mode == {'GlobalJob': {}}
assert mode.mode == 'GlobalJob'
assert mode.replicas is None
def test_global_replicas_error(self): def test_global_replicas_error(self):
with pytest.raises(InvalidArgument): with pytest.raises(InvalidArgument):
ServiceMode('global', 21) ServiceMode('global', 21)
def test_global_job_replicas_simple(self):
with pytest.raises(InvalidArgument):
ServiceMode('global-job', 21)
def test_replicated_replicas(self): def test_replicated_replicas(self):
mode = ServiceMode('replicated', 21) mode = ServiceMode('replicated', 21)
assert mode == {'replicated': {'Replicas': 21}} assert mode == {'replicated': {'Replicas': 21}}