Merge upstream branch 'master' into feature/logs_since

Signed-off-by: Viacheslav Boiko <v.e.boyko@gmail.com>
This commit is contained in:
Viacheslav Boiko 2015-11-05 11:33:16 +01:00
commit 3330569772
No known key found for this signature in database
GPG Key ID: 9A82D3B0D85AD24C
49 changed files with 4264 additions and 3390 deletions

View File

@ -11,25 +11,35 @@ build:
build-py3:
docker build -t docker-py3 -f Dockerfile-py3 .
test: flake8 unit-test unit-test-py3 integration-dind
build-dind-certs:
docker build -t dpy-dind-certs -f tests/Dockerfile-dind-certs .
test: flake8 unit-test unit-test-py3 integration-dind integration-dind-ssl
unit-test: build
docker run docker-py py.test tests/test.py tests/utils_test.py
docker run docker-py py.test tests/unit
unit-test-py3: build-py3
docker run docker-py3 py.test tests/test.py tests/utils_test.py
docker run docker-py3 py.test tests/unit
integration-test: build
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test -rxs tests/integration_test.py
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration
integration-test-py3: build-py3
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test -rxs tests/integration_test.py
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration
integration-dind: build build-py3
docker run -d --name dpy-dind --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test -rxs tests/integration_test.py
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test -rxs tests/integration_test.py
docker run -d --name dpy-dind --env="DOCKER_HOST=tcp://localhost:2375" --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration
docker rm -vf dpy-dind
integration-dind-ssl: build-dind-certs build build-py3
docker run -d --name dpy-dind-certs dpy-dind-certs
docker run -d --env="DOCKER_HOST=tcp://localhost:2375" --env="DOCKER_TLS_VERIFY=1" --env="DOCKER_CERT_PATH=/certs" --volumes-from dpy-dind-certs --name dpy-dind-ssl -v /tmp --privileged dockerswarm/dind:1.8.1 docker daemon --tlsverify --tlscacert=/certs/ca.pem --tlscert=/certs/server-cert.pem --tlskey=/certs/server-key.pem -H tcp://0.0.0.0:2375
docker run --volumes-from dpy-dind-ssl --env="DOCKER_HOST=tcp://docker:2375" --env="DOCKER_TLS_VERIFY=1" --env="DOCKER_CERT_PATH=/certs" --link=dpy-dind-ssl:docker docker-py py.test tests/integration_test.py
docker run --volumes-from dpy-dind-ssl --env="DOCKER_HOST=tcp://docker:2375" --env="DOCKER_TLS_VERIFY=1" --env="DOCKER_CERT_PATH=/certs" --link=dpy-dind-ssl:docker docker-py3 py.test tests/integration_test.py
docker rm -vf dpy-dind-ssl dpy-dind-certs
flake8: build
docker run docker-py flake8 docker tests
docker run docker-py flake8 docker tests

View File

@ -5,3 +5,4 @@ from .daemon import DaemonApiMixin
from .exec_api import ExecApiMixin
from .image import ImageApiMixin
from .volume import VolumeApiMixin
from .network import NetworkApiMixin

View File

@ -76,6 +76,12 @@ class ContainerApiMixin(object):
@utils.check_resource
def copy(self, container, resource):
if utils.version_gte(self._version, '1.20'):
warnings.warn(
'Client.copy() is deprecated for API version >= 1.20, '
'please use get_archive() instead',
DeprecationWarning
)
res = self._post_json(
self._url("/containers/{0}/copy".format(container)),
data={"Resource": resource},
@ -146,6 +152,21 @@ class ContainerApiMixin(object):
self._raise_for_status(res)
return res.raw
@utils.check_resource
@utils.minimum_version('1.20')
def get_archive(self, container, path):
params = {
'path': path
}
url = self._url('/containers/{0}/archive', container)
res = self._get(url, params=params, stream=True)
self._raise_for_status(res)
encoded_stat = res.headers.get('x-docker-container-path-stat')
return (
res.raw,
utils.decode_json_header(encoded_stat) if encoded_stat else None
)
@utils.check_resource
def inspect_container(self, container):
return self._result(
@ -226,6 +247,15 @@ class ContainerApiMixin(object):
return h_ports
@utils.check_resource
@utils.minimum_version('1.20')
def put_archive(self, container, path, data):
params = {'path': path}
url = self._url('/containers/{0}/archive', container)
res = self._put(url, params=params, data=data)
self._raise_for_status(res)
return res.status_code == 200
@utils.check_resource
def remove_container(self, container, v=False, link=False, force=False):
params = {'v': v, 'link': link, 'force': force}

View File

@ -1,5 +1,3 @@
import shlex
import six
from .. import errors
@ -20,7 +18,7 @@ class ExecApiMixin(object):
'User-specific exec is not supported in API < 1.19'
)
if isinstance(cmd, six.string_types):
cmd = shlex.split(str(cmd))
cmd = utils.split_command(cmd)
data = {
'Container': container,

55
docker/api/network.py Normal file
View File

@ -0,0 +1,55 @@
import json
from ..utils import check_resource, minimum_version
class NetworkApiMixin(object):
@minimum_version('1.21')
def networks(self, names=None, ids=None):
filters = {}
if names:
filters['name'] = names
if ids:
filters['id'] = ids
params = {'filters': json.dumps(filters)}
url = self._url("/networks")
res = self._get(url, params=params)
return self._result(res, json=True)
@minimum_version('1.21')
def create_network(self, name, driver=None):
data = {
'name': name,
'driver': driver,
}
url = self._url("/networks/create")
res = self._post_json(url, data=data)
return self._result(res, json=True)
@minimum_version('1.21')
def remove_network(self, net_id):
url = self._url("/networks/{0}", net_id)
res = self._delete(url)
self._raise_for_status(res)
@minimum_version('1.21')
def inspect_network(self, net_id):
url = self._url("/networks/{0}", net_id)
res = self._get(url)
return self._result(res, json=True)
@check_resource
@minimum_version('1.21')
def connect_container_to_network(self, container, net_id):
data = {"container": container}
url = self._url("/networks/{0}/connect", net_id)
self._post_json(url, data=data)
@check_resource
@minimum_version('1.21')
def disconnect_container_from_network(self, container, net_id):
data = {"container": container}
url = self._url("/networks/{0}/disconnect", net_id)
self._post_json(url, data=data)

View File

@ -12,7 +12,7 @@ class VolumeApiMixin(object):
@utils.minimum_version('1.21')
def create_volume(self, name, driver=None, driver_opts=None):
url = self._url('/volumes')
url = self._url('/volumes/create')
if driver_opts is not None and not isinstance(driver_opts, dict):
raise TypeError('driver_opts must be a dictionary')

View File

@ -13,7 +13,6 @@
# limitations under the License.
import base64
import fileinput
import json
import logging
import os
@ -102,7 +101,7 @@ def decode_auth(auth):
def encode_header(auth):
auth_json = json.dumps(auth).encode('ascii')
return base64.b64encode(auth_json)
return base64.urlsafe_b64encode(auth_json)
def parse_auth(entries):
@ -132,78 +131,79 @@ def parse_auth(entries):
return conf
def find_config_file(config_path=None):
environment_path = os.path.join(
os.environ.get('DOCKER_CONFIG'),
os.path.basename(DOCKER_CONFIG_FILENAME)
) if os.environ.get('DOCKER_CONFIG') else None
paths = [
config_path, # 1
environment_path, # 2
os.path.join(os.path.expanduser('~'), DOCKER_CONFIG_FILENAME), # 3
os.path.join(
os.path.expanduser('~'), LEGACY_DOCKER_CONFIG_FILENAME
) # 4
]
for path in paths:
if path and os.path.exists(path):
return path
return None
def load_config(config_path=None):
"""
Loads authentication data from a Docker configuration file in the given
root directory or if config_path is passed use given path.
Lookup priority:
explicit config_path parameter > DOCKER_CONFIG environment variable >
~/.docker/config.json > ~/.dockercfg
"""
conf = {}
data = None
# Prefer ~/.docker/config.json.
config_file = config_path or os.path.join(os.path.expanduser('~'),
DOCKER_CONFIG_FILENAME)
config_file = find_config_file(config_path)
log.debug("Trying {0}".format(config_file))
if os.path.exists(config_file):
try:
with open(config_file) as f:
for section, data in six.iteritems(json.load(f)):
if section != 'auths':
continue
log.debug("Found 'auths' section")
return parse_auth(data)
log.debug("Couldn't find 'auths' section")
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
pass
else:
if not config_file:
log.debug("File doesn't exist")
config_file = config_path or os.path.join(os.path.expanduser('~'),
LEGACY_DOCKER_CONFIG_FILENAME)
log.debug("Trying {0}".format(config_file))
if not os.path.exists(config_file):
log.debug("File doesn't exist - returning empty config")
return {}
log.debug("Attempting to parse as JSON")
try:
with open(config_file) as f:
return parse_auth(json.load(f))
except Exception as e:
data = json.load(f)
if data.get('auths'):
log.debug("Found 'auths' section")
return parse_auth(data['auths'])
else:
log.debug("Couldn't find 'auths' section")
f.seek(0)
return parse_auth(json.load(f))
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
pass
# If that fails, we assume the configuration file contains a single
# authentication token for the public registry in the following format:
#
# auth = AUTH_TOKEN
# email = email@domain.com
log.debug("Attempting to parse legacy auth file format")
try:
data = []
for line in fileinput.input(config_file):
data.append(line.strip().split(' = ')[1])
if len(data) < 2:
# Not enough data
raise errors.InvalidConfigFile(
'Invalid or empty configuration file!')
with open(config_file) as f:
for line in f.readlines():
data.append(line.strip().split(' = ')[1])
if len(data) < 2:
# Not enough data
raise errors.InvalidConfigFile(
'Invalid or empty configuration file!'
)
username, password = decode_auth(data[0])
conf[INDEX_NAME] = {
'username': username,
'password': password,
'email': data[1],
'serveraddress': INDEX_URL,
return {
INDEX_NAME: {
'username': username,
'password': password,
'email': data[1],
'serveraddress': INDEX_URL,
}
}
return conf
except Exception as e:
log.debug(e)
pass

View File

@ -39,7 +39,8 @@ class Client(
api.DaemonApiMixin,
api.ExecApiMixin,
api.ImageApiMixin,
api.VolumeApiMixin):
api.VolumeApiMixin,
api.NetworkApiMixin):
def __init__(self, base_url=None, version=None,
timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
super(Client, self).__init__()
@ -108,6 +109,9 @@ class Client(
def _get(self, url, **kwargs):
return self.get(url, **self._set_request_timeout(kwargs))
def _put(self, url, **kwargs):
return self.put(url, **self._set_request_timeout(kwargs))
def _delete(self, url, **kwargs):
return self.delete(url, **self._set_request_timeout(kwargs))
@ -184,6 +188,8 @@ class Client(
self._raise_for_status(response)
if six.PY3:
sock = response.raw._fp.fp.raw
if self.base_url.startswith("https://"):
sock = sock._sock
else:
sock = response.raw._fp.fp._sock
try:
@ -240,10 +246,7 @@ class Client(
# Disable timeout on the underlying socket to prevent
# Read timed out(s) for long running processes
socket = self._get_raw_response_socket(response)
if six.PY3:
socket._sock.settimeout(None)
else:
socket.settimeout(None)
self._disable_socket_timeout(socket)
while True:
header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
@ -272,6 +275,19 @@ class Client(
for out in response.iter_content(chunk_size=1, decode_unicode=True):
yield out
def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
connecting over http or https, we might need to access _sock, which
may or may not exist; or we may need to just settimeout on socket
itself, which also may or may not have settimeout on it.
To avoid missing the correct one, we try both.
"""
if hasattr(socket, "settimeout"):
socket.settimeout(None)
if hasattr(socket, "_sock") and hasattr(socket._sock, "settimeout"):
socket._sock.settimeout(None)
def _get_result(self, container, stream, res):
cont = self.inspect_container(container)
return self._get_result_tty(stream, res, cont['Config']['Tty'])

View File

@ -73,12 +73,20 @@ class UnixAdapter(requests.adapters.HTTPAdapter):
if pool:
return pool
pool = UnixHTTPConnectionPool(url,
self.socket_path,
self.timeout)
pool = UnixHTTPConnectionPool(
url, self.socket_path, self.timeout
)
self.pools[url] = pool
return pool
def request_url(self, request, proxies):
# The select_proxy utility in requests errors out when the provided URL
# doesn't have a hostname, like is the case when using a UNIX socket.
# Since proxies are an irrelevant notion in the case of UNIX sockets
# anyway, we simply return the path URL directly.
# See also: https://github.com/docker/docker-py/issues/811
return request.path_url
def close(self):
self.pools.clear()

View File

@ -3,7 +3,7 @@ from .utils import (
mkbuildcontext, tar, exclude_paths, parse_repository_tag, parse_host,
kwargs_from_env, convert_filters, datetime_to_timestamp, create_host_config,
create_container_config, parse_bytes, ping_registry, parse_env_file,
version_lt, version_gte
version_lt, version_gte, decode_json_header, split_command,
) # flake8: noqa
from .types import Ulimit, LogConfig # flake8: noqa

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import io
import os
import os.path
@ -66,6 +67,13 @@ def mkbuildcontext(dockerfile):
return f
def decode_json_header(header):
data = base64.b64decode(header)
if six.PY3:
data = data.decode('utf-8')
return json.loads(data)
def tar(path, exclude=None, dockerfile=None):
f = tempfile.NamedTemporaryFile()
t = tarfile.open(mode='w', fileobj=f)
@ -242,6 +250,9 @@ def convert_volume_binds(binds):
result = []
for k, v in binds.items():
if isinstance(k, six.binary_type):
k = k.decode('utf-8')
if isinstance(v, dict):
if 'ro' in v and 'mode' in v:
raise ValueError(
@ -249,6 +260,10 @@ def convert_volume_binds(binds):
.format(repr(v))
)
bind = v['bind']
if isinstance(bind, six.binary_type):
bind = bind.decode('utf-8')
if 'ro' in v:
mode = 'ro' if v['ro'] else 'rw'
elif 'mode' in v:
@ -256,11 +271,15 @@ def convert_volume_binds(binds):
else:
mode = 'rw'
result.append('{0}:{1}:{2}'.format(
k, v['bind'], mode
))
result.append(
six.text_type('{0}:{1}:{2}').format(k, bind, mode)
)
else:
result.append('{0}:{1}:rw'.format(k, v))
if isinstance(v, six.binary_type):
v = v.decode('utf-8')
result.append(
six.text_type('{0}:{1}:rw').format(k, v)
)
return result
@ -654,6 +673,12 @@ def parse_env_file(env_file):
return environment
def split_command(command):
if six.PY2 and not isinstance(command, six.binary_type):
command = command.encode('utf-8')
return shlex.split(command)
def create_container_config(
version, image, command, hostname=None, user=None, detach=False,
stdin_open=False, tty=False, mem_limit=None, ports=None, environment=None,
@ -663,10 +688,10 @@ def create_container_config(
labels=None, volume_driver=None
):
if isinstance(command, six.string_types):
command = shlex.split(str(command))
command = split_command(command)
if isinstance(entrypoint, six.string_types):
entrypoint = shlex.split(str(entrypoint))
entrypoint = split_command(entrypoint)
if isinstance(environment, dict):
environment = [

View File

@ -1,2 +1,2 @@
version = "1.5.0-dev"
version = "1.6.0-dev"
version_info = tuple([int(d) for d in version.split("-")[0].split(".")])

View File

@ -4,8 +4,8 @@ To instantiate a `Client` class that will allow you to communicate with a
Docker daemon, simply do:
```python
from docker import Client
c = Client(base_url='unix://var/run/docker.sock')
>>> from docker import Client
>>> cli = Client(base_url='unix://var/run/docker.sock')
```
**Params**:
@ -165,6 +165,8 @@ non-running ones
## copy
Identical to the `docker cp` command. Get files/folders from the container.
**Deprecated for API version >= 1.20** &ndash; Consider using
[`get_archive`](#get_archive) **instead.**
**Params**:
@ -214,7 +216,7 @@ from. Optionally a single string joining container id's with commas
* network_disabled (bool): Disable networking
* name (str): A name for the container
* entrypoint (str or list): An entrypoint
* cpu_shares (int or float): CPU shares (relative weight)
* cpu_shares (int): CPU shares (relative weight)
* working_dir (str): Path to the working directory
* domainname (str or list): Set custom DNS search domains
* memswap_limit (int):
@ -248,9 +250,9 @@ PASSWORD=secret
The utility can be used as follows:
```python
>> import docker.utils
>> my_envs = docker.utils.parse_env_file('/path/to/file')
>> docker.utils.create_container_config('1.18', '_mongodb', 'foobar', environment=my_envs)
>>> import docker.utils
>>> my_envs = docker.utils.parse_env_file('/path/to/file')
>>> docker.utils.create_container_config('1.18', '_mongodb', 'foobar', environment=my_envs)
```
You can now use this with 'environment' for `create_container`.
@ -377,6 +379,27 @@ Export the contents of a filesystem as a tar archive to STDOUT.
**Returns** (str): The filesystem tar archive as a str
## get_archive
Retrieve a file or folder from a container in the form of a tar archive.
**Params**:
* container (str): The container where the file is located
* path (str): Path to the file or folder to retrieve
**Returns** (tuple): First element is a raw tar data stream. Second element is
a dict containing `stat` information on the specified `path`.
```python
>>> import docker
>>> cli = docker.Client()
>>> ctnr = cli.create_container('busybox', 'true')
>>> strm, stat = cli.get_archive(ctnr, '/bin/sh')
>>> print(stat)
{u'linkTarget': u'', u'mode': 493, u'mtime': u'2015-09-16T12:34:23-07:00', u'name': u'sh', u'size': 962860}
```
## get_image
Get an image from the docker daemon. Similar to the `docker save` command.
@ -713,6 +736,20 @@ command.
yourname/app/tags/latest}"}\\n']
```
## put_archive
Insert a file or folder in an existing container using a tar archive as source.
**Params**:
* container (str): The container where the file(s) will be extracted
* path (str): Path inside the container where the file(s) will be extracted.
Must exist.
* data (bytes): tar data to be extracted
**Returns** (bool): True if the call succeeds. `docker.errors.APIError` will
be raised if an error occurs.
## remove_container
Remove a container. Similar to the `docker rm` command.

View File

@ -15,8 +15,8 @@ You can then instantiate `docker.Client` like this:
from docker.client import Client
from docker.utils import kwargs_from_env
client = Client(**kwargs_from_env())
print client.version()
cli = Client(**kwargs_from_env())
print cli.version()
```
If you're encountering the following error:
@ -33,6 +33,6 @@ from docker.utils import kwargs_from_env
kwargs = kwargs_from_env()
kwargs['tls'].assert_hostname = False
client = Client(**kwargs)
print client.version()
cli = Client(**kwargs)
print cli.version()
```

View File

@ -1,6 +1,43 @@
Change Log
==========
1.5.0
-----
[List of PRs / issues for this release](https://github.com/docker/docker-py/issues?q=milestone%3A1.5.0+is%3Aclosed)
### Features
* Added support for the networking API introduced in Docker 1.9.0
(`Client.networks`, `Client.create_network`, `Client.remove_network`,
`Client.inspect_network`, `Client.connect_container_to_network`,
`Client.disconnect_container_from_network`).
* Added support for the volumes API introduced in Docker 1.9.0
(`Client.volumes`, `Client.create_volume`, `Client.inspect_volume`,
`Client.remove_volume`).
* Added support for the `group_add` parameter in `create_host_config`.
* Added support for the CPU CFS (`cpu_quota` and `cpu_period`) parameteres
in `create_host_config`.
* Added support for the archive API endpoint (`Client.get_archive`,
`Client.put_archive`).
* Added support for `ps_args` parameter in `Client.top`.
### Bugfixes
* Fixed a bug where specifying volume binds with unicode characters would
fail.
* Fixed a bug where providing an explicit protocol in `Client.port` would fail
to yield the expected result.
* Fixed a bug where the priority protocol returned by `Client.port` would be UDP
instead of the expected TCP.
### Miscellaneous
* Broke up Client code into several files to facilitate maintenance and
contribution.
* Added contributing guidelines to the repository.
1.4.0
-----

View File

@ -5,8 +5,8 @@ the devices parameter in the `host_config` param in `Client.create_container`
as shown below:
```python
c.create_container(
'busybox', 'true', host_config=docker.utils.create_host_config(devices=[
cli.create_container(
'busybox', 'true', host_config=cli.create_host_config(devices=[
'/dev/sda:/dev/xvda:rwm'
])
)

View File

@ -101,12 +101,20 @@ for example:
allowed to consume.
* group_add (list): List of additional group names and/or IDs that the
container process will run as.
* devices (list): A list of devices to add to the container specified as dicts
in the form:
```
{ "PathOnHost": "/dev/deviceName",
"PathInContainer": "/dev/deviceName",
"CgroupPermissions": "mrw"
}
```
**Returns** (dict) HostConfig dictionary
```python
>>> from docker import Client
>>> c = Client()
>>> c.create_host_config(privileged=True, cap_drop=['MKNOD'], volumes_from=['nostalgic_newton'])
>>> cli = Client()
>>> cli.create_host_config(privileged=True, cap_drop=['MKNOD'], volumes_from=['nostalgic_newton'])
{'CapDrop': ['MKNOD'], 'LxcConf': None, 'Privileged': True, 'VolumesFrom': ['nostalgic_newton'], 'PublishAllPorts': False}
```

View File

@ -4,9 +4,9 @@ open inside the container in the `Client().create_container()` method.
Bindings are declared in the `host_config` parameter.
```python
container_id = c.create_container(
container_id = cli.create_container(
'busybox', 'ls', ports=[1111, 2222],
host_config=docker.utils.create_host_config(port_bindings={
host_config=cli.create_host_config(port_bindings={
1111: 4567,
2222: None
})
@ -17,22 +17,22 @@ container_id = c.create_container(
You can limit the host address on which the port will be exposed like such:
```python
docker.utils.create_host_config(port_bindings={1111: ('127.0.0.1', 4567)})
cli.create_host_config(port_bindings={1111: ('127.0.0.1', 4567)})
```
Or without host port assignment:
```python
docker.utils.create_host_config(port_bindings={1111: ('127.0.0.1',)})
cli.create_host_config(port_bindings={1111: ('127.0.0.1',)})
```
If you wish to use UDP instead of TCP (default), you need to declare ports
as such in both the config and host config:
```python
container_id = c.create_container(
container_id = cli.create_container(
'busybox', 'ls', ports=[(1111, 'udp'), 2222],
host_config=docker.utils.create_host_config(port_bindings={
host_config=cli.create_host_config(port_bindings={
'1111/udp': 4567, 2222: None
})
)

View File

@ -5,9 +5,9 @@ the `Client().create_container()` method, and declare mappings in the
`host_config` section.
```python
container_id = c.create_container(
container_id = cli.create_container(
'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2'],
host_config=docker.utils.create_host_config(binds={
host_config=cli.create_host_config(binds={
'/home/user1/': {
'bind': '/mnt/vol2',
'mode': 'rw',
@ -24,9 +24,9 @@ You can alternatively specify binds as a list. This code is equivalent to the
example above:
```python
container_id = c.create_container(
container_id = cli.create_container(
'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2'],
host_config=docker.utils.create_host_config(binds=[
host_config=cli.create_host_config(binds=[
'/home/user1/:/mnt/vol2',
'/var/www:/mnt/vol1:ro',
])

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = --tb=short -rxs -s

View File

@ -0,0 +1,20 @@
FROM python:2.7
RUN mkdir /tmp/certs
VOLUME /certs
WORKDIR /tmp/certs
RUN openssl genrsa -aes256 -passout pass:foobar -out ca-key.pem 4096
RUN echo "[req]\nprompt=no\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\ncountryName=AU" > /tmp/config
RUN openssl req -new -x509 -passin pass:foobar -config /tmp/config -days 365 -key ca-key.pem -sha256 -out ca.pem
RUN openssl genrsa -out server-key.pem -passout pass:foobar 4096
RUN openssl req -subj "/CN=docker" -sha256 -new -key server-key.pem -out server.csr
RUN echo subjectAltName = DNS:docker,DNS:localhost > extfile.cnf
RUN openssl x509 -req -days 365 -passin pass:foobar -sha256 -in server.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile extfile.cnf
RUN openssl genrsa -out key.pem 4096
RUN openssl req -passin pass:foobar -subj '/CN=client' -new -key key.pem -out client.csr
RUN echo extendedKeyUsage = clientAuth > extfile.cnf
RUN openssl x509 -req -passin pass:foobar -days 365 -sha256 -in client.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out cert.pem -extfile extfile.cnf
RUN chmod -v 0400 ca-key.pem key.pem server-key.pem
RUN chmod -v 0444 ca.pem server-cert.pem cert.pem
CMD cp -R /tmp/certs/* /certs && while true; do sleep 1; done

View File

@ -21,3 +21,28 @@ def requires_api_version(version):
),
reason="API version is too low (< {0})".format(version)
)
class Cleanup(object):
if sys.version_info < (2, 7):
# Provide a basic implementation of addCleanup for Python < 2.7
def __init__(self, *args, **kwargs):
super(Cleanup, self).__init__(*args, **kwargs)
self._cleanups = []
def tearDown(self):
super(Cleanup, self).tearDown()
ok = True
while self._cleanups:
fn, args, kwargs = self._cleanups.pop(-1)
try:
fn(*args, **kwargs)
except KeyboardInterrupt:
raise
except:
ok = False
if not ok:
raise
def addCleanup(self, function, *args, **kwargs):
self._cleanups.append((function, args, kwargs))

View File

@ -1,5 +1,6 @@
import os
import os.path
import tarfile
import tempfile
@ -14,3 +15,23 @@ def make_tree(dirs, files):
f.write("content")
return base
def simple_tar(path):
f = tempfile.NamedTemporaryFile()
t = tarfile.open(mode='w', fileobj=f)
abs_path = os.path.abspath(path)
t.add(abs_path, arcname=os.path.basename(path), recursive=False)
t.close()
f.seek(0)
return f
def untar_file(tardata, filename):
with tarfile.open(mode='r', fileobj=tardata) as t:
f = t.extractfile(filename)
result = f.read()
f.close()
return result

View File

@ -0,0 +1,13 @@
# flake8: noqa
# FIXME: crutch while we transition to the new folder architecture
# Remove imports when merged in master and Jenkins is updated to find the
# tests in the new location.
from .api_test import *
from .build_test import *
from .container_test import *
from .exec_test import *
from .image_test import *
from .network_test import *
from .regression_test import *
from .volume_test import *

View File

@ -0,0 +1,292 @@
import base64
import json
import os
import shutil
import tempfile
import time
import unittest
import warnings
import docker
import six
BUSYBOX = 'busybox:buildroot-2014.02'
EXEC_DRIVER = []
def exec_driver_is_native():
global EXEC_DRIVER
if not EXEC_DRIVER:
c = docker_client()
EXEC_DRIVER = c.info()['ExecutionDriver']
c.close()
return EXEC_DRIVER.startswith('native')
def docker_client(**kwargs):
return docker.Client(**docker_client_kwargs(**kwargs))
def docker_client_kwargs(**kwargs):
client_kwargs = docker.utils.kwargs_from_env(assert_hostname=False)
client_kwargs.update(kwargs)
return client_kwargs
def setup_module():
warnings.simplefilter('error')
c = docker_client()
try:
c.inspect_image(BUSYBOX)
except docker.errors.NotFound:
os.write(2, "\npulling busybox\n".encode('utf-8'))
for data in c.pull(BUSYBOX, stream=True):
data = json.loads(data.decode('utf-8'))
os.write(2, ("%c[2K\r" % 27).encode('utf-8'))
status = data.get("status")
progress = data.get("progress")
detail = "{0} - {1}".format(status, progress).encode('utf-8')
os.write(2, detail)
os.write(2, "\npulled busybox\n".encode('utf-8'))
# Double make sure we now have busybox
c.inspect_image(BUSYBOX)
c.close()
class BaseTestCase(unittest.TestCase):
tmp_imgs = []
tmp_containers = []
tmp_folders = []
tmp_volumes = []
def setUp(self):
if six.PY2:
self.assertRegex = self.assertRegexpMatches
self.assertCountEqual = self.assertItemsEqual
self.client = docker_client(timeout=60)
self.tmp_imgs = []
self.tmp_containers = []
self.tmp_folders = []
self.tmp_volumes = []
self.tmp_networks = []
def tearDown(self):
for img in self.tmp_imgs:
try:
self.client.remove_image(img)
except docker.errors.APIError:
pass
for container in self.tmp_containers:
try:
self.client.stop(container, timeout=1)
self.client.remove_container(container)
except docker.errors.APIError:
pass
for network in self.tmp_networks:
try:
self.client.remove_network(network)
except docker.errors.APIError:
pass
for folder in self.tmp_folders:
shutil.rmtree(folder)
for volume in self.tmp_volumes:
try:
self.client.remove_volume(volume)
except docker.errors.APIError:
pass
self.client.close()
def run_container(self, *args, **kwargs):
container = self.client.create_container(*args, **kwargs)
self.tmp_containers.append(container)
self.client.start(container)
exitcode = self.client.wait(container)
if exitcode != 0:
output = self.client.logs(container)
raise Exception(
"Container exited with code {}:\n{}"
.format(exitcode, output))
return container
#########################
# INFORMATION TESTS #
#########################
class InformationTest(BaseTestCase):
def test_version(self):
res = self.client.version()
self.assertIn('GoVersion', res)
self.assertIn('Version', res)
self.assertEqual(len(res['Version'].split('.')), 3)
def test_info(self):
res = self.client.info()
self.assertIn('Containers', res)
self.assertIn('Images', res)
self.assertIn('Debug', res)
def test_search(self):
self.client = docker_client(timeout=10)
res = self.client.search('busybox')
self.assertTrue(len(res) >= 1)
base_img = [x for x in res if x['name'] == 'busybox']
self.assertEqual(len(base_img), 1)
self.assertIn('description', base_img[0])
#################
# LINKS TESTS #
#################
class LinkTest(BaseTestCase):
def test_remove_link(self):
# Create containers
container1 = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True
)
container1_id = container1['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
# Create Link
# we don't want the first /
link_path = self.client.inspect_container(container1_id)['Name'][1:]
link_alias = 'mylink'
container2 = self.client.create_container(
BUSYBOX, 'cat', host_config=self.client.create_host_config(
links={link_path: link_alias}, network_mode='none'
)
)
container2_id = container2['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# Remove link
linked_name = self.client.inspect_container(container2_id)['Name'][1:]
link_name = '%s/%s' % (linked_name, link_alias)
self.client.remove_container(link_name, link=True)
# Link is gone
containers = self.client.containers(all=True)
retrieved = [x for x in containers if link_name in x['Names']]
self.assertEqual(len(retrieved), 0)
# Containers are still there
retrieved = [
x for x in containers if x['Id'].startswith(container1_id) or
x['Id'].startswith(container2_id)
]
self.assertEqual(len(retrieved), 2)
#######################
# PY SPECIFIC TESTS #
#######################
class LoadConfigTest(BaseTestCase):
def test_load_legacy_config(self):
folder = tempfile.mkdtemp()
self.tmp_folders.append(folder)
cfg_path = os.path.join(folder, '.dockercfg')
f = open(cfg_path, 'w')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
f.close()
cfg = docker.auth.load_config(cfg_path)
self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
cfg = cfg[docker.auth.INDEX_NAME]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('Auth'), None)
def test_load_json_config(self):
folder = tempfile.mkdtemp()
self.tmp_folders.append(folder)
cfg_path = os.path.join(folder, '.dockercfg')
f = open(os.path.join(folder, '.dockercfg'), 'w')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
email_ = 'sakuya@scarlet.net'
f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
docker.auth.INDEX_URL, auth_, email_))
f.close()
cfg = docker.auth.load_config(cfg_path)
self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
cfg = cfg[docker.auth.INDEX_URL]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('Auth'), None)
class AutoDetectVersionTest(unittest.TestCase):
def test_client_init(self):
client = docker_client(version='auto')
client_version = client._version
api_version = client.version(api_version=False)['ApiVersion']
self.assertEqual(client_version, api_version)
api_version_2 = client.version()['ApiVersion']
self.assertEqual(client_version, api_version_2)
client.close()
def test_auto_client(self):
client = docker.AutoVersionClient(**docker_client_kwargs())
client_version = client._version
api_version = client.version(api_version=False)['ApiVersion']
self.assertEqual(client_version, api_version)
api_version_2 = client.version()['ApiVersion']
self.assertEqual(client_version, api_version_2)
client.close()
with self.assertRaises(docker.errors.DockerException):
docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
class ConnectionTimeoutTest(unittest.TestCase):
def setUp(self):
self.timeout = 0.5
self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
timeout=self.timeout)
def test_timeout(self):
start = time.time()
res = None
# This call isn't supposed to complete, and it should fail fast.
try:
res = self.client.inspect_container('id')
except:
pass
end = time.time()
self.assertTrue(res is None)
self.assertTrue(end - start < 2 * self.timeout)
class UnixconnTest(unittest.TestCase):
"""
Test UNIX socket connection adapter.
"""
def test_resource_warnings(self):
"""
Test no warnings are produced when using the client.
"""
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
client = docker_client()
client.images()
client.close()
del client
assert len(w) == 0, \
"No warnings produced: {0}".format(w[0].message)

View File

@ -0,0 +1,98 @@
import io
import json
import os
import shutil
import tempfile
import six
from . import api_test
from ..base import requires_api_version
class BuildTest(api_test.BaseTestCase):
def test_build_streaming(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
stream = self.client.build(fileobj=script, stream=True)
logs = ''
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
json.loads(chunk) # ensure chunk is a single, valid JSON blob
logs += chunk
self.assertNotEqual(logs, '')
def test_build_from_stringio(self):
if six.PY3:
return
script = io.StringIO(six.text_type('\n').join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]))
stream = self.client.build(fileobj=script, stream=True)
logs = ''
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
logs += chunk
self.assertNotEqual(logs, '')
@requires_api_version('1.8')
def test_build_with_dockerignore(self):
base_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base_dir)
with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
f.write("\n".join([
'FROM busybox',
'MAINTAINER docker-py',
'ADD . /test',
]))
with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
f.write("\n".join([
'ignored',
'Dockerfile',
'.dockerignore',
'', # empty line
]))
with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
f.write("this file should not be ignored")
subdir = os.path.join(base_dir, 'ignored', 'subdir')
os.makedirs(subdir)
with open(os.path.join(subdir, 'file'), 'w') as f:
f.write("this file should be ignored")
tag = 'docker-py-test-build-with-dockerignore'
stream = self.client.build(
path=base_dir,
tag=tag,
)
for chunk in stream:
pass
c = self.client.create_container(tag, ['ls', '-1A', '/test'])
self.client.start(c)
self.client.wait(c)
logs = self.client.logs(c)
if six.PY3:
logs = logs.decode('utf-8')
self.assertEqual(
list(filter(None, logs.split('\n'))),
['not-ignored'],
)

View File

@ -0,0 +1,999 @@
import errno
import os
import shutil
import signal
import struct
import tempfile
import docker
import pytest
import six
from . import api_test
from ..base import requires_api_version
from .. import helpers
BUSYBOX = api_test.BUSYBOX
class ListContainersTest(api_test.BaseTestCase):
def test_list_containers(self):
res0 = self.client.containers(all=True)
size = len(res0)
res1 = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res1)
self.client.start(res1['Id'])
self.tmp_containers.append(res1['Id'])
res2 = self.client.containers(all=True)
self.assertEqual(size + 1, len(res2))
retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
self.assertEqual(len(retrieved), 1)
retrieved = retrieved[0]
self.assertIn('Command', retrieved)
self.assertEqual(retrieved['Command'], six.text_type('true'))
self.assertIn('Image', retrieved)
self.assertRegex(retrieved['Image'], r'busybox:.*')
self.assertIn('Status', retrieved)
class CreateContainerTest(api_test.BaseTestCase):
def test_create(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
def test_create_with_host_pid_mode(self):
ctnr = self.client.create_container(
BUSYBOX, 'true', host_config=self.client.create_host_config(
pid_mode='host', network_mode='none'
)
)
self.assertIn('Id', ctnr)
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
inspect = self.client.inspect_container(ctnr)
self.assertIn('HostConfig', inspect)
host_config = inspect['HostConfig']
self.assertIn('PidMode', host_config)
self.assertEqual(host_config['PidMode'], 'host')
def test_create_with_links(self):
res0 = self.client.create_container(
BUSYBOX, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
BUSYBOX, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# we don't want the first /
link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
link_alias1 = 'mylink1'
link_env_prefix1 = link_alias1.upper()
link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
link_alias2 = 'mylink2'
link_env_prefix2 = link_alias2.upper()
res2 = self.client.create_container(
BUSYBOX, 'env', host_config=self.client.create_host_config(
links={link_path1: link_alias1, link_path2: link_alias2},
network_mode='bridge'
)
)
container3_id = res2['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
self.assertEqual(self.client.wait(container3_id), 0)
logs = self.client.logs(container3_id)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
def test_create_with_restart_policy(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '2'],
host_config=self.client.create_host_config(
restart_policy={"Name": "always", "MaximumRetryCount": 0},
network_mode='none'
)
)
id = container['Id']
self.client.start(id)
self.client.wait(id)
with self.assertRaises(docker.errors.APIError) as exc:
self.client.remove_container(id)
err = exc.exception.response.text
self.assertIn(
'You cannot remove a running container', err
)
self.client.remove_container(id, force=True)
def test_create_container_with_volumes_from(self):
vol_names = ['foobar_vol0', 'foobar_vol1']
res0 = self.client.create_container(
BUSYBOX, 'true', name=vol_names[0]
)
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
BUSYBOX, 'true', name=vol_names[1]
)
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
with self.assertRaises(docker.errors.DockerException):
self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True,
volumes_from=vol_names
)
res2 = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True,
host_config=self.client.create_host_config(
volumes_from=vol_names, network_mode='none'
)
)
container3_id = res2['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
info = self.client.inspect_container(res2['Id'])
self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
def create_container_readonly_fs(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
ctnr = self.client.create_container(
BUSYBOX, ['mkdir', '/shrine'],
host_config=self.client.create_host_config(
read_only=True, network_mode='none'
)
)
self.assertIn('Id', ctnr)
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
res = self.client.wait(ctnr)
self.assertNotEqual(res, 0)
def create_container_with_name(self):
res = self.client.create_container(BUSYBOX, 'true', name='foobar')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Name', inspect)
self.assertEqual('/foobar', inspect['Name'])
def create_container_privileged(self):
res = self.client.create_container(
BUSYBOX, 'true', host_config=self.client.create_host_config(
privileged=True, network_mode='none'
)
)
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
# Since Nov 2013, the Privileged flag is no longer part of the
# container's config exposed via the API (safety concerns?).
#
if 'Privileged' in inspect['Config']:
self.assertEqual(inspect['Config']['Privileged'], True)
def test_create_with_mac_address(self):
mac_address_expected = "02:42:ac:11:00:0a"
container = self.client.create_container(
BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
id = container['Id']
self.client.start(container)
res = self.client.inspect_container(container['Id'])
self.assertEqual(mac_address_expected,
res['NetworkSettings']['MacAddress'])
self.client.kill(id)
@requires_api_version('1.20')
def test_group_id_ints(self):
container = self.client.create_container(
BUSYBOX, 'id -G',
host_config=self.client.create_host_config(group_add=[1000, 1001])
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
groups = logs.strip().split(' ')
self.assertIn('1000', groups)
self.assertIn('1001', groups)
@requires_api_version('1.20')
def test_group_id_strings(self):
container = self.client.create_container(
BUSYBOX, 'id -G', host_config=self.client.create_host_config(
group_add=['1000', '1001']
)
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
groups = logs.strip().split(' ')
self.assertIn('1000', groups)
self.assertIn('1001', groups)
def test_valid_log_driver_and_log_opt(self):
log_config = docker.utils.LogConfig(
type='json-file',
config={'max-file': '100'}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], log_config.type)
self.assertEqual(container_log_config['Config'], log_config.config)
def test_invalid_log_driver_raises_exception(self):
log_config = docker.utils.LogConfig(
type='asdf-nope',
config={}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
expected_msg = "logger: no log driver named 'asdf-nope' is registered"
with pytest.raises(docker.errors.APIError) as excinfo:
# raises an internal server error 500
self.client.start(container)
assert expected_msg in str(excinfo.value)
@pytest.mark.skipif(True,
reason="https://github.com/docker/docker/issues/15633")
def test_valid_no_log_driver_specified(self):
log_config = docker.utils.LogConfig(
type="",
config={'max-file': '100'}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], "json-file")
self.assertEqual(container_log_config['Config'], log_config.config)
def test_valid_no_config_specified(self):
log_config = docker.utils.LogConfig(
type="json-file",
config=None
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], "json-file")
self.assertEqual(container_log_config['Config'], {})
class VolumeBindTest(api_test.BaseTestCase):
def setUp(self):
super(VolumeBindTest, self).setUp()
self.mount_dest = '/mnt'
# Get a random pathname - we don't need it to exist locally
self.mount_origin = tempfile.mkdtemp()
shutil.rmtree(self.mount_origin)
self.filename = 'shared.txt'
self.run_with_volume(
False,
BUSYBOX,
['touch', os.path.join(self.mount_dest, self.filename)],
)
def test_create_with_binds_rw(self):
container = self.run_with_volume(
False,
BUSYBOX,
['ls', self.mount_dest],
)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn(self.filename, logs)
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, True)
def test_create_with_binds_ro(self):
self.run_with_volume(
False,
BUSYBOX,
['touch', os.path.join(self.mount_dest, self.filename)],
)
container = self.run_with_volume(
True,
BUSYBOX,
['ls', self.mount_dest],
)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn(self.filename, logs)
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, False)
def check_container_data(self, inspect_data, rw):
if docker.utils.compare_version('1.20', self.client._version) < 0:
self.assertIn('Volumes', inspect_data)
self.assertIn(self.mount_dest, inspect_data['Volumes'])
self.assertEqual(
self.mount_origin, inspect_data['Volumes'][self.mount_dest]
)
self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
else:
self.assertIn('Mounts', inspect_data)
filtered = list(filter(
lambda x: x['Destination'] == self.mount_dest,
inspect_data['Mounts']
))
self.assertEqual(len(filtered), 1)
mount_data = filtered[0]
self.assertEqual(mount_data['Source'], self.mount_origin)
self.assertEqual(mount_data['RW'], rw)
def run_with_volume(self, ro, *args, **kwargs):
return self.run_container(
*args,
volumes={self.mount_dest: {}},
host_config=self.client.create_host_config(
binds={
self.mount_origin: {
'bind': self.mount_dest,
'ro': ro,
},
},
network_mode='none'
),
**kwargs
)
@requires_api_version('1.20')
class ArchiveTest(api_test.BaseTestCase):
def test_get_file_archive_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
with tempfile.NamedTemporaryFile() as destination:
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
for d in strm:
destination.write(d)
destination.seek(0)
retrieved_data = helpers.untar_file(destination, 'data.txt')
if six.PY3:
retrieved_data = retrieved_data.decode('utf-8')
self.assertEqual(data, retrieved_data.strip())
def test_get_file_stat_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
self.assertIn('name', stat)
self.assertEqual(stat['name'], 'data.txt')
self.assertIn('size', stat)
self.assertEqual(stat['size'], len(data))
def test_copy_file_to_container(self):
data = b'Deaf To All But The Song'
with tempfile.NamedTemporaryFile() as test_file:
test_file.write(data)
test_file.seek(0)
ctnr = self.client.create_container(
BUSYBOX,
'cat {0}'.format(
os.path.join('/vol1', os.path.basename(test_file.name))
),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with helpers.simple_tar(test_file.name) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
data = data.decode('utf-8')
self.assertEqual(logs.strip(), data)
def test_copy_directory_to_container(self):
files = ['a.py', 'b.py', 'foo/b.py']
dirs = ['foo', 'bar']
base = helpers.make_tree(dirs, files)
ctnr = self.client.create_container(
BUSYBOX, 'ls -p /vol1', volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with docker.utils.tar(base) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
results = logs.strip().split()
self.assertIn('a.py', results)
self.assertIn('b.py', results)
self.assertIn('foo/', results)
self.assertIn('bar/', results)
class RenameContainerTest(api_test.BaseTestCase):
def test_rename_container(self):
version = self.client.version()['Version']
name = 'hong_meiling'
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.rename(res, name)
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Name', inspect)
if version == '1.5.0':
self.assertEqual(name, inspect['Name'])
else:
self.assertEqual('/{0}'.format(name), inspect['Name'])
class StartContainerTest(api_test.BaseTestCase):
def test_start_container(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
def test_start_container_with_dict_instead_of_id(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res)
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
def test_run_shlex_commands(self):
commands = [
'true',
'echo "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'echo -n "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'/bin/sh -c "echo Hello World"',
'/bin/sh -c \'echo "Hello World"\'',
'echo "\"Night of Nights\""',
'true && echo "Night of Nights"'
]
for cmd in commands:
container = self.client.create_container(BUSYBOX, cmd)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0, msg=cmd)
class WaitTest(api_test.BaseTestCase):
def test_wait(self):
res = self.client.create_container(BUSYBOX, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
inspect = self.client.inspect_container(id)
self.assertIn('Running', inspect['State'])
self.assertEqual(inspect['State']['Running'], False)
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], exitcode)
def test_wait_with_dict_instead_of_id(self):
res = self.client.create_container(BUSYBOX, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(res)
exitcode = self.client.wait(res)
self.assertEqual(exitcode, 0)
inspect = self.client.inspect_container(res)
self.assertIn('Running', inspect['State'])
self.assertEqual(inspect['State']['Running'], False)
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], exitcode)
class LogsTest(api_test.BaseTestCase):
def test_logs(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'echo {0}'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(id)
self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
def test_logs_tail_option(self):
snippet = '''Line1
Line2'''
container = self.client.create_container(
BUSYBOX, 'echo "{0}"'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(id, tail=1)
self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
# def test_logs_streaming(self):
# snippet = 'Flowering Nights (Sakuya Iyazoi)'
# container = self.client.create_container(
# BUSYBOX, 'echo {0}'.format(snippet)
# )
# id = container['Id']
# self.client.start(id)
# self.tmp_containers.append(id)
# logs = bytes() if six.PY3 else str()
# for chunk in self.client.logs(id, stream=True):
# logs += chunk
# exitcode = self.client.wait(id)
# self.assertEqual(exitcode, 0)
# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
def test_logs_with_dict_instead_of_id(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'echo {0}'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(container)
self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
class DiffTest(api_test.BaseTestCase):
def test_diff(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
diff = self.client.diff(id)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
self.assertEqual(len(test_diff), 1)
self.assertIn('Kind', test_diff[0])
self.assertEqual(test_diff[0]['Kind'], 1)
def test_diff_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
diff = self.client.diff(container)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
self.assertEqual(len(test_diff), 1)
self.assertIn('Kind', test_diff[0])
self.assertEqual(test_diff[0]['Kind'], 1)
class StopTest(api_test.BaseTestCase):
def test_stop(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.stop(id, timeout=2)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_stop_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
self.assertIn('Id', container)
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
self.client.stop(container, timeout=2)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
class KillTest(api_test.BaseTestCase):
def test_kill(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_kill_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(container)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_kill_with_signal(self):
container = self.client.create_container(BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id, signal=signal.SIGKILL)
exitcode = self.client.wait(id)
self.assertNotEqual(exitcode, 0)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False, state)
class PortTest(api_test.BaseTestCase):
def test_port(self):
port_bindings = {
'1111': ('127.0.0.1', '4567'),
'2222': ('127.0.0.1', '4568')
}
container = self.client.create_container(
BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
host_config=self.client.create_host_config(
port_bindings=port_bindings, network_mode='bridge'
)
)
id = container['Id']
self.client.start(container)
# Call the port function on each biding and compare expected vs actual
for port in port_bindings:
actual_bindings = self.client.port(container, port)
port_binding = actual_bindings.pop()
ip, host_port = port_binding['HostIp'], port_binding['HostPort']
self.assertEqual(ip, port_bindings[port][0])
self.assertEqual(host_port, port_bindings[port][1])
self.client.kill(id)
class ContainerTopTest(api_test.BaseTestCase):
def test_top(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(container)
res = self.client.top(container['Id'])
self.assertEqual(
res['Titles'],
['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD']
)
self.assertEqual(len(res['Processes']), 1)
self.assertEqual(res['Processes'][0][7], 'sleep 60')
self.client.kill(id)
def test_top_with_psargs(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(container)
res = self.client.top(container['Id'], 'waux')
self.assertEqual(
res['Titles'],
['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS',
'TTY', 'STAT', 'START', 'TIME', 'COMMAND'],
)
self.assertEqual(len(res['Processes']), 1)
self.assertEqual(res['Processes'][0][10], 'sleep 60')
self.client.kill(id)
class RestartContainerTest(api_test.BaseTestCase):
def test_restart(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
self.assertIn('State', info)
self.assertIn('StartedAt', info['State'])
start_time1 = info['State']['StartedAt']
self.client.restart(id, timeout=2)
info2 = self.client.inspect_container(id)
self.assertIn('State', info2)
self.assertIn('StartedAt', info2['State'])
start_time2 = info2['State']['StartedAt']
self.assertNotEqual(start_time1, start_time2)
self.assertIn('Running', info2['State'])
self.assertEqual(info2['State']['Running'], True)
self.client.kill(id)
def test_restart_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
self.assertIn('Id', container)
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
self.assertIn('State', info)
self.assertIn('StartedAt', info['State'])
start_time1 = info['State']['StartedAt']
self.client.restart(container, timeout=2)
info2 = self.client.inspect_container(id)
self.assertIn('State', info2)
self.assertIn('StartedAt', info2['State'])
start_time2 = info2['State']['StartedAt']
self.assertNotEqual(start_time1, start_time2)
self.assertIn('Running', info2['State'])
self.assertEqual(info2['State']['Running'], True)
self.client.kill(id)
class RemoveContainerTest(api_test.BaseTestCase):
def test_remove(self):
container = self.client.create_container(BUSYBOX, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(id)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
self.assertEqual(len(res), 0)
def test_remove_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(container)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
self.assertEqual(len(res), 0)
class AttachContainerTest(api_test.BaseTestCase):
def test_run_container_streaming(self):
container = self.client.create_container(BUSYBOX, '/bin/sh',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
sock = self.client.attach_socket(container, ws=False)
self.assertTrue(sock.fileno() > -1)
def test_run_container_reading_socket(self):
line = 'hi there and stuff and things, words!'
command = "echo '{0}'".format(line)
container = self.client.create_container(BUSYBOX, command,
detach=True, tty=False)
ident = container['Id']
self.tmp_containers.append(ident)
opts = {"stdout": 1, "stream": 1, "logs": 1}
pty_stdout = self.client.attach_socket(ident, opts)
self.client.start(ident)
recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
def read(n=4096):
"""Code stolen from dockerpty to read the socket"""
try:
if hasattr(pty_stdout, 'recv'):
return pty_stdout.recv(n)
return os.read(pty_stdout.fileno(), n)
except EnvironmentError as e:
if e.errno not in recoverable_errors:
raise
def next_packet_size():
"""Code stolen from dockerpty to get the next packet size"""
data = six.binary_type()
while len(data) < 8:
next_data = read(8 - len(data))
if not next_data:
return 0
data = data + next_data
if data is None:
return 0
if len(data) == 8:
_, actual = struct.unpack('>BxxxL', data)
return actual
next_size = next_packet_size()
self.assertEqual(next_size, len(line) + 1)
data = six.binary_type()
while len(data) < next_size:
next_data = read(next_size - len(data))
if not next_data:
assert False, "Failed trying to read in the dataz"
data += next_data
self.assertEqual(data.decode('utf-8'), "{0}\n".format(line))
pty_stdout.close()
# Prevent segfault at the end of the test run
if hasattr(pty_stdout, "_response"):
del pty_stdout._response
class PauseTest(api_test.BaseTestCase):
def test_pause_unpause(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.tmp_containers.append(id)
self.client.start(container)
self.client.pause(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], True)
self.assertIn('Paused', state)
self.assertEqual(state['Paused'], True)
self.client.unpause(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], True)
self.assertIn('Paused', state)
self.assertEqual(state['Paused'], False)

View File

@ -0,0 +1,106 @@
import pytest
from . import api_test
BUSYBOX = api_test.BUSYBOX
class ExecTest(api_test.BaseTestCase):
def test_execute_command(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, ['echo', 'hello'])
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'hello\n')
def test_exec_command_string(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'echo hello world')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'hello world\n')
def test_exec_command_as_user(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'whoami', user='default')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'default\n')
def test_exec_command_as_root(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'whoami')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'root\n')
def test_exec_command_streaming(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
self.assertIn('Id', exec_id)
res = b''
for chunk in self.client.exec_start(exec_id, stream=True):
res += chunk
self.assertEqual(res, b'hello\nworld\n')
def test_exec_inspect(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
self.assertIn('Id', exec_id)
self.client.exec_start(exec_id)
exec_info = self.client.exec_inspect(exec_id)
self.assertIn('ExitCode', exec_info)
self.assertNotEqual(exec_info['ExitCode'], 0)

View File

@ -0,0 +1,235 @@
import contextlib
import json
import shutil
import socket
import tarfile
import tempfile
import threading
import pytest
import six
from six.moves import BaseHTTPServer
from six.moves import socketserver
import docker
from . import api_test
BUSYBOX = api_test.BUSYBOX
class ListImagesTest(api_test.BaseTestCase):
def test_images(self):
res1 = self.client.images(all=True)
self.assertIn('Id', res1[0])
res10 = res1[0]
self.assertIn('Created', res10)
self.assertIn('RepoTags', res10)
distinct = []
for img in res1:
if img['Id'] not in distinct:
distinct.append(img['Id'])
self.assertEqual(len(distinct), self.client.info()['Images'])
def test_images_quiet(self):
res1 = self.client.images(quiet=True)
self.assertEqual(type(res1[0]), six.text_type)
class PullImageTest(api_test.BaseTestCase):
def test_pull(self):
try:
self.client.remove_image('hello-world')
except docker.errors.APIError:
pass
res = self.client.pull('hello-world')
self.tmp_imgs.append('hello-world')
self.assertEqual(type(res), six.text_type)
self.assertGreaterEqual(
len(self.client.images('hello-world')), 1
)
img_info = self.client.inspect_image('hello-world')
self.assertIn('Id', img_info)
def test_pull_streaming(self):
try:
self.client.remove_image('hello-world')
except docker.errors.APIError:
pass
stream = self.client.pull('hello-world', stream=True)
self.tmp_imgs.append('hello-world')
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
json.loads(chunk) # ensure chunk is a single, valid JSON blob
self.assertGreaterEqual(
len(self.client.images('hello-world')), 1
)
img_info = self.client.inspect_image('hello-world')
self.assertIn('Id', img_info)
class CommitTest(api_test.BaseTestCase):
def test_commit(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.commit(id)
self.assertIn('Id', res)
img_id = res['Id']
self.tmp_imgs.append(img_id)
img = self.client.inspect_image(img_id)
self.assertIn('Container', img)
self.assertTrue(img['Container'].startswith(id))
self.assertIn('ContainerConfig', img)
self.assertIn('Image', img['ContainerConfig'])
self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
busybox_id = self.client.inspect_image(BUSYBOX)['Id']
self.assertIn('Parent', img)
self.assertEqual(img['Parent'], busybox_id)
class RemoveImageTest(api_test.BaseTestCase):
def test_remove(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.commit(id)
self.assertIn('Id', res)
img_id = res['Id']
self.tmp_imgs.append(img_id)
self.client.remove_image(img_id, force=True)
images = self.client.images(all=True)
res = [x for x in images if x['Id'].startswith(img_id)]
self.assertEqual(len(res), 0)
class ImportImageTest(api_test.BaseTestCase):
'''Base class for `docker import` test cases.'''
TAR_SIZE = 512 * 1024
def write_dummy_tar_content(self, n_bytes, tar_fd):
def extend_file(f, n_bytes):
f.seek(n_bytes - 1)
f.write(bytearray([65]))
f.seek(0)
tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
with tempfile.NamedTemporaryFile() as f:
extend_file(f, n_bytes)
tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
tar.addfile(tarinfo, fileobj=f)
tar.close()
@contextlib.contextmanager
def dummy_tar_stream(self, n_bytes):
'''Yields a stream that is valid tar data of size n_bytes.'''
with tempfile.NamedTemporaryFile() as tar_file:
self.write_dummy_tar_content(n_bytes, tar_file)
tar_file.seek(0)
yield tar_file
@contextlib.contextmanager
def dummy_tar_file(self, n_bytes):
'''Yields the name of a valid tar file of size n_bytes.'''
with tempfile.NamedTemporaryFile() as tar_file:
self.write_dummy_tar_content(n_bytes, tar_file)
tar_file.seek(0)
yield tar_file.name
def test_import_from_bytes(self):
with self.dummy_tar_stream(n_bytes=500) as f:
content = f.read()
# The generic import_image() function cannot import in-memory bytes
# data that happens to be represented as a string type, because
# import_image() will try to use it as a filename and usually then
# trigger an exception. So we test the import_image_from_data()
# function instead.
statuses = self.client.import_image_from_data(
content, repository='test/import-from-bytes')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
def test_import_from_file(self):
with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
# statuses = self.client.import_image(
# src=tar_filename, repository='test/import-from-file')
statuses = self.client.import_image_from_file(
tar_filename, repository='test/import-from-file')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
def test_import_from_stream(self):
with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
statuses = self.client.import_image(
src=tar_stream, repository='test/import-from-stream')
# statuses = self.client.import_image_from_stream(
# tar_stream, repository='test/import-from-stream')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
@contextlib.contextmanager
def temporary_http_file_server(self, stream):
'''Serve data from an IO stream over HTTP.'''
class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-Type', 'application/x-tar')
self.end_headers()
shutil.copyfileobj(stream, self.wfile)
server = socketserver.TCPServer(('', 0), Handler)
thread = threading.Thread(target=server.serve_forever)
thread.setDaemon(True)
thread.start()
yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
server.shutdown()
@pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
def test_import_from_url(self):
# The crappy test HTTP server doesn't handle large files well, so use
# a small file.
tar_size = 10240
with self.dummy_tar_stream(n_bytes=tar_size) as tar_data:
with self.temporary_http_file_server(tar_data) as url:
statuses = self.client.import_image(
src=url, repository='test/import-from-url')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)

View File

@ -0,0 +1,98 @@
import random
import docker
import pytest
from . import api_test
from ..base import requires_api_version
@requires_api_version('1.21')
class TestNetworks(api_test.BaseTestCase):
def create_network(self, *args, **kwargs):
net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
net_id = self.client.create_network(net_name, *args, **kwargs)['id']
self.tmp_networks.append(net_id)
return (net_name, net_id)
def test_list_networks(self):
networks = self.client.networks()
initial_size = len(networks)
net_name, net_id = self.create_network()
networks = self.client.networks()
self.assertEqual(len(networks), initial_size + 1)
self.assertTrue(net_id in [n['id'] for n in networks])
networks_by_name = self.client.networks(names=[net_name])
self.assertEqual([n['id'] for n in networks_by_name], [net_id])
networks_by_partial_id = self.client.networks(ids=[net_id[:8]])
self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id])
def test_inspect_network(self):
net_name, net_id = self.create_network()
net = self.client.inspect_network(net_id)
self.assertEqual(net, {
u'name': net_name,
u'id': net_id,
u'driver': 'bridge',
u'containers': {},
})
def test_create_network_with_host_driver_fails(self):
net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
with pytest.raises(docker.errors.APIError):
self.client.create_network(net_name, driver='host')
def test_remove_network(self):
initial_size = len(self.client.networks())
net_name, net_id = self.create_network()
self.assertEqual(len(self.client.networks()), initial_size + 1)
self.client.remove_network(net_id)
self.assertEqual(len(self.client.networks()), initial_size)
def test_connect_and_disconnect_container(self):
net_name, net_id = self.create_network()
container = self.client.create_container('busybox', 'top')
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))
self.client.connect_container_to_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertEqual(
list(network_data['containers'].keys()),
[container['Id']])
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))
def test_connect_on_container_create(self):
net_name, net_id = self.create_network()
container = self.client.create_container(
image='busybox',
command='top',
host_config=self.client.create_host_config(network_mode=net_name),
)
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
self.assertEqual(
list(network_data['containers'].keys()),
[container['Id']])
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))

View File

@ -0,0 +1,69 @@
import io
import random
import docker
import six
from . import api_test
BUSYBOX = api_test.BUSYBOX
class TestRegressions(api_test.BaseTestCase):
def test_443_handle_nonchunked_response_in_stream(self):
dfile = io.BytesIO()
with self.assertRaises(docker.errors.APIError) as exc:
for line in self.client.build(fileobj=dfile, tag="a/b/c"):
pass
self.assertEqual(exc.exception.response.status_code, 500)
dfile.close()
def test_542_truncate_ids_client_side(self):
self.client.start(
self.client.create_container(BUSYBOX, ['true'])
)
result = self.client.containers(all=True, trunc=True)
self.assertEqual(len(result[0]['Id']), 12)
def test_647_support_doubleslash_in_image_names(self):
with self.assertRaises(docker.errors.APIError):
self.client.inspect_image('gensokyo.jp//kirisame')
def test_649_handle_timeout_value_none(self):
self.client.timeout = None
ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
self.client.start(ctnr)
self.client.stop(ctnr)
def test_715_handle_user_param_as_int_value(self):
ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
assert logs == '1000\n'
def test_792_explicit_port_protocol(self):
tcp_port, udp_port = random.sample(range(9999, 32000), 2)
ctnr = self.client.create_container(
BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')],
host_config=self.client.create_host_config(
port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port}
)
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.assertEqual(
self.client.port(ctnr, 2000)[0]['HostPort'],
six.text_type(tcp_port)
)
self.assertEqual(
self.client.port(ctnr, '2000/tcp')[0]['HostPort'],
six.text_type(tcp_port)
)
self.assertEqual(
self.client.port(ctnr, '2000/udp')[0]['HostPort'],
six.text_type(udp_port)
)

View File

@ -0,0 +1,56 @@
import docker
import pytest
from . import api_test
from ..base import requires_api_version
@requires_api_version('1.21')
class TestVolumes(api_test.BaseTestCase):
def test_create_volume(self):
name = 'perfectcherryblossom'
self.tmp_volumes.append(name)
result = self.client.create_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
def test_create_volume_invalid_driver(self):
driver_name = 'invalid.driver'
with pytest.raises(docker.errors.NotFound):
self.client.create_volume('perfectcherryblossom', driver_name)
def test_list_volumes(self):
name = 'imperishablenight'
self.tmp_volumes.append(name)
volume_info = self.client.create_volume(name)
result = self.client.volumes()
self.assertIn('Volumes', result)
volumes = result['Volumes']
self.assertIn(volume_info, volumes)
def test_inspect_volume(self):
name = 'embodimentofscarletdevil'
self.tmp_volumes.append(name)
volume_info = self.client.create_volume(name)
result = self.client.inspect_volume(name)
self.assertEqual(volume_info, result)
def test_inspect_nonexistent_volume(self):
name = 'embodimentofscarletdevil'
with pytest.raises(docker.errors.NotFound):
self.client.inspect_volume(name)
def test_remove_volume(self):
name = 'shootthebullet'
self.tmp_volumes.append(name)
self.client.create_volume(name)
result = self.client.remove_volume(name)
self.assertTrue(result)
def test_remove_nonexistent_volume(self):
name = 'shootthebullet'
with pytest.raises(docker.errors.NotFound):
self.client.remove_volume(name)

File diff suppressed because it is too large Load Diff

418
tests/unit/api_test.py Normal file
View File

@ -0,0 +1,418 @@
# Copyright 2013 dotCloud inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import os
import re
import shutil
import socket
import sys
import tempfile
import threading
import time
import docker
import requests
import six
from .. import base
from . import fake_api
import pytest
try:
from unittest import mock
except ImportError:
import mock
DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
request=None):
res = requests.Response()
res.status_code = status_code
if not isinstance(content, six.binary_type):
content = json.dumps(content).encode('ascii')
res._content = content
res.headers = requests.structures.CaseInsensitiveDict(headers or {})
res.reason = reason
res.elapsed = datetime.timedelta(elapsed)
res.request = request
return res
def fake_resolve_authconfig(authconfig, registry=None):
return None
def fake_inspect_container(self, container, tty=False):
return fake_api.get_fake_inspect_container(tty=tty)[1]
def fake_resp(method, url, *args, **kwargs):
key = None
if url in fake_api.fake_responses:
key = url
elif (url, method) in fake_api.fake_responses:
key = (url, method)
if not key:
raise Exception('{0} {1}'.format(method, url))
status_code, content = fake_api.fake_responses[key]()
return response(status_code=status_code, content=content)
fake_request = mock.Mock(side_effect=fake_resp)
def fake_get(self, url, *args, **kwargs):
return fake_request('GET', url, *args, **kwargs)
def fake_post(self, url, *args, **kwargs):
return fake_request('POST', url, *args, **kwargs)
def fake_put(self, url, *args, **kwargs):
return fake_request('PUT', url, *args, **kwargs)
def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
url_base = 'http+docker://localunixsocket/'
url_prefix = '{0}v{1}/'.format(
url_base,
docker.constants.DEFAULT_DOCKER_API_VERSION)
class DockerClientTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
self.patcher = mock.patch.multiple(
'docker.Client', get=fake_get, post=fake_post, put=fake_put,
delete=fake_delete
)
self.patcher.start()
self.client = docker.Client()
# Force-clear authconfig to avoid tampering with the tests
self.client._cfg = {'Configs': {}}
def tearDown(self):
self.client.close()
self.patcher.stop()
def assertIn(self, object, collection):
if six.PY2 and sys.version_info[1] <= 6:
return self.assertTrue(object in collection)
return super(DockerClientTest, self).assertIn(object, collection)
def base_create_payload(self, img='busybox', cmd=None):
if not cmd:
cmd = ['true']
return {"Tty": False, "Image": img, "Cmd": cmd,
"AttachStdin": False,
"AttachStderr": True, "AttachStdout": True,
"StdinOnce": False,
"OpenStdin": False, "NetworkDisabled": False,
}
class DockerApiTest(DockerClientTest):
def test_ctor(self):
with pytest.raises(docker.errors.DockerException) as excinfo:
docker.Client(version=1.12)
self.assertEqual(
str(excinfo.value),
'Version parameter must be a string or None. Found float'
)
def test_url_valid_resource(self):
url = self.client._url('/hello/{0}/world', 'somename')
self.assertEqual(
url, '{0}{1}'.format(url_prefix, 'hello/somename/world')
)
url = self.client._url(
'/hello/{0}/world/{1}', 'somename', 'someothername'
)
self.assertEqual(
url,
'{0}{1}'.format(url_prefix, 'hello/somename/world/someothername')
)
url = self.client._url('/hello/{0}/world', '/some?name')
self.assertEqual(
url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world')
)
def test_url_invalid_resource(self):
with pytest.raises(ValueError):
self.client._url('/hello/{0}/world', ['sakuya', 'izayoi'])
def test_url_no_resource(self):
url = self.client._url('/simple')
self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple'))
def test_url_unversioned_api(self):
url = self.client._url(
'/hello/{0}/world', 'somename', versioned_api=False
)
self.assertEqual(
url, '{0}{1}'.format(url_base, 'hello/somename/world')
)
def test_version(self):
self.client.version()
fake_request.assert_called_with(
'GET',
url_prefix + 'version',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_version_no_api_version(self):
self.client.version(False)
fake_request.assert_called_with(
'GET',
url_base + 'version',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_retrieve_server_version(self):
client = docker.Client(version="auto")
self.assertTrue(isinstance(client._version, six.string_types))
self.assertFalse(client._version == "auto")
client.close()
def test_auto_retrieve_server_version(self):
version = self.client._retrieve_server_version()
self.assertTrue(isinstance(version, six.string_types))
def test_info(self):
self.client.info()
fake_request.assert_called_with(
'GET',
url_prefix + 'info',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_search(self):
self.client.search('busybox')
fake_request.assert_called_with(
'GET',
url_prefix + 'images/search',
params={'term': 'busybox'},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_events(self):
self.client.events()
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={'since': None, 'until': None, 'filters': None},
stream=True
)
def test_events_with_since_until(self):
ts = 1356048000
now = datetime.datetime.utcfromtimestamp(ts)
since = now - datetime.timedelta(seconds=10)
until = now + datetime.timedelta(seconds=10)
self.client.events(since=since, until=until)
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={
'since': ts - 10,
'until': ts + 10,
'filters': None
},
stream=True
)
def test_events_with_filters(self):
filters = {'event': ['die', 'stop'],
'container': fake_api.FAKE_CONTAINER_ID}
self.client.events(filters=filters)
expected_filters = docker.utils.convert_filters(filters)
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={
'since': None,
'until': None,
'filters': expected_filters
},
stream=True
)
def _socket_path_for_client_session(self, client):
socket_adapter = client.get_adapter('http+docker://')
return socket_adapter.socket_path
def test_url_compatibility_unix(self):
c = docker.Client(base_url="unix://socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_unix_triple_slash(self):
c = docker.Client(base_url="unix:///socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_http_unix_triple_slash(self):
c = docker.Client(base_url="http+unix:///socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_http(self):
c = docker.Client(base_url="http://hostname:1234")
assert c.base_url == "http://hostname:1234"
def test_url_compatibility_tcp(self):
c = docker.Client(base_url="tcp://hostname:1234")
assert c.base_url == "http://hostname:1234"
def test_remove_link(self):
self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True)
fake_request.assert_called_with(
'DELETE',
url_prefix + 'containers/3cc2351ab11b',
params={'v': False, 'link': True, 'force': False},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_create_host_config_secopt(self):
security_opt = ['apparmor:test_profile']
result = self.client.create_host_config(security_opt=security_opt)
self.assertIn('SecurityOpt', result)
self.assertEqual(result['SecurityOpt'], security_opt)
self.assertRaises(
docker.errors.DockerException, self.client.create_host_config,
security_opt='wrong'
)
class StreamTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
socket_dir = tempfile.mkdtemp()
self.build_context = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, socket_dir)
self.addCleanup(shutil.rmtree, self.build_context)
self.socket_file = os.path.join(socket_dir, 'test_sock.sock')
self.server_socket = self._setup_socket()
self.stop_server = False
server_thread = threading.Thread(target=self.run_server)
server_thread.setDaemon(True)
server_thread.start()
self.response = None
self.request_handler = None
self.addCleanup(server_thread.join)
self.addCleanup(self.stop)
def stop(self):
self.stop_server = True
def _setup_socket(self):
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_sock.bind(self.socket_file)
# Non-blocking mode so that we can shut the test down easily
server_sock.setblocking(0)
server_sock.listen(5)
return server_sock
def run_server(self):
try:
while not self.stop_server:
try:
connection, client_address = self.server_socket.accept()
except socket.error:
# Probably no connection to accept yet
time.sleep(0.01)
continue
connection.setblocking(1)
try:
self.request_handler(connection)
finally:
connection.close()
finally:
self.server_socket.close()
def early_response_sending_handler(self, connection):
data = b''
headers = None
connection.sendall(self.response)
while not headers:
data += connection.recv(2048)
parts = data.split(b'\r\n\r\n', 1)
if len(parts) == 2:
headers, data = parts
mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
assert mo
content_length = int(mo.group(1))
while True:
if len(data) >= content_length:
break
data += connection.recv(2048)
def test_early_stream_response(self):
self.request_handler = self.early_response_sending_handler
lines = []
for i in range(0, 50):
line = str(i).encode()
lines += [('%x' % len(line)).encode(), line]
lines.append(b'0')
lines.append(b'')
self.response = (
b'HTTP/1.1 200 OK\r\n'
b'Transfer-Encoding: chunked\r\n'
b'\r\n'
) + b'\r\n'.join(lines)
with docker.Client(base_url="http+unix://" + self.socket_file) \
as client:
for i in range(5):
try:
stream = client.build(
path=self.build_context,
stream=True
)
break
except requests.ConnectionError as e:
if i == 4:
raise e
self.assertEqual(list(stream), [
str(i).encode() for i in range(50)])

318
tests/unit/auth_test.py Normal file
View File

@ -0,0 +1,318 @@
# -*- coding: utf-8 -*-
import base64
import json
import os
import os.path
import random
import shutil
import tempfile
from docker import auth
from .. import base
try:
from unittest import mock
except ImportError:
import mock
class RegressionTest(base.BaseTestCase):
def test_803_urlsafe_encode(self):
auth_data = {
'username': 'root',
'password': 'GR?XGR?XGR?XGR?X'
}
encoded = auth.encode_header(auth_data)
assert b'/' not in encoded
assert b'_' in encoded
class ResolveAuthTest(base.BaseTestCase):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
}
def test_resolve_repository_name_hub_library_image(self):
self.assertEqual(
auth.resolve_repository_name('image'),
('index.docker.io', 'image'),
)
def test_resolve_repository_name_hub_image(self):
self.assertEqual(
auth.resolve_repository_name('username/image'),
('index.docker.io', 'username/image'),
)
def test_resolve_repository_name_private_registry(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net/image'),
('my.registry.net', 'image'),
)
def test_resolve_repository_name_private_registry_with_port(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net:5000/image'),
('my.registry.net:5000', 'image'),
)
def test_resolve_repository_name_private_registry_with_username(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net/username/image'),
('my.registry.net', 'username/image'),
)
def test_resolve_repository_name_no_dots_but_port(self):
self.assertEqual(
auth.resolve_repository_name('hostname:5000/image'),
('hostname:5000', 'image'),
)
def test_resolve_repository_name_no_dots_but_port_and_username(self):
self.assertEqual(
auth.resolve_repository_name('hostname:5000/username/image'),
('hostname:5000', 'username/image'),
)
def test_resolve_repository_name_localhost(self):
self.assertEqual(
auth.resolve_repository_name('localhost/image'),
('localhost', 'image'),
)
def test_resolve_repository_name_localhost_with_username(self):
self.assertEqual(
auth.resolve_repository_name('localhost/username/image'),
('localhost', 'username/image'),
)
def test_resolve_authconfig_hostname_only(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'my.registry.net'),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_protocol(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'my.registry.net/v1/'),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_trailing_slash(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_wrong_secure_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://index.docker.io'
),
{'auth': 'indexuser'}
)
def test_resolve_authconfig_path_wrong_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net/v1/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_default_registry(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config), {'auth': 'indexuser'}
)
def test_resolve_authconfig_default_explicit_none(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, None),
{'auth': 'indexuser'}
)
def test_resolve_authconfig_fully_explicit(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/v1/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_legacy_config(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'legacy.registry.url'),
{'auth': 'legacyauth'}
)
def test_resolve_authconfig_no_match(self):
self.assertTrue(
auth.resolve_authconfig(self.auth_config, 'does.not.exist') is None
)
def test_resolve_registry_and_auth_library_image(self):
image = 'image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'indexuser'},
)
def test_resolve_registry_and_auth_hub_image(self):
image = 'username/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'indexuser'},
)
def test_resolve_registry_and_auth_private_registry(self):
image = 'my.registry.net/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'privateuser'},
)
def test_resolve_registry_and_auth_unauthenticated_registry(self):
image = 'other.registry.net/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
None,
)
class LoadConfigTest(base.Cleanup, base.BaseTestCase):
def test_load_config_no_file(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg = auth.load_config(folder)
self.assertTrue(cfg is not None)
def test_load_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, '.dockercfg')
with open(dockercfg_path, 'w') as f:
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
cfg = auth.load_config(dockercfg_path)
assert auth.INDEX_NAME in cfg
self.assertNotEqual(cfg[auth.INDEX_NAME], None)
cfg = cfg[auth.INDEX_NAME]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)
def test_load_config_with_random_name(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder,
'.{0}.dockercfg'.format(
random.randrange(100000)))
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)
def test_load_config_custom_config_env(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)
def test_load_config_custom_config_env_with_auths(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
'auths': {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)

105
tests/unit/build_test.py Normal file
View File

@ -0,0 +1,105 @@
import gzip
import io
import docker
from .api_test import DockerClientTest
class BuildTest(DockerClientTest):
def test_build_container(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script)
def test_build_container_pull(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script, pull=True)
def test_build_container_stream(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script, stream=True)
def test_build_container_custom_context(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
context = docker.utils.mkbuildcontext(script)
self.client.build(fileobj=context, custom_context=True)
def test_build_container_custom_context_gzip(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
context = docker.utils.mkbuildcontext(script)
gz_context = gzip.GzipFile(fileobj=context)
self.client.build(
fileobj=gz_context,
custom_context=True,
encoding="gzip"
)
def test_build_remote_with_registry_auth(self):
self.client._auth_configs = {
'https://example.com': {
'user': 'example',
'password': 'example',
'email': 'example@example.com'
}
}
self.client.build(path='https://github.com/docker-library/mongo')
def test_build_container_with_named_dockerfile(self):
self.client.build('.', dockerfile='nameddockerfile')
def test_build_container_with_container_limits(self):
self.client.build('.', container_limits={
'memory': 1024 * 1024,
'cpusetcpus': 1,
'cpushares': 1000,
'memswap': 1024 * 1024 * 8
})
def test_build_container_invalid_container_limits(self):
self.assertRaises(
docker.errors.DockerException,
lambda: self.client.build('.', container_limits={
'foo': 'bar'
})
)

File diff suppressed because it is too large Load Diff

75
tests/unit/exec_test.py Normal file
View File

@ -0,0 +1,75 @@
import json
from . import fake_api
from .api_test import (
DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS,
)
class ExecTest(DockerClientTest):
def test_exec_create(self):
self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1'])
args = fake_request.call_args
self.assertEqual(
'POST',
args[0][0], url_prefix + 'containers/{0}/exec'.format(
fake_api.FAKE_CONTAINER_ID
)
)
self.assertEqual(
json.loads(args[1]['data']), {
'Tty': False,
'AttachStdout': True,
'Container': fake_api.FAKE_CONTAINER_ID,
'Cmd': ['ls', '-1'],
'Privileged': False,
'AttachStdin': False,
'AttachStderr': True,
'User': ''
}
)
self.assertEqual(args[1]['headers'],
{'Content-Type': 'application/json'})
def test_exec_start(self):
self.client.exec_start(fake_api.FAKE_EXEC_ID)
args = fake_request.call_args
self.assertEqual(
args[0][1], url_prefix + 'exec/{0}/start'.format(
fake_api.FAKE_EXEC_ID
)
)
self.assertEqual(
json.loads(args[1]['data']), {
'Tty': False,
'Detach': False,
}
)
self.assertEqual(args[1]['headers'],
{'Content-Type': 'application/json'})
def test_exec_inspect(self):
self.client.exec_inspect(fake_api.FAKE_EXEC_ID)
args = fake_request.call_args
self.assertEqual(
args[0][1], url_prefix + 'exec/{0}/json'.format(
fake_api.FAKE_EXEC_ID
)
)
def test_exec_resize(self):
self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60)
fake_request.assert_called_with(
'POST',
url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID),
params={'h': 20, 'w': 60},
timeout=DEFAULT_TIMEOUT_SECONDS
)

View File

@ -529,7 +529,7 @@ fake_responses = {
get_fake_events,
('{1}/{0}/volumes'.format(CURRENT_VERSION, prefix), 'GET'):
get_fake_volume_list,
('{1}/{0}/volumes'.format(CURRENT_VERSION, prefix), 'POST'):
('{1}/{0}/volumes/create'.format(CURRENT_VERSION, prefix), 'POST'):
get_fake_volume,
('{1}/{0}/volumes/{2}'.format(
CURRENT_VERSION, prefix, FAKE_VOLUME_NAME

346
tests/unit/image_test.py Normal file
View File

@ -0,0 +1,346 @@
import docker
import pytest
from . import fake_api
from .api_test import (
DockerClientTest, fake_request, DEFAULT_TIMEOUT_SECONDS, url_prefix,
fake_resolve_authconfig
)
try:
from unittest import mock
except ImportError:
import mock
class ImageTest(DockerClientTest):
def test_image_viz(self):
with pytest.raises(Exception):
self.client.images('busybox', viz=True)
self.fail('Viz output should not be supported!')
def test_images(self):
self.client.images(all=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 0, 'all': 1},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_images_quiet(self):
self.client.images(all=True, quiet=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 1, 'all': 1},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_image_ids(self):
self.client.images(quiet=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 1, 'all': 0},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_images_filters(self):
self.client.images(filters={'dangling': True})
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 0, 'all': 0,
'filters': '{"dangling": ["true"]}'},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_pull(self):
self.client.pull('joffrey/test001')
args = fake_request.call_args
self.assertEqual(
args[0][1],
url_prefix + 'images/create'
)
self.assertEqual(
args[1]['params'],
{'tag': None, 'fromImage': 'joffrey/test001'}
)
self.assertFalse(args[1]['stream'])
def test_pull_stream(self):
self.client.pull('joffrey/test001', stream=True)
args = fake_request.call_args
self.assertEqual(
args[0][1],
url_prefix + 'images/create'
)
self.assertEqual(
args[1]['params'],
{'tag': None, 'fromImage': 'joffrey/test001'}
)
self.assertTrue(args[1]['stream'])
def test_commit(self):
self.client.commit(fake_api.FAKE_CONTAINER_ID)
fake_request.assert_called_with(
'POST',
url_prefix + 'commit',
data='{}',
headers={'Content-Type': 'application/json'},
params={
'repo': None,
'comment': None,
'tag': None,
'container': '3cc2351ab11b',
'author': None
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_remove_image(self):
self.client.remove_image(fake_api.FAKE_IMAGE_ID)
fake_request.assert_called_with(
'DELETE',
url_prefix + 'images/e9aa60c60128',
params={'force': False, 'noprune': False},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_image_history(self):
self.client.history(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/test_image/history',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image(self):
self.client.import_image(
fake_api.FAKE_TARBALL_PATH,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromSrc': fake_api.FAKE_TARBALL_PATH
},
data=None,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image_from_bytes(self):
stream = (i for i in range(0, 100))
self.client.import_image(
stream,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromSrc': '-',
},
headers={
'Content-Type': 'application/tar',
},
data=stream,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image_from_image(self):
self.client.import_image(
image=fake_api.FAKE_IMAGE_NAME,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromImage': fake_api.FAKE_IMAGE_NAME
},
data=None,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_inspect_image(self):
self.client.inspect_image(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/test_image/json',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_inspect_image_undefined_id(self):
for arg in None, '', {True: True}:
with pytest.raises(docker.errors.NullResource) as excinfo:
self.client.inspect_image(arg)
self.assertEqual(
excinfo.value.args[0], 'image or container param is undefined'
)
def test_insert_image(self):
try:
self.client.insert(fake_api.FAKE_IMAGE_NAME,
fake_api.FAKE_URL, fake_api.FAKE_PATH)
except docker.errors.DeprecatedMethod:
self.assertTrue(
docker.utils.compare_version('1.12', self.client._version) >= 0
)
return
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/insert',
params={
'url': fake_api.FAKE_URL,
'path': fake_api.FAKE_PATH
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': None
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=False,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image_with_tag(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(
fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': fake_api.FAKE_TAG_NAME,
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=False,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image_stream(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': None
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=True,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image(self):
self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': None,
'repo': 'repo',
'force': 0
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image_tag(self):
self.client.tag(
fake_api.FAKE_IMAGE_ID,
fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': 'tag',
'repo': 'repo',
'force': 0
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image_force(self):
self.client.tag(
fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': None,
'repo': 'repo',
'force': 1
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_get_image(self):
self.client.get_image(fake_api.FAKE_IMAGE_ID)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/e9aa60c60128/get',
stream=True,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_load_image(self):
self.client.load_image('Byte Stream....')
fake_request.assert_called_with(
'POST',
url_prefix + 'images/load',
data='Byte Stream....',
timeout=DEFAULT_TIMEOUT_SECONDS
)

149
tests/unit/network_test.py Normal file
View File

@ -0,0 +1,149 @@
import json
import six
from .. import base
from .api_test import DockerClientTest, url_prefix, response
try:
from unittest import mock
except ImportError:
import mock
class NetworkTest(DockerClientTest):
@base.requires_api_version('1.21')
def test_list_networks(self):
networks = [
{
"name": "none",
"id": "8e4e55c6863ef424",
"type": "null",
"endpoints": []
},
{
"name": "host",
"id": "062b6d9ea7913fde",
"type": "host",
"endpoints": []
},
]
get = mock.Mock(return_value=response(
status_code=200, content=json.dumps(networks).encode('utf-8')))
with mock.patch('docker.Client.get', get):
self.assertEqual(self.client.networks(), networks)
self.assertEqual(get.call_args[0][0], url_prefix + 'networks')
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertFalse(filters)
self.client.networks(names=['foo'])
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertEqual(filters, {'name': ['foo']})
self.client.networks(ids=['123'])
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertEqual(filters, {'id': ['123']})
@base.requires_api_version('1.21')
def test_create_network(self):
network_data = {
"id": 'abc12345',
"warning": "",
}
network_response = response(status_code=200, content=network_data)
post = mock.Mock(return_value=network_response)
with mock.patch('docker.Client.post', post):
result = self.client.create_network('foo')
self.assertEqual(result, network_data)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/create')
self.assertEqual(
json.loads(post.call_args[1]['data']),
{"name": "foo"})
self.client.create_network('foo', 'bridge')
self.assertEqual(
json.loads(post.call_args[1]['data']),
{"name": "foo", "driver": "bridge"})
@base.requires_api_version('1.21')
def test_remove_network(self):
network_id = 'abc12345'
delete = mock.Mock(return_value=response(status_code=200))
with mock.patch('docker.Client.delete', delete):
self.client.remove_network(network_id)
args = delete.call_args
self.assertEqual(args[0][0],
url_prefix + 'networks/{0}'.format(network_id))
@base.requires_api_version('1.21')
def test_inspect_network(self):
network_id = 'abc12345'
network_name = 'foo'
network_data = {
six.u('name'): network_name,
six.u('id'): network_id,
six.u('driver'): 'bridge',
six.u('containers'): {},
}
network_response = response(status_code=200, content=network_data)
get = mock.Mock(return_value=network_response)
with mock.patch('docker.Client.get', get):
result = self.client.inspect_network(network_id)
self.assertEqual(result, network_data)
args = get.call_args
self.assertEqual(args[0][0],
url_prefix + 'networks/{0}'.format(network_id))
@base.requires_api_version('1.21')
def test_connect_container_to_network(self):
network_id = 'abc12345'
container_id = 'def45678'
post = mock.Mock(return_value=response(status_code=201))
with mock.patch('docker.Client.post', post):
self.client.connect_container_to_network(
{'Id': container_id}, network_id)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/{0}/connect'.format(network_id))
self.assertEqual(
json.loads(post.call_args[1]['data']),
{'container': container_id})
@base.requires_api_version('1.21')
def test_disconnect_container_from_network(self):
network_id = 'abc12345'
container_id = 'def45678'
post = mock.Mock(return_value=response(status_code=201))
with mock.patch('docker.Client.post', post):
self.client.disconnect_container_from_network(
{'Id': container_id}, network_id)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/{0}/disconnect'.format(network_id))
self.assertEqual(
json.loads(post.call_args[1]['data']),
{'container': container_id})

0
tests/unit/testdata/certs/key.pem vendored Normal file
View File

View File

@ -1,23 +1,30 @@
# -*- coding: utf-8 -*-
import base64
import json
import os
import os.path
import shutil
import tarfile
import tempfile
import pytest
import six
from docker.client import Client
from docker.constants import DEFAULT_DOCKER_API_VERSION
from docker.errors import DockerException
from docker.utils import (
parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
create_host_config, Ulimit, LogConfig, parse_bytes, parse_env_file,
exclude_paths,
exclude_paths, convert_volume_binds, decode_json_header, tar,
split_command,
)
from docker.utils.ports import build_port_bindings, split_port
from docker.auth import resolve_repository_name, resolve_authconfig
from . import base
from .helpers import make_tree
from .. import base
from ..helpers import make_tree
import pytest
TEST_CERT_DIR = os.path.join(
os.path.dirname(__file__),
@ -178,9 +185,92 @@ class KwargsFromEnvTest(base.BaseTestCase):
shutil.rmtree(temp_dir)
class UtilsTest(base.BaseTestCase):
longMessage = True
class ConverVolumeBindsTest(base.BaseTestCase):
def test_convert_volume_binds_empty(self):
self.assertEqual(convert_volume_binds({}), [])
self.assertEqual(convert_volume_binds([]), [])
def test_convert_volume_binds_list(self):
data = ['/a:/a:ro', '/b:/c:z']
self.assertEqual(convert_volume_binds(data), data)
def test_convert_volume_binds_complete(self):
data = {
'/mnt/vol1': {
'bind': '/data',
'mode': 'ro'
}
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:ro'])
def test_convert_volume_binds_compact(self):
data = {
'/mnt/vol1': '/data'
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:rw'])
def test_convert_volume_binds_no_mode(self):
data = {
'/mnt/vol1': {
'bind': '/data'
}
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:rw'])
def test_convert_volume_binds_unicode_bytes_input(self):
if six.PY2:
expected = [unicode('/mnt/지연:/unicode/박:rw', 'utf-8')]
data = {
'/mnt/지연': {
'bind': '/unicode/박',
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
else:
expected = ['/mnt/지연:/unicode/박:rw']
data = {
bytes('/mnt/지연', 'utf-8'): {
'bind': bytes('/unicode/박', 'utf-8'),
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
def test_convert_volume_binds_unicode_unicode_input(self):
if six.PY2:
expected = [unicode('/mnt/지연:/unicode/박:rw', 'utf-8')]
data = {
unicode('/mnt/지연', 'utf-8'): {
'bind': unicode('/unicode/박', 'utf-8'),
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
else:
expected = ['/mnt/지연:/unicode/박:rw']
data = {
'/mnt/지연': {
'bind': '/unicode/박',
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
class ParseEnvFileTest(base.BaseTestCase):
def generate_tempfile(self, file_content=None):
"""
Generates a temporary file for tests with the content
@ -192,26 +282,30 @@ class UtilsTest(base.BaseTestCase):
local_tempfile.close()
return local_tempfile.name
def test_parse_repository_tag(self):
self.assertEqual(parse_repository_tag("root"),
("root", None))
self.assertEqual(parse_repository_tag("root:tag"),
("root", "tag"))
self.assertEqual(parse_repository_tag("user/repo"),
("user/repo", None))
self.assertEqual(parse_repository_tag("user/repo:tag"),
("user/repo", "tag"))
self.assertEqual(parse_repository_tag("url:5000/repo"),
("url:5000/repo", None))
self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
("url:5000/repo", "tag"))
def test_parse_env_file_proper(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_bytes(self):
self.assertEqual(parse_bytes("512MB"), (536870912))
self.assertEqual(parse_bytes("512M"), (536870912))
self.assertRaises(DockerException, parse_bytes, "512MK")
self.assertRaises(DockerException, parse_bytes, "512L")
def test_parse_env_file_commented_line(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\n#PASS=secret')
get_parse_env_file = parse_env_file((env_file))
self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
os.unlink(env_file)
def test_parse_env_file_invalid_line(self):
env_file = self.generate_tempfile(
file_content='USER jdoe')
self.assertRaises(
DockerException, parse_env_file, env_file)
os.unlink(env_file)
class ParseHostTest(base.BaseTestCase):
def test_parse_host(self):
invalid_hosts = [
'0.0.0.0',
@ -250,27 +344,29 @@ class UtilsTest(base.BaseTestCase):
assert parse_host(val, 'win32') == tcp_port
def test_parse_env_file_proper(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_env_file_commented_line(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\n#PASS=secret')
get_parse_env_file = parse_env_file((env_file))
self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
os.unlink(env_file)
class UtilsTest(base.BaseTestCase):
longMessage = True
def test_parse_env_file_invalid_line(self):
env_file = self.generate_tempfile(
file_content='USER jdoe')
self.assertRaises(
DockerException, parse_env_file, env_file)
os.unlink(env_file)
def test_parse_repository_tag(self):
self.assertEqual(parse_repository_tag("root"),
("root", None))
self.assertEqual(parse_repository_tag("root:tag"),
("root", "tag"))
self.assertEqual(parse_repository_tag("user/repo"),
("user/repo", None))
self.assertEqual(parse_repository_tag("user/repo:tag"),
("user/repo", "tag"))
self.assertEqual(parse_repository_tag("url:5000/repo"),
("url:5000/repo", None))
self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
("url:5000/repo", "tag"))
def test_parse_bytes(self):
self.assertEqual(parse_bytes("512MB"), (536870912))
self.assertEqual(parse_bytes("512M"), (536870912))
self.assertRaises(DockerException, parse_bytes, "512MK")
self.assertRaises(DockerException, parse_bytes, "512L")
def test_convert_filters(self):
tests = [
@ -283,158 +379,31 @@ class UtilsTest(base.BaseTestCase):
for filters, expected in tests:
self.assertEqual(convert_filters(filters), expected)
def test_resolve_repository_name(self):
# docker hub library image
self.assertEqual(
resolve_repository_name('image'),
('index.docker.io', 'image'),
)
def test_decode_json_header(self):
obj = {'a': 'b', 'c': 1}
data = None
if six.PY3:
data = base64.urlsafe_b64encode(bytes(json.dumps(obj), 'utf-8'))
else:
data = base64.urlsafe_b64encode(json.dumps(obj))
decoded_data = decode_json_header(data)
self.assertEqual(obj, decoded_data)
# docker hub image
self.assertEqual(
resolve_repository_name('username/image'),
('index.docker.io', 'username/image'),
)
# private registry
self.assertEqual(
resolve_repository_name('my.registry.net/image'),
('my.registry.net', 'image'),
)
class SplitCommandTest(base.BaseTestCase):
# private registry with port
self.assertEqual(
resolve_repository_name('my.registry.net:5000/image'),
('my.registry.net:5000', 'image'),
)
def test_split_command_with_unicode(self):
if six.PY2:
self.assertEqual(
split_command(unicode('echo μμ', 'utf-8')),
['echo', 'μμ']
)
else:
self.assertEqual(split_command('echo μμ'), ['echo', 'μμ'])
# private registry with username
self.assertEqual(
resolve_repository_name('my.registry.net/username/image'),
('my.registry.net', 'username/image'),
)
# no dots but port
self.assertEqual(
resolve_repository_name('hostname:5000/image'),
('hostname:5000', 'image'),
)
# no dots but port and username
self.assertEqual(
resolve_repository_name('hostname:5000/username/image'),
('hostname:5000', 'username/image'),
)
# localhost
self.assertEqual(
resolve_repository_name('localhost/image'),
('localhost', 'image'),
)
# localhost with username
self.assertEqual(
resolve_repository_name('localhost/username/image'),
('localhost', 'username/image'),
)
def test_resolve_authconfig(self):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
}
# hostname only
self.assertEqual(
resolve_authconfig(auth_config, 'my.registry.net'),
{'auth': 'privateuser'}
)
# no protocol
self.assertEqual(
resolve_authconfig(auth_config, 'my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# no path
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net'),
{'auth': 'privateuser'}
)
# no path, trailing slash
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net/'),
{'auth': 'privateuser'}
)
# no path, wrong secure protocol
self.assertEqual(
resolve_authconfig(auth_config, 'https://my.registry.net'),
{'auth': 'privateuser'}
)
# no path, wrong insecure protocol
self.assertEqual(
resolve_authconfig(auth_config, 'http://index.docker.io'),
{'auth': 'indexuser'}
)
# with path, wrong protocol
self.assertEqual(
resolve_authconfig(auth_config, 'https://my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# default registry
self.assertEqual(
resolve_authconfig(auth_config), {'auth': 'indexuser'}
)
# default registry (explicit None)
self.assertEqual(
resolve_authconfig(auth_config, None), {'auth': 'indexuser'}
)
# fully explicit
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# legacy entry in config
self.assertEqual(
resolve_authconfig(auth_config, 'legacy.registry.url'),
{'auth': 'legacyauth'}
)
# no matching entry
self.assertTrue(
resolve_authconfig(auth_config, 'does.not.exist') is None
)
def test_resolve_registry_and_auth(self):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
}
# library image
image = 'image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'indexuser'},
)
# docker hub image
image = 'username/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'indexuser'},
)
# private registry
image = 'my.registry.net/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'privateuser'},
)
# unauthenticated registry
image = 'other.registry.net/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
None,
)
@pytest.mark.skipif(six.PY3, reason="shlex doesn't support bytes in py3")
def test_split_command_with_bytes(self):
self.assertEqual(split_command('echo μμ'), ['echo', 'μμ'])
class PortsTest(base.BaseTestCase):
@ -680,3 +649,85 @@ class ExcludePathsTest(base.BaseTestCase):
assert self.exclude(['foo/bar']) == self.all_paths - set([
'foo/bar', 'foo/bar/a.py',
])
class TarTest(base.Cleanup, base.BaseTestCase):
def test_tar_with_excludes(self):
dirs = [
'foo',
'foo/bar',
'bar',
]
files = [
'Dockerfile',
'Dockerfile.alt',
'.dockerignore',
'a.py',
'a.go',
'b.py',
'cde.py',
'foo/a.py',
'foo/b.py',
'foo/bar/a.py',
'bar/a.py',
]
exclude = [
'*.py',
'!b.py',
'!a.go',
'foo',
'Dockerfile*',
'.dockerignore',
]
expected_names = set([
'Dockerfile',
'.dockerignore',
'a.go',
'b.py',
'bar',
'bar/a.py',
])
base = make_tree(dirs, files)
self.addCleanup(shutil.rmtree, base)
with tar(base, exclude=exclude) as archive:
tar_data = tarfile.open(fileobj=archive)
assert sorted(tar_data.getnames()) == sorted(expected_names)
def test_tar_with_empty_directory(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(sorted(tar_data.getnames()), ['bar', 'foo'])
def test_tar_with_file_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
with open(os.path.join(base, 'foo'), 'w') as f:
f.write("content")
os.makedirs(os.path.join(base, 'bar'))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)
def test_tar_with_directory_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)

85
tests/unit/volume_test.py Normal file
View File

@ -0,0 +1,85 @@
import json
import pytest
from .. import base
from .api_test import DockerClientTest, url_prefix, fake_request
class VolumeTest(DockerClientTest):
@base.requires_api_version('1.21')
def test_list_volumes(self):
volumes = self.client.volumes()
self.assertIn('Volumes', volumes)
self.assertEqual(len(volumes['Volumes']), 2)
args = fake_request.call_args
self.assertEqual(args[0][0], 'GET')
self.assertEqual(args[0][1], url_prefix + 'volumes')
@base.requires_api_version('1.21')
def test_create_volume(self):
name = 'perfectcherryblossom'
result = self.client.create_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
args = fake_request.call_args
self.assertEqual(args[0][0], 'POST')
self.assertEqual(args[0][1], url_prefix + 'volumes/create')
self.assertEqual(json.loads(args[1]['data']), {'Name': name})
@base.requires_api_version('1.21')
def test_create_volume_with_driver(self):
name = 'perfectcherryblossom'
driver_name = 'sshfs'
self.client.create_volume(name, driver=driver_name)
args = fake_request.call_args
self.assertEqual(args[0][0], 'POST')
self.assertEqual(args[0][1], url_prefix + 'volumes/create')
data = json.loads(args[1]['data'])
self.assertIn('Driver', data)
self.assertEqual(data['Driver'], driver_name)
@base.requires_api_version('1.21')
def test_create_volume_invalid_opts_type(self):
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts='hello=world'
)
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts=['hello=world']
)
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts=''
)
@base.requires_api_version('1.21')
def test_inspect_volume(self):
name = 'perfectcherryblossom'
result = self.client.inspect_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
args = fake_request.call_args
self.assertEqual(args[0][0], 'GET')
self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
@base.requires_api_version('1.21')
def test_remove_volume(self):
name = 'perfectcherryblossom'
result = self.client.remove_volume(name)
self.assertTrue(result)
args = fake_request.call_args
self.assertEqual(args[0][0], 'DELETE')
self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))

View File

@ -5,7 +5,7 @@ skipsdist=True
[testenv]
usedevelop=True
commands =
py.test --cov=docker tests/test.py tests/utils_test.py
py.test --cov=docker tests/unit/
deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt