Add new user-focused API

See #1086

Signed-off-by: Ben Firshman <ben@firshman.co.uk>
This commit is contained in:
Ben Firshman 2016-11-07 17:56:02 -08:00
parent f32c0c1709
commit 1984f68730
No known key found for this signature in database
GPG Key ID: 18296449E36D2F1E
36 changed files with 3944 additions and 79 deletions

View File

@ -1,26 +1,73 @@
docker-py
=========
# Docker SDK for Python
[![Build Status](https://travis-ci.org/docker/docker-py.png)](https://travis-ci.org/docker/docker-py)
A Python library for the Docker Remote API. It does everything the `docker` command does, but from within Python  run containers, manage them, pull/push images, etc.
A Python library for the Docker API. It lets you do anything the `docker` command does, but from within Python apps — run containers, manage containers, manage Swarms, etc.
Installation
------------
## Installation
The latest stable version is always available on PyPi.
The latest stable version [is available on PyPI](https://pypi.python.org/pypi/docker/). Either add `docker` to your `requirements.txt` file or install with pip:
pip install docker-py
pip install docker
Documentation
-------------
## Usage
[![Documentation Status](https://readthedocs.org/projects/docker-py/badge/?version=latest)](https://readthedocs.org/projects/docker-py/?badge=latest)
Connect to Docker using the default socket or the configuration in your environment:
[Read the full documentation here](https://docker-py.readthedocs.io/en/latest/).
The source is available in the `docs/` directory.
```python
import docker
client = docker.from_env()
```
You can run containers:
License
-------
Docker is licensed under the Apache License, Version 2.0. See LICENSE for full license text
```python
>>> client.containers.run("ubuntu", "echo hello world")
'hello world\n'
```
You can run containers in the background:
```python
>>> client.containers.run("bfirsh/reticulate-splines", detach=True)
<Container '45e6d2de7c54'>
```
You can manage containers:
```python
>>> client.containers.list()
[<Container '45e6d2de7c54'>, <Container 'db18e4f20eaa'>, ...]
>>> container = client.containers.get('45e6d2de7c54')
>>> container.attrs['Config']['Image']
"bfirsh/reticulate-splines"
>>> container.logs()
"Reticulating spline 1...\n"
>>> container.stop()
```
You can stream logs:
```python
>>> for line in container.logs(stream=True):
...   print(line.strip())
Reticulating spline 2...
Reticulating spline 3...
...
```
You can manage images:
```python
>>> client.images.pull('nginx')
<Image 'nginx'>
>>> client.images.list()
[<Image 'ubuntu'>, <Image 'nginx'>, ...]
```
[Read the full documentation](https://docs.docker.com/sdk/python/) to see everything you can do.

View File

@ -1,5 +1,6 @@
# flake8: noqa
from .api import APIClient
from .client import Client, from_env
from .version import version, version_info
__version__ = version

View File

@ -22,10 +22,11 @@ from ..constants import (DEFAULT_TIMEOUT_SECONDS, DEFAULT_USER_AGENT,
IS_WINDOWS_PLATFORM, DEFAULT_DOCKER_API_VERSION,
STREAM_HEADER_SIZE_BYTES, DEFAULT_NUM_POOLS,
MINIMUM_DOCKER_API_VERSION)
from ..errors import DockerException, APIError, TLSParameterError, NotFound
from ..errors import (DockerException, TLSParameterError,
create_api_error_from_http_exception)
from ..tls import TLSConfig
from ..transport import UnixAdapter
from ..utils import utils, check_resource, update_headers, kwargs_from_env
from ..utils import utils, check_resource, update_headers
from ..utils.socket import frames_iter
try:
from ..transport import NpipeAdapter
@ -33,10 +34,6 @@ except ImportError:
pass
def from_env(**kwargs):
return APIClient.from_env(**kwargs)
class APIClient(
requests.Session,
BuildApiMixin,
@ -152,13 +149,6 @@ class APIClient(
MINIMUM_DOCKER_API_VERSION, self._version)
)
@classmethod
def from_env(cls, **kwargs):
timeout = kwargs.pop('timeout', None)
version = kwargs.pop('version', None)
return cls(timeout=timeout, version=version,
**kwargs_from_env(**kwargs))
def _retrieve_server_version(self):
try:
return self.version(api_version=False)["ApiVersion"]
@ -212,14 +202,12 @@ class APIClient(
else:
return '{0}{1}'.format(self.base_url, pathfmt.format(*args))
def _raise_for_status(self, response, explanation=None):
def _raise_for_status(self, response):
"""Raises stored :class:`APIError`, if one occurred."""
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
raise NotFound(e, response, explanation=explanation)
raise APIError(e, response, explanation=explanation)
raise create_api_error_from_http_exception(e)
def _result(self, response, json=False, binary=False):
assert not (json and binary)

157
docker/client.py Normal file
View File

@ -0,0 +1,157 @@
from .api.client import APIClient
from .models.containers import ContainerCollection
from .models.images import ImageCollection
from .models.networks import NetworkCollection
from .models.nodes import NodeCollection
from .models.services import ServiceCollection
from .models.swarm import Swarm
from .models.volumes import VolumeCollection
from .utils import kwargs_from_env
class Client(object):
    """
    A client for communicating with a Docker server.

    Example:

        >>> import docker
        >>> client = Client(base_url='unix://var/run/docker.sock')

    Args:
        base_url (str): URL to the Docker server. For example,
            ``unix:///var/run/docker.sock`` or ``tcp://127.0.0.1:1234``.
        version (str): The version of the API to use. Set to ``auto`` to
            automatically detect the server's version. Default: ``1.24``
        timeout (int): Default timeout for API calls, in seconds.
        tls (bool or :py:class:`~docker.tls.TLSConfig`): Enable TLS. Pass
            ``True`` to enable it with default options, or pass a
            :py:class:`~docker.tls.TLSConfig` object to use custom
            configuration.
        user_agent (str): Set a custom user agent for requests to the server.
    """
    def __init__(self, *args, **kwargs):
        # Everything is forwarded verbatim to the low-level APIClient; this
        # class is a thin resource-oriented wrapper around it.
        self.api = APIClient(*args, **kwargs)

    @classmethod
    def from_env(cls, **kwargs):
        """
        Return a client configured from environment variables.

        The environment variables used are the same as those used by the
        Docker command-line client. They are:

        .. envvar:: DOCKER_HOST

            The URL to the Docker host.

        .. envvar:: DOCKER_TLS_VERIFY

            Verify the host against a CA certificate.

        .. envvar:: DOCKER_CERT_PATH

            A path to a directory containing TLS certificates to use when
            connecting to the Docker host.

        Args:
            version (str): The version of the API to use. Set to ``auto`` to
                automatically detect the server's version. Default: ``1.24``
            timeout (int): Default timeout for API calls, in seconds.
            ssl_version (int): A valid `SSL version`_.
            assert_hostname (bool): Verify the hostname of the server.
            environment (dict): The environment to read environment variables
                from. Default: the value of ``os.environ``

        Example:

            >>> import docker
            >>> client = docker.from_env()

        .. _`SSL version`:
            https://docs.python.org/3.5/library/ssl.html#ssl.PROTOCOL_TLSv1
        """
        # timeout and version are consumed by the Client constructor itself;
        # the remaining kwargs are translated by kwargs_from_env().
        timeout = kwargs.pop('timeout', None)
        version = kwargs.pop('version', None)
        return cls(timeout=timeout, version=version,
                   **kwargs_from_env(**kwargs))

    # Resources
    @property
    def containers(self):
        """
        An object for managing containers on the server. See the
        :doc:`containers documentation <containers>` for full details.
        """
        return ContainerCollection(client=self)

    @property
    def images(self):
        """
        An object for managing images on the server. See the
        :doc:`images documentation <images>` for full details.
        """
        return ImageCollection(client=self)

    @property
    def networks(self):
        """
        An object for managing networks on the server. See the
        :doc:`networks documentation <networks>` for full details.
        """
        return NetworkCollection(client=self)

    @property
    def nodes(self):
        """
        An object for managing nodes on the server. See the
        :doc:`nodes documentation <nodes>` for full details.
        """
        return NodeCollection(client=self)

    @property
    def services(self):
        """
        An object for managing services on the server. See the
        :doc:`services documentation <services>` for full details.
        """
        return ServiceCollection(client=self)

    @property
    def swarm(self):
        """
        An object for managing a swarm on the server. See the
        :doc:`swarm documentation <swarm>` for full details.
        """
        return Swarm(client=self)

    @property
    def volumes(self):
        """
        An object for managing volumes on the server. See the
        :doc:`volumes documentation <volumes>` for full details.
        """
        return VolumeCollection(client=self)

    # Top-level methods: pure pass-throughs to the APIClient, re-exported
    # here so the high-level client covers the whole surface. Each borrows
    # the APIClient method's docstring so docs stay in one place.
    def events(self, *args, **kwargs):
        return self.api.events(*args, **kwargs)
    events.__doc__ = APIClient.events.__doc__

    def info(self, *args, **kwargs):
        return self.api.info(*args, **kwargs)
    info.__doc__ = APIClient.info.__doc__

    def login(self, *args, **kwargs):
        return self.api.login(*args, **kwargs)
    login.__doc__ = APIClient.login.__doc__

    def ping(self, *args, **kwargs):
        return self.api.ping(*args, **kwargs)
    ping.__doc__ = APIClient.ping.__doc__

    def version(self, *args, **kwargs):
        return self.api.version(*args, **kwargs)
    version.__doc__ = APIClient.version.__doc__


# Module-level convenience alias, so callers can write docker.from_env().
from_env = Client.from_env

View File

@ -1,21 +1,44 @@
import requests
class APIError(requests.exceptions.HTTPError):
def __init__(self, message, response, explanation=None):
class DockerException(Exception):
    """
    A base class from which all other exceptions inherit.

    If you want to catch all errors that the Docker SDK might raise,
    catch this base exception.
    """
def create_api_error_from_http_exception(e):
    """
    Create a suitable APIError from requests.exceptions.HTTPError.

    NOTE: despite the name, this function *raises* the mapped error rather
    than returning it; callers write ``raise create_api_error_from_...(e)``
    only for readability.
    """
    response = e.response
    try:
        # The daemon reports errors as a JSON body with a 'message' key.
        explanation = response.json()['message']
    except ValueError:
        # Body is not JSON -- fall back to the raw payload.
        explanation = response.content.strip()
    cls = APIError
    if response.status_code == 404:
        # Distinguish missing images from other 404s by message text.
        if explanation and 'No such image' in str(explanation):
            cls = ImageNotFound
        else:
            cls = NotFound
    raise cls(e, response=response, explanation=explanation)
class APIError(requests.exceptions.HTTPError, DockerException):
"""
An HTTP error from the API.
"""
def __init__(self, message, response=None, explanation=None):
# requests 1.2 supports response as a keyword argument, but
# requests 1.1 doesn't
super(APIError, self).__init__(message)
self.response = response
self.explanation = explanation
if self.explanation is None and response.content:
try:
self.explanation = response.json()['message']
except ValueError:
self.explanation = response.content.strip()
def __str__(self):
message = super(APIError, self).__str__()
@ -32,21 +55,30 @@ class APIError(requests.exceptions.HTTPError):
return message
@property
def status_code(self):
if self.response:
return self.response.status_code
def is_client_error(self):
return 400 <= self.response.status_code < 500
if self.status_code is None:
return False
return 400 <= self.status_code < 500
def is_server_error(self):
return 500 <= self.response.status_code < 600
class DockerException(Exception):
pass
if self.status_code is None:
return False
return 500 <= self.status_code < 600
class NotFound(APIError):
    """A 404 from the API, for a resource other than an image."""
    pass


class ImageNotFound(NotFound):
    """A 404 specifically caused by a missing image."""
    pass


class InvalidVersion(DockerException):
    """Raised when a feature is not supported by the negotiated API version."""
    pass
@ -76,3 +108,38 @@ class TLSParameterError(DockerException):
class NullResource(DockerException, ValueError):
    # NOTE(review): presumably raised when a required resource ID is
    # empty/None before hitting the API -- confirm against callers.
    pass
class ContainerError(DockerException):
    """
    Represents a container that has exited with a non-zero exit code.

    Attributes:
        container: the container object that failed.
        exit_status (int): the container's exit code.
        command: the command that was run.
        image: the image the container was created from.
        stderr: the captured error output.
    """
    def __init__(self, container, exit_status, command, image, stderr):
        # Build the human-readable message first, then record the details
        # so callers can inspect them programmatically.
        template = ("Command '{}' in image '{}' returned non-zero exit "
                    "status {}: {}")
        super(ContainerError, self).__init__(
            template.format(command, image, exit_status, stderr))
        self.container = container
        self.exit_status = exit_status
        self.command = command
        self.image = image
        self.stderr = stderr
class StreamParseError(RuntimeError):
    """
    Raised when a frame in a streamed response cannot be parsed.
    """
    def __init__(self, reason):
        # Pass the reason to RuntimeError too: previously only .msg was set,
        # so str(exc) was an empty, unhelpful string. .msg is kept for
        # backward compatibility with existing callers.
        super(StreamParseError, self).__init__(reason)
        self.msg = reason
class BuildError(Exception):
    # NOTE(review): this inherits Exception directly, contradicting the
    # DockerException docstring's claim that all SDK errors inherit from it.
    # Consider rebasing onto DockerException (backward compatible).
    pass
def create_unexpected_kwargs_error(name, kwargs):
    """
    Build (but do not raise) a TypeError that mirrors Python's own
    unexpected-keyword-argument message for the callable ``name``.
    """
    quoted = ", ".join("'{}'".format(key) for key in sorted(kwargs))
    if len(kwargs) == 1:
        middle = "got an unexpected keyword argument "
    else:
        middle = "got unexpected keyword arguments "
    return TypeError("{}() {}{}".format(name, middle, quoted))

View File

883
docker/models/containers.py Normal file
View File

@ -0,0 +1,883 @@
import copy
from ..errors import (ContainerError, ImageNotFound,
create_unexpected_kwargs_error)
from ..utils import create_host_config
from .images import Image
from .resource import Collection, Model
class Container(Model):
    """
    A local representation of a server-side container. Every method here
    delegates to the low-level API client (``self.client.api``) using this
    container's ID.
    """

    @property
    def name(self):
        """
        The name of the container.
        """
        # The API reports names with a leading '/', which is stripped here.
        if self.attrs.get('Name') is not None:
            return self.attrs['Name'].lstrip('/')

    @property
    def status(self):
        """
        The status of the container. For example, ``running``, or ``exited``.
        """
        return self.attrs['State']['Status']

    def attach(self, **kwargs):
        """
        Attach to this container.

        :py:meth:`logs` is a wrapper around this method, which you can
        use instead if you want to fetch/stream container output without
        first retrieving the entire backlog.

        Args:
            stdout (bool): Include stdout.
            stderr (bool): Include stderr.
            stream (bool): Return container output progressively as an
                iterator of strings, rather than a single string.
            logs (bool): Include the container's previous output.

        Returns:
            By default, the container's output as a single string.

            If ``stream=True``, an iterator of output strings.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.attach(self.id, **kwargs)

    def attach_socket(self, **kwargs):
        """
        Like :py:meth:`attach`, but returns the underlying socket-like object
        for the HTTP request.

        Args:
            params (dict): Dictionary of request parameters (e.g. ``stdout``,
                ``stderr``, ``stream``).
            ws (bool): Use websockets instead of raw HTTP.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.attach_socket(self.id, **kwargs)

    def commit(self, repository=None, tag=None, **kwargs):
        """
        Commit a container to an image. Similar to the ``docker commit``
        command.

        Args:
            repository (str): The repository to push the image to
            tag (str): The tag to push
            message (str): A commit message
            author (str): The name of the author
            changes (str): Dockerfile instructions to apply while committing
            conf (dict): The configuration for the container. See the
                `Remote API documentation
                <https://docs.docker.com/reference/api/docker_remote_api/>`_
                for full details.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        resp = self.client.api.commit(self.id, repository=repository, tag=tag,
                                      **kwargs)
        # Return the committed image as a high-level Image model.
        return self.client.images.get(resp['Id'])

    def diff(self):
        """
        Inspect changes on a container's filesystem.

        Returns:
            (str)

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.diff(self.id)

    def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
                 privileged=False, user='', detach=False, stream=False,
                 socket=False):
        """
        Run a command inside this container. Similar to
        ``docker exec``.

        Args:
            cmd (str or list): Command to be executed
            stdout (bool): Attach to stdout. Default: ``True``
            stderr (bool): Attach to stderr. Default: ``True``
            stdin (bool): Attach to stdin. Default: ``False``
            tty (bool): Allocate a pseudo-TTY. Default: False
            privileged (bool): Run as privileged.
            user (str): User to execute command as. Default: root
            detach (bool): If true, detach from the exec command.
                Default: False
            stream (bool): Stream response data. Default: False
            socket (bool): Return the underlying socket instead of response
                data. Default: False

        Returns:
            (generator or str): If ``stream=True``, a generator yielding
            response chunks. A string containing response data otherwise.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        # Two-step exec protocol: create the exec instance, then start it.
        resp = self.client.api.exec_create(
            self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
            privileged=privileged, user=user
        )
        return self.client.api.exec_start(
            resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
        )

    def export(self):
        """
        Export the contents of the container's filesystem as a tar archive.

        Returns:
            (str): The filesystem tar archive

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.export(self.id)

    def get_archive(self, path):
        """
        Retrieve a file or folder from the container in the form of a tar
        archive.

        Args:
            path (str): Path to the file or folder to retrieve

        Returns:
            (tuple): First element is a raw tar data stream. Second element is
            a dict containing ``stat`` information on the specified ``path``.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.get_archive(self.id, path)

    def kill(self, signal=None):
        """
        Kill or send a signal to the container.

        Args:
            signal (str or int): The signal to send. Defaults to ``SIGKILL``

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.kill(self.id, signal=signal)

    def logs(self, **kwargs):
        """
        Get logs from this container. Similar to the ``docker logs`` command.

        The ``stream`` parameter makes the ``logs`` function return a blocking
        generator you can iterate over to retrieve log output as it happens.

        Args:
            stdout (bool): Get ``STDOUT``
            stderr (bool): Get ``STDERR``
            stream (bool): Stream the response
            timestamps (bool): Show timestamps
            tail (str or int): Output specified number of lines at the end of
                logs. Either an integer of number of lines or the string
                ``all``. Default ``all``
            since (datetime or int): Show logs since a given datetime or
                integer epoch (in seconds)
            follow (bool): Follow log output

        Returns:
            (generator or str): Logs from the container.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.logs(self.id, **kwargs)

    def pause(self):
        """
        Pauses all processes within this container.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.pause(self.id)

    def put_archive(self, path, data):
        """
        Insert a file or folder in this container using a tar archive as
        source.

        Args:
            path (str): Path inside the container where the file(s) will be
                extracted. Must exist.
            data (bytes): tar data to be extracted

        Returns:
            (bool): True if the call succeeds.

        Raises:
            :py:class:`~docker.errors.APIError` If an error occurs.
        """
        return self.client.api.put_archive(self.id, path, data)

    def remove(self, **kwargs):
        """
        Remove this container. Similar to the ``docker rm`` command.

        Args:
            v (bool): Remove the volumes associated with the container
            link (bool): Remove the specified link and not the underlying
                container
            force (bool): Force the removal of a running container (uses
                ``SIGKILL``)

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.remove_container(self.id, **kwargs)

    def rename(self, name):
        """
        Rename this container. Similar to the ``docker rename`` command.

        Args:
            name (str): New name for the container

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.rename(self.id, name)

    def resize(self, height, width):
        """
        Resize the tty session.

        Args:
            height (int): Height of tty session
            width (int): Width of tty session

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.resize(self.id, height, width)

    def restart(self, **kwargs):
        """
        Restart this container. Similar to the ``docker restart`` command.

        Args:
            timeout (int): Number of seconds to try to stop for before killing
                the container. Once killed it will then be restarted. Default
                is 10 seconds.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.restart(self.id, **kwargs)

    def start(self, **kwargs):
        """
        Start this container. Similar to the ``docker start`` command, but
        doesn't support attach options.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.start(self.id, **kwargs)

    def stats(self, **kwargs):
        """
        Stream statistics for this container. Similar to the
        ``docker stats`` command.

        Args:
            decode (bool): If set to true, stream will be decoded into dicts
                on the fly. False by default.
            stream (bool): If set to false, only the current stats will be
                returned instead of a stream. True by default.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.stats(self.id, **kwargs)

    def stop(self, **kwargs):
        """
        Stops a container. Similar to the ``docker stop`` command.

        Args:
            timeout (int): Timeout in seconds to wait for the container to
                stop before sending a ``SIGKILL``. Default: 10

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.stop(self.id, **kwargs)

    def top(self, **kwargs):
        """
        Display the running processes of the container.

        Args:
            ps_args (str): An optional arguments passed to ps (e.g. ``aux``)

        Returns:
            (str): The output of the top

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.top(self.id, **kwargs)

    def unpause(self):
        """
        Unpause all processes within the container.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.unpause(self.id)

    def update(self, **kwargs):
        """
        Update resource configuration of the containers.

        Args:
            blkio_weight (int): Block IO (relative weight), between 10 and
                1000
            cpu_period (int): Limit CPU CFS (Completely Fair Scheduler) period
            cpu_quota (int): Limit CPU CFS (Completely Fair Scheduler) quota
            cpu_shares (int): CPU shares (relative weight)
            cpuset_cpus (str): CPUs in which to allow execution
            cpuset_mems (str): MEMs in which to allow execution
            mem_limit (int or str): Memory limit
            mem_reservation (int or str): Memory soft limit
            memswap_limit (int or str): Total memory (memory + swap), -1 to
                disable swap
            kernel_memory (int or str): Kernel memory limit
            restart_policy (dict): Restart policy dictionary

        Returns:
            (dict): Dictionary containing a ``Warnings`` key.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.update_container(self.id, **kwargs)

    def wait(self, **kwargs):
        """
        Block until the container stops, then return its exit code. Similar to
        the ``docker wait`` command.

        Args:
            timeout (int): Request timeout

        Returns:
            (int): The exit code of the container. Returns ``-1`` if the API
            responds without a ``StatusCode`` attribute.

        Raises:
            :py:class:`requests.exceptions.ReadTimeout`
                If the timeout is exceeded.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.wait(self.id, **kwargs)
class ContainerCollection(Collection):
model = Container
    def run(self, image, command=None, stdout=True, stderr=False,
            remove=False, **kwargs):
        """
        Run a container. By default, it will wait for the container to finish
        and return its logs, similar to ``docker run``.

        If the ``detach`` argument is ``True``, it will start the container
        and immediately return a :py:class:`Container` object, similar to
        ``docker run -d``.

        Example:
            Run a container and get its output:

            >>> import docker
            >>> client = docker.from_env()
            >>> client.containers.run('alpine', 'echo hello world')
            b'hello world\\n'

            Run a container and detach:

            >>> container = client.containers.run('bfirsh/reticulate-splines',
                                                  detach=True)
            >>> container.logs()
            'Reticulating spline 1...\\nReticulating spline 2...\\n'

        Args:
            image (str): The image to run.
            command (str or list): The command to run in the container.
            blkio_weight_device: Block IO weight (relative device weight) in
                the form of: ``[{"Path": "device_path", "Weight": weight}]``.
            blkio_weight: Block IO weight (relative weight), accepts a weight
                value between 10 and 1000.
            cap_add (list of str): Add kernel capabilities. For example,
                ``["SYS_ADMIN", "MKNOD"]``.
            cap_drop (list of str): Drop kernel capabilities.
            cpu_group (int): The length of a CPU period in microseconds.
            cpu_period (int): Microseconds of CPU time that the container can
                get in a CPU period.
            cpu_shares (int): CPU shares (relative weight).
            cpuset_cpus (str): CPUs in which to allow execution (``0-3``,
                ``0,1``).
            detach (bool): Run container in the background and return a
                :py:class:`Container` object.
            device_read_bps: Limit read rate (bytes per second) from a device
                in the form of: `[{"Path": "device_path", "Rate": rate}]`
            device_read_iops: Limit read rate (IO per second) from a device.
            device_write_bps: Limit write rate (bytes per second) from a
                device.
            device_write_iops: Limit write rate (IO per second) from a device.
            devices (list): Expose host devices to the container, as a list
                of strings in the form
                ``<path_on_host>:<path_in_container>:<cgroup_permissions>``.

                For example, ``/dev/sda:/dev/xvda:rwm`` allows the container
                to have read-write access to the host's ``/dev/sda`` via a
                node named ``/dev/xvda`` inside the container.
            dns (list): Set custom DNS servers.
            dns_opt (list): Additional options to be added to the container's
                ``resolv.conf`` file.
            dns_search (list): DNS search domains.
            domainname (str or list): Set custom DNS search domains.
            entrypoint (str or list): The entrypoint for the container.
            environment (dict or list): Environment variables to set inside
                the container, as a dictionary or a list of strings in the
                format ``["SOMEVARIABLE=xxx"]``.
            extra_hosts (dict): Additional hostnames to resolve inside the
                container, as a mapping of hostname to IP address.
            group_add (list): List of additional group names and/or IDs that
                the container process will run as.
            hostname (str): Optional hostname for the container.
            ipc_mode (str): Set the IPC mode for the container.
            isolation (str): Isolation technology to use. Default: `None`.
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            links (dict or list of tuples): Either a dictionary mapping name
                to alias or as a list of ``(name, alias)`` tuples.
            log_config (dict): Logging configuration, as a dictionary with
                keys:

                - ``type`` The logging driver name.
                - ``config`` A dictionary of configuration for the logging
                  driver.

            mac_address (str): MAC address to assign to the container.
            mem_limit (float or str): Memory limit. Accepts float values
                (which represent the memory limit of the created container in
                bytes) or a string with a units identification char
                (``100000b``, ``1000k``, ``128m``, ``1g``). If a string is
                specified without a units character, bytes are assumed as an
                intended unit.
            mem_swappiness (int): Tune a container's memory swappiness
                behavior. Accepts number between 0 and 100.
            memswap_limit (str or int): Maximum amount of memory + swap a
                container is allowed to consume.
            networks (list): A list of network names to connect this
                container to.
            name (str): The name for this container.
            network_disabled (bool): Disable networking.
            network_mode (str): One of:

                - ``bridge`` Create a new network stack for the container on
                  on the bridge network.
                - ``none`` No networking for this container.
                - ``container:<name|id>`` Reuse another container's network
                  stack.
                - ``host`` Use the host network stack.

            oom_kill_disable (bool): Whether to disable OOM killer.
            oom_score_adj (int): An integer value containing the score given
                to the container in order to tune OOM killer preferences.
            pid_mode (str): If set to ``host``, use the host PID namespace
                inside the container.
            pids_limit (int): Tune a container's pids limit. Set ``-1`` for
                unlimited.
            ports (dict): Ports to bind inside the container.

                The keys of the dictionary are the ports to bind inside the
                container, either as an integer or a string in the form
                ``port/protocol``, where the protocol is either ``tcp`` or
                ``udp``.

                The values of the dictionary are the corresponding ports to
                open on the host, which can be either:

                - The port number, as an integer. For example,
                  ``{'2222/tcp': 3333}`` will expose port 2222 inside the
                  container as port 3333 on the host.
                - ``None``, to assign a random host port. For example,
                  ``{'2222/tcp': None}``.
                - A tuple of ``(address, port)`` if you want to specify the
                  host interface. For example,
                  ``{'1111/tcp': ('127.0.0.1', 1111)}``.
                - A list of integers, if you want to bind multiple host ports
                  to a single container port. For example,
                  ``{'1111/tcp': [1234, 4567]}``.

            privileged (bool): Give extended privileges to this container.
            publish_all_ports (bool): Publish all ports to the host.
            read_only (bool): Mount the container's root filesystem as read
                only.
            remove (bool): Remove the container when it has finished running.
                Default: ``False``.
            restart_policy (dict): Restart the container when it exits.
                Configured as a dictionary with keys:

                - ``Name`` One of ``on-failure``, or ``always``.
                - ``MaximumRetryCount`` Number of times to restart the
                  container on failure.

                For example:
                ``{"Name": "on-failure", "MaximumRetryCount": 5}``

            security_opt (list): A list of string values to customize labels
                for MLS systems, such as SELinux.
            shm_size (str or int): Size of /dev/shm (e.g. ``1G``).
            stdin_open (bool): Keep ``STDIN`` open even if not attached.
            stdout (bool): Return logs from ``STDOUT`` when ``detach=False``.
                Default: ``True``.
            stderr (bool): Return logs from ``STDERR`` when ``detach=False``.
                Default: ``False``.
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            sysctls (dict): Kernel parameters to set in the container.
            tmpfs (dict): Temporary filesystems to mount, as a dictionary
                mapping a path inside the container to options for that path.

                For example:

                .. code-block:: python

                    {
                        '/mnt/vol2': '',
                        '/mnt/vol1': 'size=3G,uid=1000'
                    }

            tty (bool): Allocate a pseudo-TTY.
            ulimits (list): Ulimits to set inside the container, as a list of
                dicts.
            user (str or int): Username or UID to run commands as inside the
                container.
            userns_mode (str): Sets the user namespace mode for the container
                when user namespace remapping option is enabled. Supported
                values are: ``host``
            volume_driver (str): The name of a volume driver/plugin.
            volumes (dict or list): A dictionary to configure volumes mounted
                inside the container. The key is either the host path or a
                volume name, and the value is a dictionary with the keys:

                - ``bind`` The path to mount the volume inside the container
                - ``mode`` Either ``rw`` to mount the volume read/write, or
                  ``ro`` to mount it read-only.

                For example:

                .. code-block:: python

                    {'/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
                     '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}}

            volumes_from (list): List of container names or IDs to get
                volumes from.
            working_dir (str): Path to the working directory.

        Returns:
            The container logs, either ``STDOUT``, ``STDERR``, or both,
            depending on the value of the ``stdout`` and ``stderr`` arguments.

            If ``detach`` is ``True``, a :py:class:`Container` object is
            returned instead.

        Raises:
            :py:class:`docker.errors.ContainerError`
                If the container exits with a non-zero exit code and
                ``detach`` is ``False``.
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        detach = kwargs.pop("detach", False)
        # A detached container cannot be auto-removed by this method, since
        # we return immediately without waiting for it to exit.
        if detach and remove:
            raise RuntimeError("The options 'detach' and 'remove' cannot be "
                               "used together.")

        try:
            container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)
        except ImageNotFound:
            # Mirror `docker run`: pull the image once and retry the create.
            self.client.images.pull(image)
            container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)

        container.start()

        if detach:
            return container

        exit_status = container.wait()
        if exit_status != 0:
            # On failure, capture only STDERR so the raised ContainerError
            # carries the error output, regardless of the caller's flags.
            stdout = False
            stderr = True

        # Fetch logs before removing, otherwise they would be lost.
        out = container.logs(stdout=stdout, stderr=stderr)
        if remove:
            container.remove()

        if exit_status != 0:
            raise ContainerError(container, exit_status, command, image, out)
        return out
def create(self, image, command=None, **kwargs):
"""
Create a container without starting it. Similar to ``docker create``.
Takes the same arguments as :py:meth:`run`, except for ``stdout``,
``stderr``, and ``remove``.
Returns:
A :py:class:`Container` object.
Raises:
:py:class:`docker.errors.ImageNotFound`
If the specified image does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
if isinstance(image, Image):
image = image.id
kwargs['image'] = image
kwargs['command'] = command
kwargs['version'] = self.client.api._version
create_kwargs = _create_container_args(kwargs)
resp = self.client.api.create_container(**create_kwargs)
return self.get(resp['Id'])
def get(self, container_id):
"""
Get a container by name or ID.
Args:
container_id (str): Container name or ID.
Returns:
A :py:class:`Container` object.
Raises:
:py:class:`docker.errors.NotFound`
If the container does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
resp = self.client.api.inspect_container(container_id)
return self.prepare_model(resp)
    def list(self, all=False, before=None, filters=None, limit=-1, since=None):
        """
        List containers. Similar to the ``docker ps`` command.

        Args:
            all (bool): Show all containers. Only running containers are
                shown by default.
            since (str): Show only containers created since Id or Name,
                include non-running ones.
            before (str): Show only containers created before Id or Name,
                include non-running ones.
            limit (int): Show ``limit`` last created containers, include
                non-running ones.
            filters (dict): Filters to be processed on the image list.
                Available filters:

                - `exited` (int): Only containers with specified exit code
                - `status` (str): One of ``restarting``, ``running``,
                    ``paused``, ``exited``
                - `label` (str): format either ``"key"`` or ``"key=value"``
                - `id` (str): The id of the container.
                - `name` (str): The name of the container.
                - `ancestor` (str): Filter by container ancestor. Format of
                    ``<image-name>[:tag]``, ``<image-id>``, or
                    ``<image@digest>``.
                - `before` (str): Only containers created before a particular
                    container. Give the container name or id.
                - `since` (str): Only containers created after a particular
                    container. Give container name or id.

                A comprehensive list can be found in the documentation for
                `docker ps
                <https://docs.docker.com/engine/reference/commandline/ps>`_.

        Returns:
            (list of :py:class:`Container`)

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        resp = self.client.api.containers(all=all, before=before,
                                          filters=filters, limit=limit,
                                          since=since)
        # Each summary entry is re-fetched via inspect (self.get) so the
        # returned models carry the full attribute set.
        return [self.get(r['Id']) for r in resp]
# Keyword arguments accepted by ContainerCollection.run() that are passed
# through unchanged to APIClient.create_container() (see
# _create_container_args below).
RUN_CREATE_KWARGS = [
    'command',
    'detach',
    'domainname',
    'entrypoint',
    'environment',
    'healthcheck',
    'hostname',
    'image',
    'labels',
    'mac_address',
    'name',
    'network_disabled',
    'stdin_open',
    'stop_signal',
    'tty',
    'user',
    'volume_driver',
    'working_dir',
]
# Keyword arguments accepted by ContainerCollection.run() that are passed
# through unchanged to create_host_config() (see _create_container_args
# below).
RUN_HOST_CONFIG_KWARGS = [
    'blkio_weight_device',
    'blkio_weight',
    'cap_add',
    'cap_drop',
    'cgroup_parent',
    'cpu_period',
    'cpu_quota',
    'cpu_shares',
    'cpuset_cpus',
    'device_read_bps',
    'device_read_iops',
    'device_write_bps',
    'device_write_iops',
    'devices',
    'dns_opt',
    'dns_search',
    'dns',
    'extra_hosts',
    'group_add',
    'ipc_mode',
    'isolation',
    'kernel_memory',
    'links',
    'log_config',
    'lxc_conf',
    'mem_limit',
    'mem_reservation',
    'mem_swappiness',
    'memswap_limit',
    'network_mode',
    'oom_kill_disable',
    'oom_score_adj',
    'pid_mode',
    'pids_limit',
    'privileged',
    'publish_all_ports',
    'read_only',
    'restart_policy',
    'security_opt',
    'shm_size',
    'sysctls',
    'tmpfs',
    'ulimits',
    'userns_mode',
    'version',
    'volumes_from',
]
def _create_container_args(kwargs):
    """
    Convert arguments to create() to arguments to create_container().

    Consumes every entry of ``kwargs``; raises if anything unrecognised is
    left over afterwards.
    """
    create_kwargs = {}
    host_config_kwargs = {}

    # Split the flat kwargs dict between create_container() and
    # create_host_config(), based on the whitelists above (the two lists
    # are disjoint, so a single pass suffices).
    for key in list(kwargs):
        if key in RUN_CREATE_KWARGS:
            create_kwargs[key] = kwargs.pop(key)
        elif key in RUN_HOST_CONFIG_KWARGS:
            host_config_kwargs[key] = kwargs.pop(key)

    # A few arguments feed both calls under different names.
    ports = kwargs.pop('ports', {})
    if ports:
        host_config_kwargs['port_bindings'] = ports
    volumes = kwargs.pop('volumes', {})
    if volumes:
        host_config_kwargs['binds'] = volumes
    networks = kwargs.pop('networks', [])
    if networks:
        create_kwargs['networking_config'] = dict.fromkeys(networks)

    # All kwargs should have been consumed by this point, so raise
    # error if any are left
    if kwargs:
        raise create_unexpected_kwargs_error('run', kwargs)

    host_config = create_host_config(**host_config_kwargs)
    create_kwargs['host_config'] = host_config

    # create_container() also needs the port/volume keys that
    # create_host_config() derived.
    port_bindings = host_config.get('PortBindings')
    if port_bindings:
        # sort to make consistent for tests
        create_kwargs['ports'] = [
            tuple(port.split('/', 1)) for port in sorted(port_bindings)
        ]
    binds = host_config.get('Binds')
    if binds:
        create_kwargs['volumes'] = [bind.split(':')[0] for bind in binds]
    return create_kwargs

269
docker/models/images.py Normal file
View File

@ -0,0 +1,269 @@
import re
import six
from ..api import APIClient
from ..errors import BuildError
from ..utils.json_stream import json_stream
from .resource import Collection, Model
class Image(Model):
    """
    An image on the server.
    """
    def __repr__(self):
        return "<%s: '%s'>" % (self.__class__.__name__, "', '".join(self.tags))

    @property
    def short_id(self):
        """
        The ID of the image truncated to 10 characters, plus the ``sha256:``
        prefix.
        """
        if self.id.startswith('sha256:'):
            # 'sha256:' is 7 characters, so keep 7 + 10.
            return self.id[:17]
        return self.id[:10]

    @property
    def tags(self):
        """
        The image's tags.
        """
        # '<none>:<none>' is the placeholder for untagged images.
        return [
            tag for tag in self.attrs.get('RepoTags', [])
            if tag != '<none>:<none>'
        ]

    def history(self):
        """
        Show the history of an image.

        Returns:
            (str): The history of the image.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.history(self.id)

    def save(self):
        """
        Get a tarball of an image. Similar to the ``docker save`` command.

        Returns:
            (urllib3.response.HTTPResponse object): The response from the
            daemon.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> image = cli.get("fedora:latest")
            >>> resp = image.save()
            >>> f = open('/tmp/fedora-latest.tar', 'w')
            >>> f.write(resp.data)
            >>> f.close()
        """
        return self.client.api.get_image(self.id)

    def tag(self, repository, tag=None, **kwargs):
        """
        Tag this image into a repository. Similar to the ``docker tag``
        command.

        Args:
            repository (str): The repository to set for the tag
            tag (str): The tag name
            force (bool): Force

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Returns:
            (bool): ``True`` if successful
        """
        # Bug fix: the API result was previously discarded, so callers
        # always received ``None`` despite the documented bool return.
        return self.client.api.tag(self.id, repository, tag=tag, **kwargs)
class ImageCollection(Collection):
    model = Image

    def build(self, **kwargs):
        """
        Build an image and return it. Similar to the ``docker build``
        command. Either ``path`` or ``fileobj`` must be set.

        If you have a tar file for the Docker build context (including a
        Dockerfile) already, pass a readable file-like object to ``fileobj``
        and also pass ``custom_context=True``. If the stream is compressed
        also, set ``encoding`` to the correct value (e.g ``gzip``).

        If you want to get the raw output of the build, use the
        :py:meth:`~docker.api.build.BuildApiMixin.build` method in the
        low-level API.

        Args:
            path (str): Path to the directory containing the Dockerfile
            fileobj: A file object to use as the Dockerfile. (Or a file-like
                object)
            tag (str): A tag to add to the final image
            quiet (bool): Whether to return the status
            nocache (bool): Don't use the cache when set to ``True``
            rm (bool): Remove intermediate containers. The ``docker build``
                command now defaults to ``--rm=true``, but we have kept the old
                default of `False` to preserve backward compatibility
            stream (bool): *Deprecated for API version > 1.8 (always True)*.
                Return a blocking generator you can iterate over to retrieve
                build output as it happens
            timeout (int): HTTP timeout
            custom_context (bool): Optional if using ``fileobj``
            encoding (str): The encoding for a stream. Set to ``gzip`` for
                compressing
            pull (bool): Downloads any updates to the FROM image in Dockerfiles
            forcerm (bool): Always remove intermediate containers, even after
                unsuccessful builds
            dockerfile (str): path within the build context to the Dockerfile
            buildargs (dict): A dictionary of build arguments
            container_limits (dict): A dictionary of limits applied to each
                container created by the build process. Valid keys:

                - memory (int): set memory limit for build
                - memswap (int): Total memory (memory + swap), -1 to disable
                    swap
                - cpushares (int): CPU shares (relative weight)
                - cpusetcpus (str): CPUs in which to allow execution, e.g.,
                    ``"0-3"``, ``"0,1"``
            decode (bool): If set to ``True``, the returned stream will be
                decoded into dicts on the fly. Default ``False``.

        Returns:
            (:py:class:`Image`): The built image.

        Raises:
            :py:class:`docker.errors.BuildError`
                If there is an error during the build.
            :py:class:`docker.errors.APIError`
                If the server returns any other error.
            ``TypeError``
                If neither ``path`` nor ``fileobj`` is specified.
        """
        resp = self.client.api.build(**kwargs)
        if isinstance(resp, six.string_types):
            # The low-level API returned an image ID directly.
            return self.get(resp)
        events = list(json_stream(resp))
        if not events:
            # Bug fix: this was ``return BuildError(...)``, which handed the
            # caller an un-raised exception object instead of raising it.
            raise BuildError('Unknown')
        event = events[-1]
        if 'stream' in event:
            match = re.search(r'Successfully built ([0-9a-f]+)',
                              event.get('stream', ''))
            if match:
                image_id = match.group(1)
                return self.get(image_id)
        raise BuildError(event.get('error') or event)

    def get(self, name):
        """
        Gets an image.

        Args:
            name (str): The name of the image.

        Returns:
            (:py:class:`Image`): The image.

        Raises:
            :py:class:`docker.errors.ImageNotFound` If the image does not
                exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.prepare_model(self.client.api.inspect_image(name))

    def list(self, name=None, all=False, filters=None):
        """
        List images on the server.

        Args:
            name (str): Only show images belonging to the repository ``name``
            all (bool): Show intermediate image layers. By default, these are
                filtered out.
            filters (dict): Filters to be processed on the image list.
                Available filters:
                - ``dangling`` (bool)
                - ``label`` (str): format either ``key`` or ``key=value``

        Returns:
            (list of :py:class:`Image`): The images.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        resp = self.client.api.images(name=name, all=all, filters=filters)
        return [self.prepare_model(r) for r in resp]

    def load(self, data):
        """
        Load an image that was previously saved using
        :py:meth:`~docker.models.images.Image.save` (or ``docker save``).
        Similar to ``docker load``.

        Args:
            data (binary): Image data to be loaded.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.load_image(data)

    def pull(self, name, **kwargs):
        """
        Pull an image of the given name and return it. Similar to the
        ``docker pull`` command.

        If you want to get the raw pull output, use the
        :py:meth:`~docker.api.image.ImageApiMixin.pull` method in the
        low-level API.

        Args:
            repository (str): The repository to pull
            tag (str): The tag to pull
            insecure_registry (bool): Use an insecure registry
            auth_config (dict): Override the credentials that
                :py:meth:`~docker.client.Client.login` has set for
                this request. ``auth_config`` should contain the ``username``
                and ``password`` keys to be valid.

        Returns:
            (:py:class:`Image`): The image that has been pulled.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> image = client.images.pull('busybox')
        """
        self.client.api.pull(name, **kwargs)
        return self.get(name)

    def push(self, repository, tag=None, **kwargs):
        return self.client.api.push(repository, tag=tag, **kwargs)
    push.__doc__ = APIClient.push.__doc__

    def remove(self, *args, **kwargs):
        self.client.api.remove_image(*args, **kwargs)
    remove.__doc__ = APIClient.remove_image.__doc__

    def search(self, *args, **kwargs):
        return self.client.api.search(*args, **kwargs)
    search.__doc__ = APIClient.search.__doc__

181
docker/models/networks.py Normal file
View File

@ -0,0 +1,181 @@
from .containers import Container
from .resource import Model, Collection
class Network(Model):
    """
    A Docker network.
    """
    @property
    def name(self):
        """
        The name of the network.
        """
        return self.attrs.get('Name')

    @property
    def containers(self):
        """
        The containers that are connected to the network, as a list of
        :py:class:`~docker.models.containers.Container` objects.
        """
        return [
            self.client.containers.get(cid) for cid in
            self.attrs.get('Containers', {}).keys()
        ]

    def connect(self, container, *args, **kwargs):
        """
        Connect a container to this network.

        Args:
            container (str): Container to connect to this network, as either
                an ID, name, or :py:class:`~docker.models.containers.Container`
                object.
            aliases (list): A list of aliases for this endpoint. Names in that
                list can be used within the network to reach the container.
                Defaults to ``None``.
            links (list): A list of links for this endpoint. Containers
                declared in this list will be linked to this container.
                Defaults to ``None``.
            ipv4_address (str): The IP address of this container on the
                network, using the IPv4 protocol. Defaults to ``None``.
            ipv6_address (str): The IP address of this container on the
                network, using the IPv6 protocol. Defaults to ``None``.
            link_local_ips (list): A list of link-local (IPv4/IPv6) addresses.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(container, Container):
            container = container.id
        # Bug fix: forward the documented endpoint options (aliases, links,
        # ipv4_address, ...) instead of silently dropping them.
        return self.client.api.connect_container_to_network(
            container, self.id, *args, **kwargs
        )

    def disconnect(self, container, *args, **kwargs):
        """
        Disconnect a container from this network.

        Args:
            container (str): Container to disconnect from this network, as
                either an ID, name, or
                :py:class:`~docker.models.containers.Container` object.
            force (bool): Force the container to disconnect from a network.
                Default: ``False``

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(container, Container):
            container = container.id
        # Bug fix: forward ``force`` (and any future options) to the API.
        return self.client.api.disconnect_container_from_network(
            container, self.id, *args, **kwargs
        )

    def remove(self):
        """
        Remove this network.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.remove_network(self.id)
class NetworkCollection(Collection):
    """
    Networks on the Docker server.
    """
    model = Network

    def create(self, name, *args, **kwargs):
        """
        Create a network. Similar to the ``docker network create``.

        Args:
            name (str): Name of the network
            driver (str): Name of the driver used to create the network
            options (dict): Driver options as a key-value dictionary
            ipam (dict): Optional custom IP scheme for the network.
                Created with :py:meth:`~docker.utils.create_ipam_config`.
            check_duplicate (bool): Request daemon to check for networks with
                same name. Default: ``True``.
            internal (bool): Restrict external access to the network. Default
                ``False``.
            labels (dict): Map of labels to set on the network. Default
                ``None``.
            enable_ipv6 (bool): Enable IPv6 on the network. Default ``False``.

        Returns:
            (:py:class:`Network`): The network that was created.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:
            A network using the bridge driver:

                >>> client.networks.create("network1", driver="bridge")

            You can also create more advanced networks with custom IPAM
            configurations. For example, setting the subnet to
            ``192.168.52.0/24`` and gateway address to ``192.168.52.254``.

            .. code-block:: python

                >>> ipam_pool = docker.utils.create_ipam_pool(
                    subnet='192.168.52.0/24',
                    gateway='192.168.52.254'
                )
                >>> ipam_config = docker.utils.create_ipam_config(
                    pool_configs=[ipam_pool]
                )
                >>> client.networks.create(
                    "network1",
                    driver="bridge",
                    ipam=ipam_config
                )
        """
        # Re-fetch after creation so the returned model is fully populated.
        created = self.client.api.create_network(name, *args, **kwargs)
        return self.get(created['Id'])

    def get(self, network_id):
        """
        Get a network by its ID.

        Args:
            network_id (str): The ID of the network.

        Returns:
            (:py:class:`Network`) The network.

        Raises:
            :py:class:`docker.errors.NotFound`
                If the network does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        attrs = self.client.api.inspect_network(network_id)
        return self.prepare_model(attrs)

    def list(self, *args, **kwargs):
        """
        List networks. Similar to the ``docker network ls`` command.

        Args:
            names (list): List of names to filter by.
            ids (list): List of ids to filter by.

        Returns:
            (list of :py:class:`Network`) The networks on the server.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return [
            self.prepare_model(item)
            for item in self.client.api.networks(*args, **kwargs)
        ]

88
docker/models/nodes.py Normal file
View File

@ -0,0 +1,88 @@
from .resource import Model, Collection
class Node(Model):
    """A node in a swarm."""
    # The swarm API identifies nodes by an 'ID' field, not 'Id'.
    id_attribute = 'ID'

    @property
    def version(self):
        """
        The version number of the node. If this is not the same as the
        server, the :py:meth:`update` function will not work and you will
        need to call :py:meth:`reload` before calling it again.
        """
        version_info = self.attrs.get('Version')
        return version_info.get('Index')

    def update(self, node_spec):
        """
        Update the node's configuration.

        Args:
            node_spec (dict): Configuration settings to update. Any values
                not provided will be removed. Default: ``None``

        Returns:
            `True` if the request went through.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> node_spec = {'Availability': 'active',
                             'Name': 'node-name',
                             'Role': 'manager',
                             'Labels': {'foo': 'bar'}
                            }
            >>> node.update(node_spec)

        """
        return self.client.api.update_node(self.id, self.version, node_spec)
class NodeCollection(Collection):
    """Nodes on the Docker server."""
    model = Node

    def get(self, node_id):
        """
        Get a node.

        Args:
            node_id (string): ID of the node to be inspected.

        Returns:
            A :py:class:`Node` object.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        attrs = self.client.api.inspect_node(node_id)
        return self.prepare_model(attrs)

    def list(self, *args, **kwargs):
        """
        List swarm nodes.

        Args:
            filters (dict): Filters to process on the nodes list. Valid
                filters: ``id``, ``name``, ``membership`` and ``role``.
                Default: ``None``

        Returns:
            A list of :py:class:`Node` objects.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> client.nodes.list(filters={'role': 'manager'})
        """
        nodes = self.client.api.nodes(*args, **kwargs)
        return [self.prepare_model(node) for node in nodes]

84
docker/models/resource.py Normal file
View File

@ -0,0 +1,84 @@
class Model(object):
    """
    A base class for representing a single object on the server.
    """
    # Name of the key in ``attrs`` that holds this object's ID; the swarm
    # endpoints use 'ID' and subclasses override accordingly.
    id_attribute = 'Id'

    def __init__(self, attrs=None, client=None, collection=None):
        #: A client pointing at the server that this object is on.
        self.client = client

        #: The collection that this model is part of.
        self.collection = collection

        #: The raw representation of this object from the API
        self.attrs = attrs
        if self.attrs is None:
            self.attrs = {}

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.short_id)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.id == other.id

    def __hash__(self):
        # Bug fix: defining __eq__ without __hash__ makes instances
        # unhashable on Python 3. Hash on the same identity __eq__ compares.
        return hash("%s:%s" % (self.__class__.__name__, self.id))

    @property
    def id(self):
        """
        The ID of the object.
        """
        return self.attrs.get(self.id_attribute)

    @property
    def short_id(self):
        """
        The ID of the object, truncated to 10 characters.
        """
        return self.id[:10]

    def reload(self):
        """
        Load this object from the server again and update ``attrs`` with the
        new data.
        """
        new_model = self.collection.get(self.id)
        self.attrs = new_model.attrs
class Collection(object):
    """
    A base class for representing all objects of a particular type on the
    server.
    """

    #: The type of object this collection represents, set by subclasses
    model = None

    def __init__(self, client=None):
        #: The client pointing at the server that this collection of objects
        #: is on.
        self.client = client

    def list(self):
        raise NotImplementedError

    def get(self, key):
        raise NotImplementedError

    def create(self, attrs=None):
        raise NotImplementedError

    def prepare_model(self, attrs):
        """
        Create a model from a set of attributes.
        """
        if isinstance(attrs, Model):
            # Already a model: (re)attach it to this client and collection.
            attrs.client = self.client
            attrs.collection = self
            return attrs
        if isinstance(attrs, dict):
            return self.model(attrs=attrs, client=self.client, collection=self)
        raise Exception(
            "Can't create %s from %s" % (self.model.__name__, attrs)
        )

240
docker/models/services.py Normal file
View File

@ -0,0 +1,240 @@
import copy
from docker.errors import create_unexpected_kwargs_error
from docker.types import TaskTemplate, ContainerSpec
from .resource import Model, Collection
class Service(Model):
    """A service."""
    # The swarm API identifies services by an 'ID' field, not 'Id'.
    id_attribute = 'ID'

    @property
    def name(self):
        """The service's name."""
        return self.attrs['Spec']['Name']

    @property
    def version(self):
        """
        The version number of the service. If this is not the same as the
        server, the :py:meth:`update` function will not work and you will
        need to call :py:meth:`reload` before calling it again.
        """
        version_info = self.attrs.get('Version')
        return version_info.get('Index')

    def remove(self):
        """
        Stop and remove the service.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        return self.client.api.remove_service(self.id)

    def tasks(self, filters=None):
        """
        List the tasks in this service.

        Args:
            filters (dict): A map of filters to process on the tasks list.
                Valid filters: ``id``, ``name``, ``node``,
                ``label``, and ``desired-state``.

        Returns:
            (list): List of task dictionaries.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if filters is None:
            filters = {}
        # Always scope the query to this service.
        filters['service'] = self.id
        return self.client.api.tasks(filters=filters)

    def update(self, **kwargs):
        """
        Update a service's configuration. Similar to the ``docker service
        update`` command.

        Takes the same parameters as :py:meth:`~ServiceCollection.create`.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        # Image is required, so if it hasn't been set, use current image
        if 'image' not in kwargs:
            current_spec = self.attrs['Spec']['TaskTemplate']['ContainerSpec']
            kwargs['image'] = current_spec['Image']
        return self.client.api.update_service(
            self.id,
            self.version,
            **_get_create_service_kwargs('update', kwargs)
        )
class ServiceCollection(Collection):
    """Services on the Docker server."""
    model = Service

    def create(self, image, command=None, **kwargs):
        """
        Create a service. Similar to the ``docker service create`` command.

        Args:
            image (str): The image name to use for the containers.
            command (list of str or str): Command to run.
            args (list of str): Arguments to the command.
            constraints (list of str): Placement constraints.
            container_labels (dict): Labels to apply to the container.
            endpoint_spec (dict): Properties that can be configured to
                access and load balance a service. Default: ``None``.
            env (list of str): Environment variables, in the form
                ``KEY=val``.
            labels (dict): Labels to apply to the service.
            log_driver (str): Log driver to use for containers.
            log_driver_options (dict): Log driver options.
            mode (string): Scheduling mode for the service (``replicated`` or
                ``global``). Defaults to ``replicated``.
            mounts (list of str): Mounts for the containers, in the form
                ``source:target:options``, where options is either
                ``ro`` or ``rw``.
            name (str): Name to give to the service.
            networks (list): List of network names or IDs to attach the
                service to. Default: ``None``.
            resources (dict): Resource limits and reservations. For the
                format, see the Remote API documentation.
            restart_policy (dict): Restart policy for containers. For the
                format, see the Remote API documentation.
            stop_grace_period (int): Amount of time to wait for
                containers to terminate before forcefully killing them.
            update_config (dict): Specification for the update strategy of the
                service. Default: ``None``
            user (str): User to run commands as.
            workdir (str): Working directory for commands to run.

        Returns:
            (:py:class:`Service`) The created service.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        kwargs.update(image=image, command=command)
        service_id = self.client.api.create_service(
            **_get_create_service_kwargs('create', kwargs)
        )
        return self.get(service_id)

    def get(self, service_id):
        """
        Get a service.

        Args:
            service_id (str): The ID of the service.

        Returns:
            (:py:class:`Service`): The service.

        Raises:
            :py:class:`docker.errors.NotFound`
                If the service does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        attrs = self.client.api.inspect_service(service_id)
        return self.prepare_model(attrs)

    def list(self, **kwargs):
        """
        List services.

        Args:
            filters (dict): Filters to process on the services list. Valid
                filters: ``id`` and ``name``. Default: ``None``.

        Returns:
            (list of :py:class:`Service`): The services.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        services = self.client.api.services(**kwargs)
        return [self.prepare_model(service) for service in services]
# Keyword arguments of ServiceCollection.create() that are passed through
# unchanged to ContainerSpec (see _get_create_service_kwargs below).
CONTAINER_SPEC_KWARGS = [
    'image',
    'command',
    'args',
    'env',
    'workdir',
    'user',
    'labels',
    'mounts',
    'stop_grace_period',
]

# Keyword arguments passed through unchanged to TaskTemplate.
TASK_TEMPLATE_KWARGS = [
    'resources',
    'restart_policy',
]

# Keyword arguments passed through unchanged to create_service(). Note that
# 'labels' also appears in CONTAINER_SPEC_KWARGS; the pop order in
# _get_create_service_kwargs sends it here (container labels travel
# separately as 'container_labels').
CREATE_SERVICE_KWARGS = [
    'name',
    'labels',
    'mode',
    'update_config',
    'networks',
    'endpoint_spec',
]
def _get_create_service_kwargs(func_name, kwargs):
    """
    Split the flat kwargs of create()/update() into the nested structures
    that APIClient.create_service()/update_service() expect.

    Consumes every entry of ``kwargs``; raises if anything unrecognised is
    left over afterwards.
    """
    # NOTE: the pop order below matters -- 'labels' appears in both
    # CREATE_SERVICE_KWARGS and CONTAINER_SPEC_KWARGS, and must land in the
    # create_service arguments (container labels arrive separately as
    # 'container_labels').
    create_kwargs = {}
    for name in list(kwargs):
        if name in CREATE_SERVICE_KWARGS:
            create_kwargs[name] = kwargs.pop(name)
    container_spec_kwargs = {}
    for name in list(kwargs):
        if name in CONTAINER_SPEC_KWARGS:
            container_spec_kwargs[name] = kwargs.pop(name)
    task_template_kwargs = {}
    for name in list(kwargs):
        if name in TASK_TEMPLATE_KWARGS:
            task_template_kwargs[name] = kwargs.pop(name)

    # Arguments that need renaming or re-nesting.
    if 'container_labels' in kwargs:
        container_spec_kwargs['labels'] = kwargs.pop('container_labels')
    if 'constraints' in kwargs:
        task_template_kwargs['placement'] = {
            'Constraints': kwargs.pop('constraints')
        }
    if 'log_driver' in kwargs:
        task_template_kwargs['log_driver'] = {
            'Name': kwargs.pop('log_driver'),
            'Options': kwargs.pop('log_driver_options', {})
        }

    # All kwargs should have been consumed by this point, so raise
    # error if any are left
    if kwargs:
        raise create_unexpected_kwargs_error(func_name, kwargs)

    spec = ContainerSpec(**container_spec_kwargs)
    task_template_kwargs['container_spec'] = spec
    create_kwargs['task_template'] = TaskTemplate(**task_template_kwargs)
    return create_kwargs

146
docker/models/swarm.py Normal file
View File

@ -0,0 +1,146 @@
from docker.api import APIClient
from docker.errors import APIError
from docker.types import SwarmSpec
from .resource import Model
class Swarm(Model):
    """
    The server's Swarm state. This a singleton that must be reloaded to get
    the current state of the Swarm.
    """
    def __init__(self, *args, **kwargs):
        super(Swarm, self).__init__(*args, **kwargs)
        if self.client:
            try:
                self.reload()
            except APIError as e:
                # 406 means "this engine is not part of a swarm", which is a
                # valid state for this singleton -- swallow it.
                if e.response.status_code != 406:
                    raise

    @property
    def version(self):
        """
        The version number of the swarm. If this is not the same as the
        server, the :py:meth:`update` function will not work and you will
        need to call :py:meth:`reload` before calling it again.
        """
        return self.attrs.get('Version').get('Index')

    def init(self, advertise_addr=None, listen_addr='0.0.0.0:2377',
             force_new_cluster=False, swarm_spec=None, **kwargs):
        """
        Initialize a new swarm on this Engine.

        Args:
            advertise_addr (str): Externally reachable address advertised to
                other nodes. This can either be an address/port combination in
                the form ``192.168.1.1:4567``, or an interface followed by a
                port number, like ``eth0:4567``. If the port number is omitted,
                the port number from the listen address is used.

                If not specified, it will be automatically detected when
                possible.
            listen_addr (str): Listen address used for inter-manager
                communication, as well as determining the networking interface
                used for the VXLAN Tunnel Endpoint (VTEP). This can either be
                an address/port combination in the form ``192.168.1.1:4567``,
                or an interface followed by a port number, like ``eth0:4567``.
                If the port number is omitted, the default swarm listening port
                is used. Default: ``0.0.0.0:2377``
            force_new_cluster (bool): Force creating a new Swarm, even if
                already part of one. Default: False
            swarm_spec (dict): Pre-built swarm spec. If not given, one is
                built from the remaining keyword arguments below.
            task_history_retention_limit (int): Maximum number of tasks
                history stored.
            snapshot_interval (int): Number of logs entries between snapshot.
            keep_old_snapshots (int): Number of snapshots to keep beyond the
                current snapshot.
            log_entries_for_slow_followers (int): Number of log entries to
                keep around to sync up slow followers after a snapshot is
                created.
            heartbeat_tick (int): Amount of ticks (in seconds) between each
                heartbeat.
            election_tick (int): Amount of ticks (in seconds) needed without a
                leader to trigger a new election.
            dispatcher_heartbeat_period (int): The delay for an agent to send
                a heartbeat to the dispatcher.
            node_cert_expiry (int): Automatic expiry for nodes certificates.
            external_ca (dict): Configuration for forwarding signing requests
                to an external certificate authority. Use
                ``docker.types.SwarmExternalCA``.
            name (string): Swarm's name

        Returns:
            ``True`` if the request went through.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> client.swarm.init(
                advertise_addr='eth0', listen_addr='0.0.0.0:5000',
                force_new_cluster=False, snapshot_interval=5000,
                log_entries_for_slow_followers=1200
            )

        """
        # Bug fix: these values were previously looked up in ``kwargs``,
        # where they can never appear (they are named parameters), so the
        # caller's addresses were silently dropped and the listen_addr
        # default was never sent.
        if swarm_spec is None:
            swarm_spec = SwarmSpec(**kwargs)
        init_kwargs = {
            'advertise_addr': advertise_addr,
            'listen_addr': listen_addr,
            'force_new_cluster': force_new_cluster,
            'swarm_spec': swarm_spec,
        }
        self.client.api.init_swarm(**init_kwargs)
        self.reload()

    def join(self, *args, **kwargs):
        return self.client.api.join_swarm(*args, **kwargs)
    join.__doc__ = APIClient.join_swarm.__doc__

    def leave(self, *args, **kwargs):
        return self.client.api.leave_swarm(*args, **kwargs)
    leave.__doc__ = APIClient.leave_swarm.__doc__

    def reload(self):
        """
        Inspect the swarm on the server and store the response in
        :py:attr:`attrs`.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        self.attrs = self.client.api.inspect_swarm()

    def update(self, rotate_worker_token=False, rotate_manager_token=False,
               **kwargs):
        """
        Update the swarm's configuration.

        It takes the same arguments as :py:meth:`init`, except
        ``advertise_addr``, ``listen_addr``, and ``force_new_cluster``. In
        addition, it takes these arguments:

        Args:
            rotate_worker_token (bool): Rotate the worker join token. Default:
                ``False``.
            rotate_manager_token (bool): Rotate the manager join token.
                Default: ``False``.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        # this seems to have to be set
        if kwargs.get('node_cert_expiry') is None:
            kwargs['node_cert_expiry'] = 7776000000000000
        return self.client.api.update_swarm(
            version=self.version,
            swarm_spec=SwarmSpec(**kwargs),
            rotate_worker_token=rotate_worker_token,
            rotate_manager_token=rotate_manager_token
        )

84
docker/models/volumes.py Normal file
View File

@ -0,0 +1,84 @@
from .resource import Model, Collection
class Volume(Model):
    """A volume."""
    # Volumes are identified by name rather than by an 'Id' field, so the
    # base Model's ``id`` property resolves to attrs['Name'].
    id_attribute = 'Name'

    @property
    def name(self):
        """The name of the volume."""
        return self.attrs['Name']

    def remove(self):
        """Remove this volume."""
        return self.client.api.remove_volume(self.id)
class VolumeCollection(Collection):
    """Volumes on the Docker server."""
    model = Volume

    def create(self, name, **kwargs):
        """
        Create a volume.

        Args:
            name (str): Name of the volume
            driver (str): Name of the driver used to create the volume
            driver_opts (dict): Driver options as a key-value dictionary
            labels (dict): Labels to set on the volume

        Returns:
            (:py:class:`Volume`): The volume created.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.

        Example:

            >>> volume = client.volumes.create(name='foobar', driver='local',
                    driver_opts={'foo': 'bar', 'baz': 'false'},
                    labels={"key": "value"})

        """
        return self.prepare_model(
            self.client.api.create_volume(name, **kwargs)
        )

    def get(self, volume_id):
        """
        Get a volume.

        Args:
            volume_id (str): Volume name.

        Returns:
            (:py:class:`Volume`): The volume.

        Raises:
            :py:class:`docker.errors.NotFound`
                If the volume does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        attrs = self.client.api.inspect_volume(volume_id)
        return self.prepare_model(attrs)

    def list(self, **kwargs):
        """
        List volumes. Similar to the ``docker volume ls`` command.

        Args:
            filters (dict): Server-side list filtering options.

        Returns:
            (list of :py:class:`Volume`): The volumes.

        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        # The endpoint may return {'Volumes': None} when there are none.
        volumes = self.client.api.volumes(**kwargs).get('Volumes') or []
        return [self.prepare_model(obj) for obj in volumes]

View File

@ -0,0 +1,79 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import json
import json.decoder
import six
from ..errors import StreamParseError
# Single module-level decoder shared by json_splitter and json_stream.
json_decoder = json.JSONDecoder()
def stream_as_text(stream):
    """Given a stream of bytes or text, if any of the items in the stream
    are bytes convert them to text.

    This function can be removed once docker-py returns text streams instead
    of byte streams.
    """
    for chunk in stream:
        if isinstance(chunk, six.text_type):
            yield chunk
        else:
            # Byte chunks are decoded permissively: undecodable bytes are
            # replaced rather than raising.
            yield chunk.decode('utf-8', 'replace')
def json_splitter(buffer):
    """Attempt to parse a json object from a buffer. If there is at least one
    object, return it and the rest of the buffer, otherwise return None.
    """
    buffer = buffer.strip()
    try:
        obj, index = json_decoder.raw_decode(buffer)
    except ValueError:
        # No complete JSON object at the head of the buffer yet.
        return None
    # Skip JSON whitespace following the object. The original reached into
    # the private, undocumented ``json.decoder.WHITESPACE`` regex
    # (``[ \t\n\r]*``); stripping the same four characters is equivalent and
    # does not depend on CPython internals.
    rest = buffer[index:].lstrip(' \t\n\r')
    return obj, rest
def json_stream(stream):
    """Given a stream of text, return a stream of json objects.

    This handles streams which are inconsistently buffered (some entries may
    be newline delimited, and others are not).
    """
    return split_buffer(stream, splitter=json_splitter,
                        decoder=json_decoder.decode)
def line_splitter(buffer, separator=u'\n'):
    """Split the first ``separator``-terminated chunk off ``buffer``.

    Returns a ``(line, rest)`` pair where ``line`` includes the trailing
    separator, or ``None`` if ``separator`` does not occur in ``buffer``.
    """
    separator = six.text_type(separator)
    index = buffer.find(separator)
    if index == -1:
        return None
    # The original hard-coded ``index + 1``, which only includes the whole
    # separator when it is a single character; use its real length so
    # multi-character separators work too (default behaviour is unchanged).
    end = index + len(separator)
    return buffer[:end], buffer[end:]
def split_buffer(stream, splitter=None, decoder=lambda a: a):
    """Given a generator which yields strings and a splitter function,
    joins all input, splits on the separator and yields each chunk.

    Unlike string.split(), each chunk includes the trailing
    separator, except for the last one if none was found on the end
    of the input.
    """
    splitter = splitter or line_splitter
    buffered = six.text_type('')
    for data in stream_as_text(stream):
        buffered += data
        # Drain every complete chunk currently in the buffer before
        # reading more input; splitter returns None when no complete
        # chunk remains.
        while True:
            buffer_split = splitter(buffered)
            if buffer_split is None:
                break
            item, buffered = buffer_split
            yield item
    # Whatever is left over never ended with a separator: decode it as a
    # final, possibly partial, item. Any decode failure is wrapped so
    # callers see a single stream-specific error type.
    if buffered:
        try:
            yield decoder(buffered)
        except Exception as e:
            raise StreamParseError(e)

View File

@ -61,3 +61,16 @@ def wait_on_condition(condition, delay=0.1, timeout=40):
def random_name():
    """Return a unique, recognisable name for a throwaway test resource."""
    suffix = random.getrandbits(64)
    return u'dockerpytest_%x' % suffix
def force_leave_swarm(client):
    """Actually force leave a Swarm. There seems to be a bug in Swarm that
    occasionally throws "context deadline exceeded" errors when leaving."""
    while True:
        try:
            return client.swarm.leave(force=True)
        except docker.errors.APIError as e:
            # Retry only the known-transient daemon timeout; anything else
            # is a genuine failure.
            if e.explanation != "context deadline exceeded":
                raise

View File

@ -0,0 +1,20 @@
import unittest
import docker
class ClientTest(unittest.TestCase):
    """Smoke tests for the top-level client, run against a live daemon."""

    def test_info(self):
        info = docker.from_env().info()
        assert 'ID' in info
        assert 'Name' in info

    def test_ping(self):
        assert docker.from_env().ping() is True

    def test_version(self):
        version_info = docker.from_env().version()
        assert 'Version' in version_info

View File

@ -0,0 +1,204 @@
import docker
from .base import BaseIntegrationTest
class ContainerCollectionTest(BaseIntegrationTest):
    """Integration tests for ``client.containers`` (require a live daemon).

    Created containers are appended to ``self.tmp_containers`` — presumably
    cleaned up by ``BaseIntegrationTest`` teardown; confirm in base class.
    """

    def test_run(self):
        client = docker.from_env()
        self.assertEqual(
            client.containers.run("alpine", "echo hello world", remove=True),
            b'hello world\n'
        )

    def test_run_detach(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        assert container.attrs['Config']['Image'] == "alpine"
        assert container.attrs['Config']['Cmd'] == ['sleep', '300']

    def test_run_with_error(self):
        # ``cat /test`` fails inside the container; the error must surface
        # as a ContainerError carrying exit status and output.
        client = docker.from_env()
        with self.assertRaises(docker.errors.ContainerError) as cm:
            client.containers.run("alpine", "cat /test", remove=True)
        assert cm.exception.exit_status == 1
        assert "cat /test" in str(cm.exception)
        assert "alpine" in str(cm.exception)
        assert "No such file or directory" in str(cm.exception)

    def test_run_with_image_that_does_not_exist(self):
        client = docker.from_env()
        with self.assertRaises(docker.errors.ImageNotFound):
            client.containers.run("dockerpytest_does_not_exist")

    def test_get(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        assert client.containers.get(container.id).attrs[
            'Config']['Image'] == "alpine"

    def test_list(self):
        client = docker.from_env()
        container_id = client.containers.run(
            "alpine", "sleep 300", detach=True).id
        self.tmp_containers.append(container_id)
        containers = [c for c in client.containers.list() if c.id ==
                      container_id]
        assert len(containers) == 1
        container = containers[0]
        assert container.attrs['Config']['Image'] == 'alpine'
        container.kill()
        container.remove()
        assert container_id not in [c.id for c in client.containers.list()]
class ContainerTest(BaseIntegrationTest):
    """Integration tests for individual ``Container`` model methods."""

    def test_commit(self):
        client = docker.from_env()
        container = client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /test'",
            detach=True
        )
        self.tmp_containers.append(container.id)
        container.wait()
        image = container.commit()
        self.assertEqual(
            client.containers.run(image.id, "cat /test", remove=True),
            b"hello\n"
        )

    def test_diff(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "touch /test", detach=True)
        self.tmp_containers.append(container.id)
        container.wait()
        assert container.diff() == [{'Path': '/test', 'Kind': 1}]

    def test_exec_run(self):
        client = docker.from_env()
        container = client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /test; sleep 60'", detach=True
        )
        self.tmp_containers.append(container.id)
        assert container.exec_run("cat /test") == b"hello\n"

    def test_kill(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        # NOTE(review): busy-waits for the container to reach 'running';
        # would spin forever if the container exited first.
        while container.status != 'running':
            container.reload()
        assert container.status == 'running'
        container.kill()
        container.reload()
        assert container.status == 'exited'

    def test_logs(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "echo hello world",
                                          detach=True)
        self.tmp_containers.append(container.id)
        container.wait()
        assert container.logs() == b"hello world\n"

    def test_pause(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        container.pause()
        container.reload()
        assert container.status == "paused"
        container.unpause()
        container.reload()
        assert container.status == "running"

    def test_remove(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "echo hello", detach=True)
        self.tmp_containers.append(container.id)
        assert container.id in [c.id for c in client.containers.list(all=True)]
        container.wait()
        container.remove()
        containers = client.containers.list(all=True)
        assert container.id not in [c.id for c in containers]

    def test_rename(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "echo hello", name="test1",
                                          detach=True)
        self.tmp_containers.append(container.id)
        assert container.name == "test1"
        container.rename("test2")
        container.reload()
        assert container.name == "test2"

    def test_restart(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 100", detach=True)
        self.tmp_containers.append(container.id)
        first_started_at = container.attrs['State']['StartedAt']
        container.restart()
        container.reload()
        second_started_at = container.attrs['State']['StartedAt']
        assert first_started_at != second_started_at

    def test_start(self):
        client = docker.from_env()
        container = client.containers.create("alpine", "sleep 50", detach=True)
        self.tmp_containers.append(container.id)
        assert container.status == "created"
        container.start()
        container.reload()
        assert container.status == "running"

    def test_stats(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 100", detach=True)
        self.tmp_containers.append(container.id)
        stats = container.stats(stream=False)
        for key in ['read', 'networks', 'precpu_stats', 'cpu_stats',
                    'memory_stats', 'blkio_stats']:
            assert key in stats

    def test_stop(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "top", detach=True)
        self.tmp_containers.append(container.id)
        assert container.status in ("running", "created")
        container.stop(timeout=2)
        container.reload()
        assert container.status == "exited"

    def test_top(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 60", detach=True)
        self.tmp_containers.append(container.id)
        top = container.top()
        assert len(top['Processes']) == 1
        assert 'sleep 60' in top['Processes'][0]

    def test_update(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 60", detach=True,
                                          cpu_shares=2)
        self.tmp_containers.append(container.id)
        assert container.attrs['HostConfig']['CpuShares'] == 2
        container.update(cpu_shares=3)
        container.reload()
        assert container.attrs['HostConfig']['CpuShares'] == 3

    def test_wait(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sh -c 'exit 0'",
                                          detach=True)
        self.tmp_containers.append(container.id)
        assert container.wait() == 0
        container = client.containers.run("alpine", "sh -c 'exit 1'",
                                          detach=True)
        self.tmp_containers.append(container.id)
        assert container.wait() == 1

View File

@ -0,0 +1,67 @@
import io
import docker
from .base import BaseIntegrationTest
class ImageCollectionTest(BaseIntegrationTest):
    """Integration tests for ``client.images`` (require a live daemon)."""

    def test_build(self):
        client = docker.from_env()
        image = client.images.build(fileobj=io.BytesIO(
            "FROM alpine\n"
            "CMD echo hello world".encode('ascii')
        ))
        self.tmp_imgs.append(image.id)
        assert client.containers.run(image) == b"hello world\n"

    def test_build_with_error(self):
        client = docker.from_env()
        with self.assertRaises(docker.errors.BuildError) as cm:
            client.images.build(fileobj=io.BytesIO(
                "FROM alpine\n"
                "NOTADOCKERFILECOMMAND".encode('ascii')
            ))
        assert str(cm.exception) == ("Unknown instruction: "
                                     "NOTADOCKERFILECOMMAND")

    def test_list(self):
        client = docker.from_env()
        image = client.images.pull('alpine:latest')
        assert image.id in get_ids(client.images.list())

    def test_list_with_repository(self):
        client = docker.from_env()
        image = client.images.pull('alpine:latest')
        assert image.id in get_ids(client.images.list('alpine'))
        assert image.id in get_ids(client.images.list('alpine:latest'))

    def test_pull(self):
        client = docker.from_env()
        image = client.images.pull('alpine:latest')
        assert 'alpine:latest' in image.attrs['RepoTags']
class ImageTest(BaseIntegrationTest):
    """Integration tests for individual ``Image`` model methods."""

    def test_tag_and_remove(self):
        repo = 'dockersdk.tests.images.test_tag'
        tag = 'some-tag'
        identifier = '{}:{}'.format(repo, tag)
        client = docker.from_env()
        image = client.images.pull('alpine:latest')
        image.tag(repo, tag)
        self.tmp_imgs.append(identifier)
        assert image.id in get_ids(client.images.list(repo))
        assert image.id in get_ids(client.images.list(identifier))
        # Removing the extra tag must not remove the original image.
        client.images.remove(identifier)
        assert image.id not in get_ids(client.images.list(repo))
        assert image.id not in get_ids(client.images.list(identifier))
        assert image.id in get_ids(client.images.list('alpine:latest'))
def get_ids(images):
    """Collect the ``id`` of every image in *images* into a list."""
    return [image.id for image in images]

View File

@ -0,0 +1,64 @@
import docker
from .. import helpers
from .base import BaseIntegrationTest
class NetworkCollectionTest(BaseIntegrationTest):
    """Integration tests for ``client.networks``.

    Renamed from ``ImageCollectionTest``: the old name was copy-pasted from
    the images test module and misdescribed what is tested here.
    """

    def test_create(self):
        client = docker.from_env()
        name = helpers.random_name()
        network = client.networks.create(name, labels={'foo': 'bar'})
        self.tmp_networks.append(network.id)
        assert network.name == name
        assert network.attrs['Labels']['foo'] == "bar"

    def test_get(self):
        client = docker.from_env()
        name = helpers.random_name()
        network_id = client.networks.create(name).id
        self.tmp_networks.append(network_id)
        network = client.networks.get(network_id)
        assert network.name == name

    def test_list_remove(self):
        client = docker.from_env()
        name = helpers.random_name()
        network = client.networks.create(name)
        self.tmp_networks.append(network.id)
        assert network.id in [n.id for n in client.networks.list()]
        # Server-side filtering by id and by name, both positive and
        # negative cases.
        assert network.id not in [
            n.id for n in
            client.networks.list(ids=["fdhjklfdfdshjkfds"])
        ]
        assert network.id in [
            n.id for n in
            client.networks.list(ids=[network.id])
        ]
        assert network.id not in [
            n.id for n in
            client.networks.list(names=["fdshjklfdsjhkl"])
        ]
        assert network.id in [
            n.id for n in
            client.networks.list(names=[name])
        ]
        network.remove()
        assert network.id not in [n.id for n in client.networks.list()]
class NetworkTest(BaseIntegrationTest):
    """Integration tests for individual ``Network`` model methods.

    Renamed from ``ImageTest``: the old name was copy-pasted from the images
    test module and misdescribed what is tested here.
    """

    def test_connect_disconnect(self):
        client = docker.from_env()
        network = client.networks.create(helpers.random_name())
        self.tmp_networks.append(network.id)
        container = client.containers.create("alpine", "sleep 300")
        self.tmp_containers.append(container.id)
        assert network.containers == []
        network.connect(container)
        # The container only shows up as attached once it is started.
        container.start()
        assert client.networks.get(network.id).containers == [container]
        network.disconnect(container)
        assert network.containers == []
        assert client.networks.get(network.id).containers == []

View File

@ -0,0 +1,34 @@
import unittest
import docker
from .. import helpers
class NodesTest(unittest.TestCase):
    """Integration tests for ``client.nodes`` (single-node swarm)."""

    def setUp(self):
        helpers.force_leave_swarm(docker.from_env())

    def tearDown(self):
        helpers.force_leave_swarm(docker.from_env())

    def test_list_get_update(self):
        client = docker.from_env()
        client.swarm.init()
        nodes = client.nodes.list()
        assert len(nodes) == 1
        assert nodes[0].attrs['Spec']['Role'] == 'manager'
        node = client.nodes.get(nodes[0].id)
        assert node.id == nodes[0].id
        assert node.attrs['Spec']['Role'] == 'manager'
        assert node.version > 0
        node = client.nodes.list()[0]
        assert not node.attrs['Spec'].get('Labels')
        # update() takes a full node spec, not a partial diff.
        node.update({
            'Availability': 'active',
            'Name': 'node-name',
            'Role': 'manager',
            'Labels': {'foo': 'bar'}
        })
        node.reload()
        assert node.attrs['Spec']['Labels'] == {'foo': 'bar'}

View File

@ -0,0 +1,16 @@
import docker
from .base import BaseIntegrationTest
class ModelTest(BaseIntegrationTest):
    """Integration test for the base ``Model.reload`` behaviour."""

    def test_reload(self):
        client = docker.from_env()
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        first_started_at = container.attrs['State']['StartedAt']
        container.kill()
        container.start()
        # attrs are a cached snapshot: stale until reload() is called.
        assert container.attrs['State']['StartedAt'] == first_started_at
        container.reload()
        assert container.attrs['State']['StartedAt'] != first_started_at

View File

@ -0,0 +1,100 @@
import unittest
import docker
from .. import helpers
class ServiceTest(unittest.TestCase):
    """Integration tests for ``client.services`` (needs a swarm manager)."""

    @classmethod
    def setUpClass(cls):
        client = docker.from_env()
        helpers.force_leave_swarm(client)
        client.swarm.init()

    @classmethod
    def tearDownClass(cls):
        helpers.force_leave_swarm(docker.from_env())

    def test_create(self):
        client = docker.from_env()
        name = helpers.random_name()
        service = client.services.create(
            # create arguments
            name=name,
            labels={'foo': 'bar'},
            # ContainerSpec arguments
            image="alpine",
            command="sleep 300",
            container_labels={'container': 'label'}
        )
        assert service.name == name
        assert service.attrs['Spec']['Labels']['foo'] == 'bar'
        container_spec = service.attrs['Spec']['TaskTemplate']['ContainerSpec']
        assert container_spec['Image'] == "alpine"
        assert container_spec['Labels'] == {'container': 'label'}

    def test_get(self):
        client = docker.from_env()
        name = helpers.random_name()
        service = client.services.create(
            name=name,
            image="alpine",
            command="sleep 300"
        )
        service = client.services.get(service.id)
        assert service.name == name

    def test_list_remove(self):
        client = docker.from_env()
        service = client.services.create(
            name=helpers.random_name(),
            image="alpine",
            command="sleep 300"
        )
        assert service in client.services.list()
        service.remove()
        assert service not in client.services.list()

    def test_tasks(self):
        client = docker.from_env()
        service1 = client.services.create(
            name=helpers.random_name(),
            image="alpine",
            command="sleep 300"
        )
        service2 = client.services.create(
            name=helpers.random_name(),
            image="alpine",
            command="sleep 300"
        )
        # NOTE(review): busy-waits (no timeout) until the scheduler has
        # created each service's task.
        tasks = []
        while len(tasks) == 0:
            tasks = service1.tasks()
        assert len(tasks) == 1
        assert tasks[0]['ServiceID'] == service1.id
        tasks = []
        while len(tasks) == 0:
            tasks = service2.tasks()
        assert len(tasks) == 1
        assert tasks[0]['ServiceID'] == service2.id

    def test_update(self):
        client = docker.from_env()
        service = client.services.create(
            # create arguments
            name=helpers.random_name(),
            # ContainerSpec arguments
            image="alpine",
            command="sleep 300"
        )
        new_name = helpers.random_name()
        service.update(
            # create argument
            name=new_name,
            # ContainerSpec argument
            command="sleep 600"
        )
        service.reload()
        assert service.name == new_name
        container_spec = service.attrs['Spec']['TaskTemplate']['ContainerSpec']
        assert container_spec['Command'] == ["sleep", "600"]

View File

@ -0,0 +1,22 @@
import unittest
import docker
from .. import helpers
class SwarmTest(unittest.TestCase):
    """Integration tests for ``client.swarm`` lifecycle."""

    def setUp(self):
        helpers.force_leave_swarm(docker.from_env())

    def tearDown(self):
        helpers.force_leave_swarm(docker.from_env())

    def test_init_update_leave(self):
        client = docker.from_env()
        client.swarm.init(snapshot_interval=5000)
        assert client.swarm.attrs['Spec']['Raft']['SnapshotInterval'] == 5000
        client.swarm.update(snapshot_interval=10000)
        assert client.swarm.attrs['Spec']['Raft']['SnapshotInterval'] == 10000
        assert client.swarm.leave(force=True)
        with self.assertRaises(docker.errors.APIError) as cm:
            client.swarm.reload()
        # After leaving, inspecting the swarm fails — presumably 406 means
        # "node is not part of a swarm"; confirm against the Engine API.
        assert cm.exception.response.status_code == 406

View File

@ -0,0 +1,30 @@
import docker
from .base import BaseIntegrationTest
class VolumesTest(BaseIntegrationTest):
    """Integration tests for ``client.volumes`` (require a live daemon)."""

    def test_create_get(self):
        client = docker.from_env()
        volume = client.volumes.create(
            'dockerpytest_1',
            driver='local',
            labels={'labelkey': 'labelvalue'}
        )
        self.tmp_volumes.append(volume.id)
        assert volume.id
        assert volume.name == 'dockerpytest_1'
        assert volume.attrs['Labels'] == {'labelkey': 'labelvalue'}
        volume = client.volumes.get(volume.id)
        assert volume.name == 'dockerpytest_1'

    def test_list_remove(self):
        client = docker.from_env()
        volume = client.volumes.create('dockerpytest_1')
        self.tmp_volumes.append(volume.id)
        assert volume in client.volumes.list()
        # 'name' filter is a prefix match on the server side.
        assert volume in client.volumes.list(filters={'name': 'dockerpytest_'})
        assert volume not in client.volumes.list(filters={'name': 'foobar'})
        volume.remove()
        assert volume not in client.volumes.list()

View File

@ -27,7 +27,6 @@ except ImportError:
DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
TEST_CERT_DIR = os.path.join(os.path.dirname(__file__), 'testdata/certs')
def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
@ -487,32 +486,6 @@ class UserAgentTest(unittest.TestCase):
self.assertEqual(headers['User-Agent'], 'foo/bar')
class FromEnvTest(unittest.TestCase):
    """Tests for ``APIClient.from_env`` environment-variable handling."""

    def setUp(self):
        # Snapshot the environment so each test can mutate it freely.
        self.os_environ = os.environ.copy()

    def tearDown(self):
        os.environ = self.os_environ

    def test_from_env(self):
        """Test that environment variables are passed through to
        utils.kwargs_from_env(). KwargsFromEnvTest tests that environment
        variables are parsed correctly."""
        os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
                          DOCKER_CERT_PATH=TEST_CERT_DIR,
                          DOCKER_TLS_VERIFY='1')
        client = APIClient.from_env()
        self.assertEqual(client.base_url, "https://192.168.59.103:2376")

    def test_from_env_with_version(self):
        os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
                          DOCKER_CERT_PATH=TEST_CERT_DIR,
                          DOCKER_TLS_VERIFY='1')
        client = APIClient.from_env(version='2.32')
        self.assertEqual(client.base_url, "https://192.168.59.103:2376")
        self.assertEqual(client._version, '2.32')
class DisableSocketTest(unittest.TestCase):
class DummySocket(object):
def __init__(self, timeout=60):

73
tests/unit/client_test.py Normal file
View File

@ -0,0 +1,73 @@
import datetime
import docker
import os
import unittest
from . import fake_api
try:
from unittest import mock
except ImportError:
import mock
TEST_CERT_DIR = os.path.join(os.path.dirname(__file__), 'testdata/certs')
class ClientTest(unittest.TestCase):
    """Unit tests checking that the high-level client delegates straight
    through to the corresponding ``APIClient`` methods."""

    @mock.patch('docker.api.APIClient.events')
    def test_events(self, mock_func):
        since = datetime.datetime(2016, 1, 1, 0, 0)
        mock_func.return_value = fake_api.get_fake_events()[1]
        client = docker.from_env()
        assert client.events(since=since) == mock_func.return_value
        mock_func.assert_called_with(since=since)

    @mock.patch('docker.api.APIClient.info')
    def test_info(self, mock_func):
        mock_func.return_value = fake_api.get_fake_info()[1]
        client = docker.from_env()
        assert client.info() == mock_func.return_value
        mock_func.assert_called_with()

    @mock.patch('docker.api.APIClient.ping')
    def test_ping(self, mock_func):
        mock_func.return_value = True
        client = docker.from_env()
        assert client.ping() is True
        mock_func.assert_called_with()

    @mock.patch('docker.api.APIClient.version')
    def test_version(self, mock_func):
        mock_func.return_value = fake_api.get_fake_version()[1]
        client = docker.from_env()
        assert client.version() == mock_func.return_value
        mock_func.assert_called_with()
class FromEnvTest(unittest.TestCase):
    """Tests for ``docker.from_env`` environment-variable handling."""

    def setUp(self):
        # Snapshot the environment so each test can mutate it freely.
        self.os_environ = os.environ.copy()

    def tearDown(self):
        os.environ = self.os_environ

    def test_from_env(self):
        """Test that environment variables are passed through to
        utils.kwargs_from_env(). KwargsFromEnvTest tests that environment
        variables are parsed correctly."""
        os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
                          DOCKER_CERT_PATH=TEST_CERT_DIR,
                          DOCKER_TLS_VERIFY='1')
        client = docker.from_env()
        self.assertEqual(client.api.base_url, "https://192.168.59.103:2376")

    def test_from_env_with_version(self):
        os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
                          DOCKER_CERT_PATH=TEST_CERT_DIR,
                          DOCKER_TLS_VERIFY='1')
        client = docker.from_env(version='2.32')
        self.assertEqual(client.api.base_url, "https://192.168.59.103:2376")
        self.assertEqual(client.api._version, '2.32')

22
tests/unit/errors_test.py Normal file
View File

@ -0,0 +1,22 @@
import unittest
from docker.errors import (APIError, DockerException,
create_unexpected_kwargs_error)
class APIErrorTest(unittest.TestCase):
    """APIError must participate in the DockerException hierarchy."""

    def test_api_error_is_caught_by_dockerexception(self):
        # A handler for the base class has to catch the subclass.
        with self.assertRaises(DockerException):
            raise APIError("this should be caught by DockerException")
class CreateUnexpectedKwargsErrorTest(unittest.TestCase):
    """Tests for the TypeError-style message helper."""

    def test_create_unexpected_kwargs_error_single(self):
        e = create_unexpected_kwargs_error('f', {'foo': 'bar'})
        assert str(e) == "f() got an unexpected keyword argument 'foo'"

    def test_create_unexpected_kwargs_error_multiple(self):
        # Multiple offending kwargs are listed in sorted order.
        e = create_unexpected_kwargs_error('f', {'foo': 'bar', 'baz': 'bosh'})
        assert str(e) == "f() got unexpected keyword arguments 'baz', 'foo'"

View File

@ -6,6 +6,7 @@ CURRENT_VERSION = 'v{0}'.format(constants.DEFAULT_DOCKER_API_VERSION)
FAKE_CONTAINER_ID = '3cc2351ab11b'
FAKE_IMAGE_ID = 'e9aa60c60128'
FAKE_EXEC_ID = 'd5d177f121dc'
FAKE_NETWORK_ID = '33fb6a3462b8'
FAKE_IMAGE_NAME = 'test_image'
FAKE_TARBALL_PATH = '/path/to/tarball'
FAKE_REPO_NAME = 'repo'
@ -46,6 +47,17 @@ def get_fake_info():
return status_code, response
def post_fake_auth():
    """Canned successful ``/auth`` response."""
    body = {
        'Status': 'Login Succeeded',
        'IdentityToken': '9cbaf023786cd7',
    }
    return 200, body
def get_fake_ping():
    """Canned ``/_ping`` response."""
    return (200, "OK")
def get_fake_search():
status_code = 200
response = [{'Name': 'busybox', 'Description': 'Fake Description'}]
@ -125,7 +137,9 @@ def get_fake_inspect_container(tty=False):
'Config': {'Privileged': True, 'Tty': tty},
'ID': FAKE_CONTAINER_ID,
'Image': 'busybox:latest',
'Name': 'foobar',
"State": {
"Status": "running",
"Running": True,
"Pid": 0,
"ExitCode": 0,
@ -140,11 +154,11 @@ def get_fake_inspect_container(tty=False):
def get_fake_inspect_image():
status_code = 200
response = {
'id': FAKE_IMAGE_ID,
'parent': "27cf784147099545",
'created': "2013-03-23T22:24:18.818426-07:00",
'container': FAKE_CONTAINER_ID,
'container_config':
'Id': FAKE_IMAGE_ID,
'Parent': "27cf784147099545",
'Created': "2013-03-23T22:24:18.818426-07:00",
'Container': FAKE_CONTAINER_ID,
'ContainerConfig':
{
"Hostname": "",
"User": "",
@ -411,6 +425,61 @@ def post_fake_update_node():
return 200, None
def get_fake_network_list():
    """Canned ``GET /networks`` response: one default bridge network with a
    single attached container."""
    return 200, [{
        "Name": "bridge",
        "Id": FAKE_NETWORK_ID,
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": False,
        "Internal": False,
        "IPAM": {
            "Driver": "default",
            "Config": [
                {
                    "Subnet": "172.17.0.0/16"
                }
            ]
        },
        "Containers": {
            FAKE_CONTAINER_ID: {
                "EndpointID": "ed2419a97c1d99",
                "MacAddress": "02:42:ac:11:00:02",
                "IPv4Address": "172.17.0.2/16",
                "IPv6Address": ""
            }
        },
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        }
    }]
def get_fake_network():
    """Canned single-network inspect response (first entry of the list)."""
    status, networks = get_fake_network_list()
    return status, networks[0]
def post_fake_network():
    """Canned ``POST /networks/create`` response."""
    return 201, {"Id": FAKE_NETWORK_ID, "Warnings": []}
def delete_fake_network():
    """Canned ``DELETE /networks/<id>`` response (no body)."""
    return (204, None)
def post_fake_network_connect():
    """Canned ``POST /networks/<id>/connect`` response (no body)."""
    return (200, None)
def post_fake_network_disconnect():
    """Canned ``POST /networks/<id>/disconnect`` response (no body)."""
    return (200, None)
# Maps real api url to fake response callback
prefix = 'http+docker://localunixsocket'
if constants.IS_WINDOWS_PLATFORM:
@ -423,6 +492,10 @@ fake_responses = {
get_fake_version,
'{1}/{0}/info'.format(CURRENT_VERSION, prefix):
get_fake_info,
'{1}/{0}/auth'.format(CURRENT_VERSION, prefix):
post_fake_auth,
'{1}/{0}/_ping'.format(CURRENT_VERSION, prefix):
get_fake_ping,
'{1}/{0}/images/search'.format(CURRENT_VERSION, prefix):
get_fake_search,
'{1}/{0}/images/json'.format(CURRENT_VERSION, prefix):
@ -516,4 +589,24 @@ fake_responses = {
CURRENT_VERSION, prefix, FAKE_NODE_ID
), 'POST'):
post_fake_update_node,
('{1}/{0}/networks'.format(CURRENT_VERSION, prefix), 'GET'):
get_fake_network_list,
('{1}/{0}/networks/create'.format(CURRENT_VERSION, prefix), 'POST'):
post_fake_network,
('{1}/{0}/networks/{2}'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'GET'):
get_fake_network,
('{1}/{0}/networks/{2}'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'DELETE'):
delete_fake_network,
('{1}/{0}/networks/{2}/connect'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'POST'):
post_fake_network_connect,
('{1}/{0}/networks/{2}/disconnect'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'POST'):
post_fake_network_disconnect,
}

View File

@ -0,0 +1,61 @@
import copy
import docker
from . import fake_api
try:
from unittest import mock
except ImportError:
import mock
class CopyReturnMagicMock(mock.MagicMock):
    """
    A MagicMock which deep copies every return value.

    This keeps shared fixture dicts/lists safe: a test that mutates a
    returned value cannot corrupt what later calls (or tests) receive.
    """
    def _mock_call(self, *args, **kwargs):
        result = super(CopyReturnMagicMock, self)._mock_call(*args, **kwargs)
        if isinstance(result, (dict, list)):
            return copy.deepcopy(result)
        return result
def make_fake_api_client():
    """
    Returns non-complete fake APIClient.

    This returns most of the default cases correctly, but most arguments that
    change behaviour will not work.
    """
    # A real APIClient is created only to borrow create_host_config, which
    # has real logic the mock should not reimplement.
    api_client = docker.APIClient()
    mock_client = CopyReturnMagicMock(**{
        'build.return_value': fake_api.FAKE_IMAGE_ID,
        'commit.return_value': fake_api.post_fake_commit()[1],
        'containers.return_value': fake_api.get_fake_containers()[1],
        'create_container.return_value':
            fake_api.post_fake_create_container()[1],
        'create_host_config.side_effect': api_client.create_host_config,
        'create_network.return_value': fake_api.post_fake_network()[1],
        'exec_create.return_value': fake_api.post_fake_exec_create()[1],
        'exec_start.return_value': fake_api.post_fake_exec_start()[1],
        'images.return_value': fake_api.get_fake_images()[1],
        'inspect_container.return_value':
            fake_api.get_fake_inspect_container()[1],
        'inspect_image.return_value': fake_api.get_fake_inspect_image()[1],
        'inspect_network.return_value': fake_api.get_fake_network()[1],
        'logs.return_value': 'hello world\n',
        'networks.return_value': fake_api.get_fake_network_list()[1],
        'start.return_value': None,
        'wait.return_value': 0,
    })
    mock_client._version = docker.constants.DEFAULT_DOCKER_API_VERSION
    return mock_client
def make_fake_client():
    """
    Returns a Client with a fake APIClient.
    """
    fake_client = docker.Client()
    fake_client.api = make_fake_api_client()
    return fake_client

View File

@ -0,0 +1,465 @@
import docker
from docker.models.containers import Container, _create_container_args
from docker.models.images import Image
import unittest
from .fake_api import FAKE_CONTAINER_ID, FAKE_IMAGE_ID, FAKE_EXEC_ID
from .fake_api_client import make_fake_client
class ContainerCollectionTest(unittest.TestCase):
    def test_run(self):
        """run() must create, start, wait on and read logs from the
        container, in that order, against the low-level API."""
        client = make_fake_client()
        out = client.containers.run("alpine", "echo hello world")
        assert out == 'hello world\n'
        client.api.create_container.assert_called_with(
            image="alpine",
            command="echo hello world",
            detach=False,
            host_config={'NetworkMode': 'default'}
        )
        client.api.inspect_container.assert_called_with(FAKE_CONTAINER_ID)
        client.api.start.assert_called_with(FAKE_CONTAINER_ID)
        client.api.wait.assert_called_with(FAKE_CONTAINER_ID)
        client.api.logs.assert_called_with(
            FAKE_CONTAINER_ID,
            stderr=False,
            stdout=True
        )
    def test_create_container_args(self):
        """Exhaustively map every user-facing run()/create() keyword onto
        the low-level create_container arguments, including the split
        between top-level args and host_config."""
        create_kwargs = _create_container_args(dict(
            image='alpine',
            command='echo hello world',
            blkio_weight_device=[{'Path': 'foo', 'Weight': 3}],
            blkio_weight=2,
            cap_add=['foo'],
            cap_drop=['bar'],
            cgroup_parent='foobar',
            cpu_period=1,
            cpu_quota=2,
            cpu_shares=5,
            cpuset_cpus='0-3',
            detach=False,
            device_read_bps=[{'Path': 'foo', 'Rate': 3}],
            device_read_iops=[{'Path': 'foo', 'Rate': 3}],
            device_write_bps=[{'Path': 'foo', 'Rate': 3}],
            device_write_iops=[{'Path': 'foo', 'Rate': 3}],
            devices=['/dev/sda:/dev/xvda:rwm'],
            dns=['8.8.8.8'],
            domainname='example.com',
            dns_opt=['foo'],
            dns_search=['example.com'],
            entrypoint='/bin/sh',
            environment={'FOO': 'BAR'},
            extra_hosts={'foo': '1.2.3.4'},
            group_add=['blah'],
            ipc_mode='foo',
            kernel_memory=123,
            labels={'key': 'value'},
            links={'foo': 'bar'},
            log_config={'Type': 'json-file', 'Config': {}},
            lxc_conf={'foo': 'bar'},
            healthcheck={'test': 'true'},
            hostname='somehost',
            mac_address='abc123',
            mem_limit=123,
            mem_reservation=123,
            mem_swappiness=2,
            memswap_limit=456,
            name='somename',
            network_disabled=False,
            network_mode='blah',
            networks=['foo'],
            oom_kill_disable=True,
            oom_score_adj=5,
            pid_mode='host',
            pids_limit=500,
            ports={
                1111: 4567,
                2222: None
            },
            privileged=True,
            publish_all_ports=True,
            read_only=True,
            restart_policy={'Name': 'always'},
            security_opt=['blah'],
            shm_size=123,
            stdin_open=True,
            stop_signal=9,
            sysctls={'foo': 'bar'},
            tmpfs={'/blah': ''},
            tty=True,
            ulimits=[{"Name": "nofile", "Soft": 1024, "Hard": 2048}],
            user='bob',
            userns_mode='host',
            version='1.23',
            volume_driver='some_driver',
            volumes=[
                '/home/user1/:/mnt/vol2',
                '/var/www:/mnt/vol1:ro',
            ],
            volumes_from=['container'],
            working_dir='/code'
        ))
        expected = dict(
            image='alpine',
            command='echo hello world',
            domainname='example.com',
            detach=False,
            entrypoint='/bin/sh',
            environment={'FOO': 'BAR'},
            host_config={
                'Binds': [
                    '/home/user1/:/mnt/vol2',
                    '/var/www:/mnt/vol1:ro',
                ],
                'BlkioDeviceReadBps': [{'Path': 'foo', 'Rate': 3}],
                'BlkioDeviceReadIOps': [{'Path': 'foo', 'Rate': 3}],
                'BlkioDeviceWriteBps': [{'Path': 'foo', 'Rate': 3}],
                'BlkioDeviceWriteIOps': [{'Path': 'foo', 'Rate': 3}],
                'BlkioWeightDevice': [{'Path': 'foo', 'Weight': 3}],
                'BlkioWeight': 2,
                'CapAdd': ['foo'],
                'CapDrop': ['bar'],
                'CgroupParent': 'foobar',
                'CpuPeriod': 1,
                'CpuQuota': 2,
                'CpuShares': 5,
                'CpuSetCpus': '0-3',
                'Devices': [{'PathOnHost': '/dev/sda',
                             'CgroupPermissions': 'rwm',
                             'PathInContainer': '/dev/xvda'}],
                'Dns': ['8.8.8.8'],
                'DnsOptions': ['foo'],
                'DnsSearch': ['example.com'],
                'ExtraHosts': ['foo:1.2.3.4'],
                'GroupAdd': ['blah'],
                'IpcMode': 'foo',
                'KernelMemory': 123,
                'Links': ['foo:bar'],
                'LogConfig': {'Type': 'json-file', 'Config': {}},
                'LxcConf': [{'Key': 'foo', 'Value': 'bar'}],
                'Memory': 123,
                'MemoryReservation': 123,
                'MemorySwap': 456,
                'MemorySwappiness': 2,
                'NetworkMode': 'blah',
                'OomKillDisable': True,
                'OomScoreAdj': 5,
                'PidMode': 'host',
                'PidsLimit': 500,
                'PortBindings': {
                    '1111/tcp': [{'HostIp': '', 'HostPort': '4567'}],
                    '2222/tcp': [{'HostIp': '', 'HostPort': ''}]
                },
                'Privileged': True,
                'PublishAllPorts': True,
                'ReadonlyRootfs': True,
                'RestartPolicy': {'Name': 'always'},
                'SecurityOpt': ['blah'],
                'ShmSize': 123,
                'Sysctls': {'foo': 'bar'},
                'Tmpfs': {'/blah': ''},
                'Ulimits': [{"Name": "nofile", "Soft": 1024, "Hard": 2048}],
                'UsernsMode': 'host',
                'VolumesFrom': ['container'],
            },
            healthcheck={'test': 'true'},
            hostname='somehost',
            labels={'key': 'value'},
            mac_address='abc123',
            name='somename',
            network_disabled=False,
            networking_config={'foo': None},
            ports=[('1111', 'tcp'), ('2222', 'tcp')],
            stdin_open=True,
            stop_signal=9,
            tty=True,
            user='bob',
            volume_driver='some_driver',
            volumes=['/home/user1/', '/var/www'],
            working_dir='/code'
        )
        assert create_kwargs == expected
def test_run_detach(self):
    """run() with detach=True starts the container and returns its model."""
    fake_client = make_fake_client()
    result = fake_client.containers.run('alpine', 'sleep 300', detach=True)
    assert isinstance(result, Container)
    assert result.id == FAKE_CONTAINER_ID
    fake_client.api.create_container.assert_called_with(
        image='alpine',
        command='sleep 300',
        detach=True,
        host_config={'NetworkMode': 'default'},
    )
    fake_client.api.inspect_container.assert_called_with(FAKE_CONTAINER_ID)
    fake_client.api.start.assert_called_with(FAKE_CONTAINER_ID)
def test_run_pull(self):
    """When the image is missing, run() pulls it and retries the create."""
    fake_client = make_fake_client()
    # First create_container call raises ImageNotFound; the retry after
    # the pull succeeds with the stubbed return value.
    fake_client.api.create_container.side_effect = [
        docker.errors.ImageNotFound(""),
        fake_client.api.create_container.return_value,
    ]
    result = fake_client.containers.run('alpine', 'sleep 300', detach=True)
    assert result.id == FAKE_CONTAINER_ID
    fake_client.api.pull.assert_called_with('alpine')
def test_run_with_error(self):
    """A non-zero exit status surfaces as ContainerError carrying the logs."""
    fake_client = make_fake_client()
    fake_client.api.logs.return_value = "some error"
    fake_client.api.wait.return_value = 1
    with self.assertRaises(docker.errors.ContainerError) as cm:
        fake_client.containers.run('alpine', 'echo hello world')
    assert cm.exception.exit_status == 1
    assert "some error" in str(cm.exception)
def test_run_with_image_object(self):
    """run() accepts an Image model and forwards its id to the API."""
    fake_client = make_fake_client()
    img = fake_client.images.get(FAKE_IMAGE_ID)
    fake_client.containers.run(img)
    fake_client.api.create_container.assert_called_with(
        image=img.id,
        command=None,
        detach=False,
        host_config={'NetworkMode': 'default'},
    )
def test_run_remove(self):
    """remove=True deletes the container afterwards; remove+detach is rejected."""
    # Default: the container is kept on success ...
    fake_client = make_fake_client()
    fake_client.containers.run("alpine")
    fake_client.api.remove_container.assert_not_called()

    # ... and also kept on failure.
    fake_client = make_fake_client()
    fake_client.api.wait.return_value = 1
    with self.assertRaises(docker.errors.ContainerError):
        fake_client.containers.run("alpine")
    fake_client.api.remove_container.assert_not_called()

    # remove=True removes the container on success ...
    fake_client = make_fake_client()
    fake_client.containers.run("alpine", remove=True)
    fake_client.api.remove_container.assert_called_with(FAKE_CONTAINER_ID)

    # ... and on failure too.
    fake_client = make_fake_client()
    fake_client.api.wait.return_value = 1
    with self.assertRaises(docker.errors.ContainerError):
        fake_client.containers.run("alpine", remove=True)
    fake_client.api.remove_container.assert_called_with(FAKE_CONTAINER_ID)

    # remove=True cannot be combined with detach=True.
    fake_client = make_fake_client()
    with self.assertRaises(RuntimeError):
        fake_client.containers.run("alpine", detach=True, remove=True)
def test_create(self):
    """create() builds the container via the API without starting it."""
    fake_client = make_fake_client()
    created = fake_client.containers.create(
        'alpine', 'echo hello world', environment={'FOO': 'BAR'}
    )
    assert isinstance(created, Container)
    assert created.id == FAKE_CONTAINER_ID
    fake_client.api.create_container.assert_called_with(
        image='alpine',
        command='echo hello world',
        environment={'FOO': 'BAR'},
        host_config={'NetworkMode': 'default'},
    )
    fake_client.api.inspect_container.assert_called_with(FAKE_CONTAINER_ID)
def test_create_with_image_object(self):
    """create() accepts an Image model and forwards its id to the API."""
    fake_client = make_fake_client()
    img = fake_client.images.get(FAKE_IMAGE_ID)
    fake_client.containers.create(img)
    fake_client.api.create_container.assert_called_with(
        image=img.id,
        command=None,
        host_config={'NetworkMode': 'default'},
    )
def test_get(self):
    """get() inspects the id and wraps the result in a Container model."""
    fake_client = make_fake_client()
    result = fake_client.containers.get(FAKE_CONTAINER_ID)
    assert isinstance(result, Container)
    assert result.id == FAKE_CONTAINER_ID
    fake_client.api.inspect_container.assert_called_with(FAKE_CONTAINER_ID)
def test_list(self):
    """list() forwards the filter arguments and returns Container models."""
    fake_client = make_fake_client()
    listed = fake_client.containers.list(all=True)
    fake_client.api.containers.assert_called_with(
        all=True,
        before=None,
        filters=None,
        limit=-1,
        since=None,
    )
    fake_client.api.inspect_container.assert_called_with(FAKE_CONTAINER_ID)
    assert len(listed) == 1
    assert isinstance(listed[0], Container)
    assert listed[0].id == FAKE_CONTAINER_ID
class ContainerTest(unittest.TestCase):
    """Tests for the methods exposed on a Container model.

    Each test builds a fresh fake client, fetches the canned container and
    checks that the model method delegates to the matching low-level API
    call, passing the container id.
    """

    def _get_container(self):
        # Shared setup: one fake client plus its canned container per test.
        fake_client = make_fake_client()
        return fake_client, fake_client.containers.get(FAKE_CONTAINER_ID)

    def test_name(self):
        _, ctr = self._get_container()
        assert ctr.name == 'foobar'

    def test_status(self):
        _, ctr = self._get_container()
        assert ctr.status == "running"

    def test_attach(self):
        fake_client, ctr = self._get_container()
        ctr.attach(stream=True)
        fake_client.api.attach.assert_called_with(FAKE_CONTAINER_ID, stream=True)

    def test_commit(self):
        fake_client, ctr = self._get_container()
        img = ctr.commit()
        fake_client.api.commit.assert_called_with(
            FAKE_CONTAINER_ID, repository=None, tag=None)
        assert isinstance(img, Image)
        assert img.id == FAKE_IMAGE_ID

    def test_diff(self):
        fake_client, ctr = self._get_container()
        ctr.diff()
        fake_client.api.diff.assert_called_with(FAKE_CONTAINER_ID)

    def test_exec_run(self):
        fake_client, ctr = self._get_container()
        ctr.exec_run("echo hello world", privileged=True, stream=True)
        fake_client.api.exec_create.assert_called_with(
            FAKE_CONTAINER_ID, "echo hello world", stdout=True, stderr=True,
            stdin=False, tty=False, privileged=True, user=''
        )
        fake_client.api.exec_start.assert_called_with(
            FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False
        )

    def test_export(self):
        fake_client, ctr = self._get_container()
        ctr.export()
        fake_client.api.export.assert_called_with(FAKE_CONTAINER_ID)

    def test_get_archive(self):
        fake_client, ctr = self._get_container()
        ctr.get_archive('foo')
        fake_client.api.get_archive.assert_called_with(FAKE_CONTAINER_ID, 'foo')

    def test_kill(self):
        fake_client, ctr = self._get_container()
        ctr.kill(signal=5)
        fake_client.api.kill.assert_called_with(FAKE_CONTAINER_ID, signal=5)

    def test_logs(self):
        fake_client, ctr = self._get_container()
        ctr.logs()
        fake_client.api.logs.assert_called_with(FAKE_CONTAINER_ID)

    def test_pause(self):
        fake_client, ctr = self._get_container()
        ctr.pause()
        fake_client.api.pause.assert_called_with(FAKE_CONTAINER_ID)

    def test_put_archive(self):
        fake_client, ctr = self._get_container()
        ctr.put_archive('path', 'foo')
        fake_client.api.put_archive.assert_called_with(
            FAKE_CONTAINER_ID, 'path', 'foo')

    def test_remove(self):
        fake_client, ctr = self._get_container()
        ctr.remove()
        fake_client.api.remove_container.assert_called_with(FAKE_CONTAINER_ID)

    def test_rename(self):
        fake_client, ctr = self._get_container()
        ctr.rename("foo")
        fake_client.api.rename.assert_called_with(FAKE_CONTAINER_ID, "foo")

    def test_resize(self):
        fake_client, ctr = self._get_container()
        ctr.resize(1, 2)
        fake_client.api.resize.assert_called_with(FAKE_CONTAINER_ID, 1, 2)

    def test_restart(self):
        fake_client, ctr = self._get_container()
        ctr.restart()
        fake_client.api.restart.assert_called_with(FAKE_CONTAINER_ID)

    def test_start(self):
        fake_client, ctr = self._get_container()
        ctr.start()
        fake_client.api.start.assert_called_with(FAKE_CONTAINER_ID)

    def test_stats(self):
        fake_client, ctr = self._get_container()
        ctr.stats()
        fake_client.api.stats.assert_called_with(FAKE_CONTAINER_ID)

    def test_stop(self):
        fake_client, ctr = self._get_container()
        ctr.stop()
        fake_client.api.stop.assert_called_with(FAKE_CONTAINER_ID)

    def test_top(self):
        fake_client, ctr = self._get_container()
        ctr.top()
        fake_client.api.top.assert_called_with(FAKE_CONTAINER_ID)

    def test_unpause(self):
        fake_client, ctr = self._get_container()
        ctr.unpause()
        fake_client.api.unpause.assert_called_with(FAKE_CONTAINER_ID)

    def test_update(self):
        fake_client, ctr = self._get_container()
        ctr.update(cpu_shares=2)
        fake_client.api.update_container.assert_called_with(
            FAKE_CONTAINER_ID, cpu_shares=2)

    def test_wait(self):
        fake_client, ctr = self._get_container()
        ctr.wait()
        fake_client.api.wait.assert_called_with(FAKE_CONTAINER_ID)

View File

@ -0,0 +1,102 @@
from docker.models.images import Image
import unittest
from .fake_api import FAKE_IMAGE_ID
from .fake_api_client import make_fake_client
class ImageCollectionTest(unittest.TestCase):
    """Tests for the high-level image collection on the client."""

    def test_build(self):
        fake_client = make_fake_client()
        built = fake_client.images.build()
        fake_client.api.build.assert_called_with()
        fake_client.api.inspect_image.assert_called_with(FAKE_IMAGE_ID)
        assert isinstance(built, Image)
        assert built.id == FAKE_IMAGE_ID

    def test_get(self):
        fake_client = make_fake_client()
        fetched = fake_client.images.get(FAKE_IMAGE_ID)
        fake_client.api.inspect_image.assert_called_with(FAKE_IMAGE_ID)
        assert isinstance(fetched, Image)
        assert fetched.id == FAKE_IMAGE_ID

    def test_list(self):
        fake_client = make_fake_client()
        listed = fake_client.images.list(all=True)
        fake_client.api.images.assert_called_with(
            all=True, name=None, filters=None)
        assert len(listed) == 1
        assert isinstance(listed[0], Image)
        assert listed[0].id == FAKE_IMAGE_ID

    def test_load(self):
        fake_client = make_fake_client()
        fake_client.images.load('byte stream')
        fake_client.api.load_image.assert_called_with('byte stream')

    def test_pull(self):
        fake_client = make_fake_client()
        pulled = fake_client.images.pull('test_image')
        fake_client.api.pull.assert_called_with('test_image')
        fake_client.api.inspect_image.assert_called_with('test_image')
        assert isinstance(pulled, Image)
        assert pulled.id == FAKE_IMAGE_ID

    def test_push(self):
        fake_client = make_fake_client()
        fake_client.images.push('foobar', insecure_registry=True)
        fake_client.api.push.assert_called_with(
            'foobar', tag=None, insecure_registry=True
        )

    def test_remove(self):
        fake_client = make_fake_client()
        fake_client.images.remove('test_image')
        fake_client.api.remove_image.assert_called_with('test_image')

    def test_search(self):
        fake_client = make_fake_client()
        fake_client.images.search('test')
        fake_client.api.search.assert_called_with('test')
class ImageTest(unittest.TestCase):
    """Tests for the Image model itself."""

    def test_short_id(self):
        digest = ('b6846070672ce4e8f1f91564ea6782bd675'
                  'f69d65a6f73ef6262057ad0a15dcd')
        # With an algorithm prefix, short_id keeps the prefix.
        img = Image(attrs={'Id': 'sha256:' + digest})
        assert img.short_id == 'sha256:b684607067'
        # Without one, short_id is just the truncated digest.
        img = Image(attrs={'Id': digest})
        assert img.short_id == 'b684607067'

    def test_tags(self):
        img = Image(attrs={'RepoTags': ['test_image:latest']})
        assert img.tags == ['test_image:latest']
        # The "<none>:<none>" placeholder is filtered out.
        img = Image(attrs={'RepoTags': ['<none>:<none>']})
        assert img.tags == []

    def test_history(self):
        fake_client = make_fake_client()
        img = fake_client.images.get(FAKE_IMAGE_ID)
        img.history()
        fake_client.api.history.assert_called_with(FAKE_IMAGE_ID)

    def test_save(self):
        fake_client = make_fake_client()
        img = fake_client.images.get(FAKE_IMAGE_ID)
        img.save()
        fake_client.api.get_image.assert_called_with(FAKE_IMAGE_ID)

    def test_tag(self):
        fake_client = make_fake_client()
        img = fake_client.images.get(FAKE_IMAGE_ID)
        img.tag('foo')
        fake_client.api.tag.assert_called_with(FAKE_IMAGE_ID, 'foo', tag=None)

View File

@ -0,0 +1,64 @@
import unittest
from .fake_api import FAKE_NETWORK_ID, FAKE_CONTAINER_ID
from .fake_api_client import make_fake_client
class NetworkCollectionTest(unittest.TestCase):
    """Tests for the high-level network collection on the client.

    Renamed from the copy-pasted ``ImageCollectionTest`` so failures point
    at the right subject. The original ``assert mock.called_once_with(...)``
    checks are replaced with ``assert_called_once_with``: on a Mock,
    ``called_once_with`` is an auto-created attribute whose call returns a
    truthy Mock, so those asserts could never fail.
    """

    def test_create(self):
        client = make_fake_client()
        network = client.networks.create("foobar", labels={'foo': 'bar'})
        assert network.id == FAKE_NETWORK_ID
        client.api.create_network.assert_called_once_with(
            "foobar",
            labels={'foo': 'bar'}
        )
        client.api.inspect_network.assert_called_once_with(FAKE_NETWORK_ID)

    def test_get(self):
        client = make_fake_client()
        network = client.networks.get(FAKE_NETWORK_ID)
        assert network.id == FAKE_NETWORK_ID
        client.api.inspect_network.assert_called_once_with(FAKE_NETWORK_ID)

    def test_list(self):
        client = make_fake_client()
        networks = client.networks.list()
        assert networks[0].id == FAKE_NETWORK_ID
        client.api.networks.assert_called_once_with()

        client = make_fake_client()
        client.networks.list(ids=["abc"])
        client.api.networks.assert_called_once_with(ids=["abc"])

        client = make_fake_client()
        client.networks.list(names=["foobar"])
        client.api.networks.assert_called_once_with(names=["foobar"])
class NetworkTest(unittest.TestCase):
    """Tests for the Network model itself.

    Renamed from the copy-pasted ``ImageTest``; the always-truthy
    ``assert mock.called_once_with(...)`` checks are replaced with real
    ``assert_called_once_with`` assertions (``called_once_with`` is an
    auto-created Mock attribute and cannot fail).
    """

    def test_connect(self):
        client = make_fake_client()
        network = client.networks.get(FAKE_NETWORK_ID)
        network.connect(FAKE_CONTAINER_ID)
        client.api.connect_container_to_network.assert_called_once_with(
            FAKE_CONTAINER_ID,
            FAKE_NETWORK_ID
        )

    def test_disconnect(self):
        client = make_fake_client()
        network = client.networks.get(FAKE_NETWORK_ID)
        network.disconnect(FAKE_CONTAINER_ID)
        client.api.disconnect_container_from_network.assert_called_once_with(
            FAKE_CONTAINER_ID,
            FAKE_NETWORK_ID
        )

    def test_remove(self):
        client = make_fake_client()
        network = client.networks.get(FAKE_NETWORK_ID)
        network.remove()
        client.api.remove_network.assert_called_once_with(FAKE_NETWORK_ID)

View File

@ -0,0 +1,14 @@
import unittest
from .fake_api import FAKE_CONTAINER_ID
from .fake_api_client import make_fake_client
class ModelTest(unittest.TestCase):
    """Tests for behaviour shared by all resource models."""

    def test_reload(self):
        fake_client = make_fake_client()
        ctr = fake_client.containers.get(FAKE_CONTAINER_ID)
        # Simulate stale local state, then refresh from the API.
        ctr.attrs['Name'] = "oldname"
        ctr.reload()
        # One inspect from get(), a second one from reload().
        assert fake_client.api.inspect_container.call_count == 2
        assert ctr.attrs['Name'] == "foobar"

View File

@ -0,0 +1,52 @@
import unittest
from docker.models.services import _get_create_service_kwargs
class CreateServiceKwargsTest(unittest.TestCase):
    """Tests for the kwarg partitioning done by _get_create_service_kwargs."""

    def test_get_create_service_kwargs(self):
        result = _get_create_service_kwargs('test', {
            'image': 'foo',
            'command': 'true',
            'name': 'somename',
            'labels': {'key': 'value'},
            'mode': 'global',
            'update_config': {'update': 'config'},
            'networks': ['somenet'],
            'endpoint_spec': {'blah': 'blah'},
            'container_labels': {'containerkey': 'containervalue'},
            'resources': {'foo': 'bar'},
            'restart_policy': {'restart': 'policy'},
            'log_driver': 'logdriver',
            'log_driver_options': {'foo': 'bar'},
            'args': ['some', 'args'],
            'env': {'FOO': 'bar'},
            'workdir': '/',
            'user': 'bob',
            'mounts': [{'some': 'mounts'}],
            'stop_grace_period': 5,
            'constraints': ['foo=bar'],
        })
        task_template = result.pop('task_template')

        # Service-level kwargs pass straight through ...
        assert result == {
            'name': 'somename',
            'labels': {'key': 'value'},
            'mode': 'global',
            'update_config': {'update': 'config'},
            'networks': ['somenet'],
            'endpoint_spec': {'blah': 'blah'},
        }

        # ... everything else lands in the task template.
        assert set(task_template.keys()) == {
            'ContainerSpec', 'Resources', 'RestartPolicy', 'Placement',
            'LogDriver',
        }
        assert task_template['Placement'] == {'Constraints': ['foo=bar']}
        assert task_template['LogDriver'] == {
            'Name': 'logdriver',
            'Options': {'foo': 'bar'}
        }
        assert set(task_template['ContainerSpec'].keys()) == {
            'Image', 'Command', 'Args', 'Env', 'Dir', 'User', 'Labels',
            'Mounts', 'StopGracePeriod',
        }

View File

@ -0,0 +1,62 @@
# encoding: utf-8
from __future__ import absolute_import
from __future__ import unicode_literals
from docker.utils.json_stream import json_splitter, stream_as_text, json_stream
class TestJsonSplitter(object):
    """Tests for json_splitter, which peels one JSON document off a buffer."""

    def test_json_splitter_no_object(self):
        # An incomplete document yields nothing yet.
        buf = '{"foo": "bar'
        assert json_splitter(buf) is None

    def test_json_splitter_with_object(self):
        # A complete document comes back along with the unparsed remainder.
        buf = '{"foo": "bar"}\n \n{"next": "obj"}'
        assert json_splitter(buf) == ({'foo': 'bar'}, '{"next": "obj"}')

    def test_json_splitter_leading_whitespace(self):
        # Whitespace before the first document is skipped.
        buf = '\n \r{"foo": "bar"}\n\n {"next": "obj"}'
        assert json_splitter(buf) == ({'foo': 'bar'}, '{"next": "obj"}')
class TestStreamAsText(object):
    """Tests for stream_as_text, which decodes a byte stream to unicode."""

    def test_stream_with_non_utf_unicode_character(self):
        # Invalid UTF-8 bytes must decode to U+FFFD replacement characters
        # rather than raising. The expected value is spelled with explicit
        # escapes ('\ufffd' * 3) instead of raw replacement characters,
        # which do not survive copy/encoding round-trips in source form.
        stream = [b'\xed\xf3\xf3']
        output, = stream_as_text(stream)
        assert output == '\ufffd\ufffd\ufffd'

    def test_stream_with_utf_character(self):
        # Valid UTF-8 round-trips unchanged.
        stream = ['ěĝ'.encode('utf-8')]
        output, = stream_as_text(stream)
        assert output == 'ěĝ'
class TestJsonStream(object):
def test_with_falsy_entries(self):
    """Falsy documents ({} and []) are emitted, not silently dropped."""
    chunks = [
        '{"one": "two"}\n{}\n',
        "[1, 2, 3]\n[]\n",
    ]
    parsed = list(json_stream(chunks))
    assert parsed == [
        {'one': 'two'},
        {},
        [1, 2, 3],
        [],
    ]
def test_with_leading_whitespace(self):
    """Whitespace before and between documents is ignored."""
    chunks = [
        '\n \r\n {"one": "two"}{"x": 1}',
        ' {"three": "four"}\t\t{"x": 2}'
    ]
    parsed = list(json_stream(chunks))
    assert parsed == [
        {'one': 'two'},
        {'x': 1},
        {'three': 'four'},
        {'x': 2},
    ]