Reorganize test directories

More clearly separate unit and integration tests
Allow splitting into multiple files
Cleaner

Signed-off-by: Joffrey F <joffrey@docker.com>
This commit is contained in:
Joffrey F 2015-09-23 17:42:29 -07:00
parent 5a1c7ed8bf
commit 93a296fb04
28 changed files with 3757 additions and 3739 deletions

View File

@ -17,21 +17,21 @@ build-dind-certs:
test: flake8 unit-test unit-test-py3 integration-dind integration-dind-ssl
unit-test: build
docker run docker-py py.test tests/test.py tests/utils_test.py
docker run docker-py py.test tests/unit
unit-test-py3: build-py3
docker run docker-py3 py.test tests/test.py tests/utils_test.py
docker run docker-py3 py.test tests/unit
integration-test: build
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration_test.py
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration
integration-test-py3: build-py3
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration_test.py
docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration
integration-dind: build build-py3
docker run -d --name dpy-dind --env="DOCKER_HOST=tcp://localhost:2375" --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375
docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration_test.py
docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration_test.py
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration
docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration
docker rm -vf dpy-dind
integration-dind-ssl: build-dind-certs build build-py3

View File

@ -21,3 +21,28 @@ def requires_api_version(version):
),
reason="API version is too low (< {0})".format(version)
)
class Cleanup(object):
if sys.version_info < (2, 7):
# Provide a basic implementation of addCleanup for Python < 2.7
def __init__(self, *args, **kwargs):
super(Cleanup, self).__init__(*args, **kwargs)
self._cleanups = []
def tearDown(self):
super(Cleanup, self).tearDown()
ok = True
while self._cleanups:
fn, args, kwargs = self._cleanups.pop(-1)
try:
fn(*args, **kwargs)
except KeyboardInterrupt:
raise
except:
ok = False
if not ok:
raise
def addCleanup(self, function, *args, **kwargs):
self._cleanups.append((function, args, kwargs))

View File

@ -0,0 +1,13 @@
# flake8: noqa
# FIXME: crutch while we transition to the new folder architecture
# Remove imports when merged in master and Jenkins is updated to find the
# tests in the new location.
from .api_test import *
from .build_test import *
from .container_test import *
from .exec_test import *
from .image_test import *
from .network_test import *
from .regression_test import *
from .volume_test import *

View File

@ -0,0 +1,292 @@
import base64
import json
import os
import shutil
import tempfile
import time
import unittest
import warnings
import docker
import six
BUSYBOX = 'busybox:buildroot-2014.02'
EXEC_DRIVER = []
def exec_driver_is_native():
global EXEC_DRIVER
if not EXEC_DRIVER:
c = docker_client()
EXEC_DRIVER = c.info()['ExecutionDriver']
c.close()
return EXEC_DRIVER.startswith('native')
def docker_client(**kwargs):
return docker.Client(**docker_client_kwargs(**kwargs))
def docker_client_kwargs(**kwargs):
client_kwargs = docker.utils.kwargs_from_env(assert_hostname=False)
client_kwargs.update(kwargs)
return client_kwargs
def setup_module():
warnings.simplefilter('error')
c = docker_client()
try:
c.inspect_image(BUSYBOX)
except docker.errors.NotFound:
os.write(2, "\npulling busybox\n".encode('utf-8'))
for data in c.pull(BUSYBOX, stream=True):
data = json.loads(data.decode('utf-8'))
os.write(2, ("%c[2K\r" % 27).encode('utf-8'))
status = data.get("status")
progress = data.get("progress")
detail = "{0} - {1}".format(status, progress).encode('utf-8')
os.write(2, detail)
os.write(2, "\npulled busybox\n".encode('utf-8'))
# Double make sure we now have busybox
c.inspect_image(BUSYBOX)
c.close()
class BaseTestCase(unittest.TestCase):
tmp_imgs = []
tmp_containers = []
tmp_folders = []
tmp_volumes = []
def setUp(self):
if six.PY2:
self.assertRegex = self.assertRegexpMatches
self.assertCountEqual = self.assertItemsEqual
self.client = docker_client(timeout=60)
self.tmp_imgs = []
self.tmp_containers = []
self.tmp_folders = []
self.tmp_volumes = []
self.tmp_networks = []
def tearDown(self):
for img in self.tmp_imgs:
try:
self.client.remove_image(img)
except docker.errors.APIError:
pass
for container in self.tmp_containers:
try:
self.client.stop(container, timeout=1)
self.client.remove_container(container)
except docker.errors.APIError:
pass
for network in self.tmp_networks:
try:
self.client.remove_network(network)
except docker.errors.APIError:
pass
for folder in self.tmp_folders:
shutil.rmtree(folder)
for volume in self.tmp_volumes:
try:
self.client.remove_volume(volume)
except docker.errors.APIError:
pass
self.client.close()
def run_container(self, *args, **kwargs):
container = self.client.create_container(*args, **kwargs)
self.tmp_containers.append(container)
self.client.start(container)
exitcode = self.client.wait(container)
if exitcode != 0:
output = self.client.logs(container)
raise Exception(
"Container exited with code {}:\n{}"
.format(exitcode, output))
return container
#########################
# INFORMATION TESTS #
#########################
class InformationTest(BaseTestCase):
def test_version(self):
res = self.client.version()
self.assertIn('GoVersion', res)
self.assertIn('Version', res)
self.assertEqual(len(res['Version'].split('.')), 3)
def test_info(self):
res = self.client.info()
self.assertIn('Containers', res)
self.assertIn('Images', res)
self.assertIn('Debug', res)
def test_search(self):
self.client = docker_client(timeout=10)
res = self.client.search('busybox')
self.assertTrue(len(res) >= 1)
base_img = [x for x in res if x['name'] == 'busybox']
self.assertEqual(len(base_img), 1)
self.assertIn('description', base_img[0])
#################
# LINKS TESTS #
#################
class LinkTest(BaseTestCase):
def test_remove_link(self):
# Create containers
container1 = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True
)
container1_id = container1['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
# Create Link
# we don't want the first /
link_path = self.client.inspect_container(container1_id)['Name'][1:]
link_alias = 'mylink'
container2 = self.client.create_container(
BUSYBOX, 'cat', host_config=self.client.create_host_config(
links={link_path: link_alias}, network_mode='none'
)
)
container2_id = container2['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# Remove link
linked_name = self.client.inspect_container(container2_id)['Name'][1:]
link_name = '%s/%s' % (linked_name, link_alias)
self.client.remove_container(link_name, link=True)
# Link is gone
containers = self.client.containers(all=True)
retrieved = [x for x in containers if link_name in x['Names']]
self.assertEqual(len(retrieved), 0)
# Containers are still there
retrieved = [
x for x in containers if x['Id'].startswith(container1_id) or
x['Id'].startswith(container2_id)
]
self.assertEqual(len(retrieved), 2)
#######################
# PY SPECIFIC TESTS #
#######################
class LoadConfigTest(BaseTestCase):
def test_load_legacy_config(self):
folder = tempfile.mkdtemp()
self.tmp_folders.append(folder)
cfg_path = os.path.join(folder, '.dockercfg')
f = open(cfg_path, 'w')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
f.close()
cfg = docker.auth.load_config(cfg_path)
self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
cfg = cfg[docker.auth.INDEX_NAME]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('Auth'), None)
def test_load_json_config(self):
folder = tempfile.mkdtemp()
self.tmp_folders.append(folder)
cfg_path = os.path.join(folder, '.dockercfg')
f = open(os.path.join(folder, '.dockercfg'), 'w')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
email_ = 'sakuya@scarlet.net'
f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
docker.auth.INDEX_URL, auth_, email_))
f.close()
cfg = docker.auth.load_config(cfg_path)
self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
cfg = cfg[docker.auth.INDEX_URL]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('Auth'), None)
class AutoDetectVersionTest(unittest.TestCase):
def test_client_init(self):
client = docker_client(version='auto')
client_version = client._version
api_version = client.version(api_version=False)['ApiVersion']
self.assertEqual(client_version, api_version)
api_version_2 = client.version()['ApiVersion']
self.assertEqual(client_version, api_version_2)
client.close()
def test_auto_client(self):
client = docker.AutoVersionClient(**docker_client_kwargs())
client_version = client._version
api_version = client.version(api_version=False)['ApiVersion']
self.assertEqual(client_version, api_version)
api_version_2 = client.version()['ApiVersion']
self.assertEqual(client_version, api_version_2)
client.close()
with self.assertRaises(docker.errors.DockerException):
docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
class ConnectionTimeoutTest(unittest.TestCase):
def setUp(self):
self.timeout = 0.5
self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
timeout=self.timeout)
def test_timeout(self):
start = time.time()
res = None
# This call isn't supposed to complete, and it should fail fast.
try:
res = self.client.inspect_container('id')
except:
pass
end = time.time()
self.assertTrue(res is None)
self.assertTrue(end - start < 2 * self.timeout)
class UnixconnTest(unittest.TestCase):
"""
Test UNIX socket connection adapter.
"""
def test_resource_warnings(self):
"""
Test no warnings are produced when using the client.
"""
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
client = docker_client()
client.images()
client.close()
del client
assert len(w) == 0, \
"No warnings produced: {0}".format(w[0].message)

View File

@ -0,0 +1,98 @@
import io
import json
import os
import shutil
import tempfile
import six
from . import api_test
from ..base import requires_api_version
class BuildTest(api_test.BaseTestCase):
def test_build_streaming(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
stream = self.client.build(fileobj=script, stream=True)
logs = ''
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
json.loads(chunk) # ensure chunk is a single, valid JSON blob
logs += chunk
self.assertNotEqual(logs, '')
def test_build_from_stringio(self):
if six.PY3:
return
script = io.StringIO(six.text_type('\n').join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]))
stream = self.client.build(fileobj=script, stream=True)
logs = ''
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
logs += chunk
self.assertNotEqual(logs, '')
@requires_api_version('1.8')
def test_build_with_dockerignore(self):
base_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base_dir)
with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
f.write("\n".join([
'FROM busybox',
'MAINTAINER docker-py',
'ADD . /test',
]))
with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
f.write("\n".join([
'ignored',
'Dockerfile',
'.dockerignore',
'', # empty line
]))
with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
f.write("this file should not be ignored")
subdir = os.path.join(base_dir, 'ignored', 'subdir')
os.makedirs(subdir)
with open(os.path.join(subdir, 'file'), 'w') as f:
f.write("this file should be ignored")
tag = 'docker-py-test-build-with-dockerignore'
stream = self.client.build(
path=base_dir,
tag=tag,
)
for chunk in stream:
pass
c = self.client.create_container(tag, ['ls', '-1A', '/test'])
self.client.start(c)
self.client.wait(c)
logs = self.client.logs(c)
if six.PY3:
logs = logs.decode('utf-8')
self.assertEqual(
list(filter(None, logs.split('\n'))),
['not-ignored'],
)

View File

@ -0,0 +1,991 @@
import errno
import os
import shutil
import signal
import struct
import tempfile
import docker
import pytest
import six
from . import api_test
from ..base import requires_api_version
from .. import helpers
BUSYBOX = api_test.BUSYBOX
class ListContainersTest(api_test.BaseTestCase):
def test_list_containers(self):
res0 = self.client.containers(all=True)
size = len(res0)
res1 = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res1)
self.client.start(res1['Id'])
self.tmp_containers.append(res1['Id'])
res2 = self.client.containers(all=True)
self.assertEqual(size + 1, len(res2))
retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
self.assertEqual(len(retrieved), 1)
retrieved = retrieved[0]
self.assertIn('Command', retrieved)
self.assertEqual(retrieved['Command'], six.text_type('true'))
self.assertIn('Image', retrieved)
self.assertRegex(retrieved['Image'], r'busybox:.*')
self.assertIn('Status', retrieved)
class CreateContainerTest(api_test.BaseTestCase):
def setUp(self):
super(CreateContainerTest, self).setUp()
self.mount_dest = '/mnt'
# Get a random pathname - we don't need it to exist locally
self.mount_origin = tempfile.mkdtemp()
shutil.rmtree(self.mount_origin)
self.filename = 'shared.txt'
self.run_with_volume(
False,
BUSYBOX,
['touch', os.path.join(self.mount_dest, self.filename)],
)
def test_create(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
def test_create_with_host_pid_mode(self):
ctnr = self.client.create_container(
BUSYBOX, 'true', host_config=self.client.create_host_config(
pid_mode='host', network_mode='none'
)
)
self.assertIn('Id', ctnr)
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
inspect = self.client.inspect_container(ctnr)
self.assertIn('HostConfig', inspect)
host_config = inspect['HostConfig']
self.assertIn('PidMode', host_config)
self.assertEqual(host_config['PidMode'], 'host')
def test_create_with_links(self):
res0 = self.client.create_container(
BUSYBOX, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
BUSYBOX, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# we don't want the first /
link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
link_alias1 = 'mylink1'
link_env_prefix1 = link_alias1.upper()
link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
link_alias2 = 'mylink2'
link_env_prefix2 = link_alias2.upper()
res2 = self.client.create_container(
BUSYBOX, 'env', host_config=self.client.create_host_config(
links={link_path1: link_alias1, link_path2: link_alias2},
network_mode='none'
)
)
container3_id = res2['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
self.assertEqual(self.client.wait(container3_id), 0)
logs = self.client.logs(container3_id)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
def test_create_with_restart_policy(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '2'],
host_config=self.client.create_host_config(
restart_policy={"Name": "always", "MaximumRetryCount": 0},
network_mode='none'
)
)
id = container['Id']
self.client.start(id)
self.client.wait(id)
with self.assertRaises(docker.errors.APIError) as exc:
self.client.remove_container(id)
err = exc.exception.response.text
self.assertIn(
'You cannot remove a running container', err
)
self.client.remove_container(id, force=True)
def test_create_container_with_volumes_from(self):
vol_names = ['foobar_vol0', 'foobar_vol1']
res0 = self.client.create_container(
BUSYBOX, 'true', name=vol_names[0]
)
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
BUSYBOX, 'true', name=vol_names[1]
)
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
with self.assertRaises(docker.errors.DockerException):
self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True,
volumes_from=vol_names
)
res2 = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True,
host_config=self.client.create_host_config(
volumes_from=vol_names, network_mode='none'
)
)
container3_id = res2['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
info = self.client.inspect_container(res2['Id'])
self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
def test_create_with_binds_rw(self):
container = self.run_with_volume(
False,
BUSYBOX,
['ls', self.mount_dest],
)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn(self.filename, logs)
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, True)
def test_create_with_binds_ro(self):
container = self.run_with_volume(
True,
BUSYBOX,
['ls', self.mount_dest],
)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
self.assertIn(self.filename, logs)
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, False)
def create_container_readonly_fs(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
ctnr = self.client.create_container(
BUSYBOX, ['mkdir', '/shrine'],
host_config=self.client.create_host_config(
read_only=True, network_mode='none'
)
)
self.assertIn('Id', ctnr)
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
res = self.client.wait(ctnr)
self.assertNotEqual(res, 0)
def create_container_with_name(self):
res = self.client.create_container(BUSYBOX, 'true', name='foobar')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Name', inspect)
self.assertEqual('/foobar', inspect['Name'])
def create_container_privileged(self):
res = self.client.create_container(
BUSYBOX, 'true', host_config=self.client.create_host_config(
privileged=True, network_mode='none'
)
)
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
# Since Nov 2013, the Privileged flag is no longer part of the
# container's config exposed via the API (safety concerns?).
#
if 'Privileged' in inspect['Config']:
self.assertEqual(inspect['Config']['Privileged'], True)
def test_create_with_mac_address(self):
mac_address_expected = "02:42:ac:11:00:0a"
container = self.client.create_container(
BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
id = container['Id']
self.client.start(container)
res = self.client.inspect_container(container['Id'])
self.assertEqual(mac_address_expected,
res['NetworkSettings']['MacAddress'])
self.client.kill(id)
@requires_api_version('1.20')
def test_group_id_ints(self):
container = self.client.create_container(
BUSYBOX, 'id -G',
host_config=self.client.create_host_config(group_add=[1000, 1001])
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
groups = logs.strip().split(' ')
self.assertIn('1000', groups)
self.assertIn('1001', groups)
@requires_api_version('1.20')
def test_group_id_strings(self):
container = self.client.create_container(
BUSYBOX, 'id -G', host_config=self.client.create_host_config(
group_add=['1000', '1001']
)
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container)
if six.PY3:
logs = logs.decode('utf-8')
groups = logs.strip().split(' ')
self.assertIn('1000', groups)
self.assertIn('1001', groups)
def test_valid_log_driver_and_log_opt(self):
log_config = docker.utils.LogConfig(
type='json-file',
config={'max-file': '100'}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], log_config.type)
self.assertEqual(container_log_config['Config'], log_config.config)
def test_invalid_log_driver_raises_exception(self):
log_config = docker.utils.LogConfig(
type='asdf-nope',
config={}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
expected_msg = "logger: no log driver named 'asdf-nope' is registered"
with pytest.raises(docker.errors.APIError) as excinfo:
# raises an internal server error 500
self.client.start(container)
assert expected_msg in str(excinfo.value)
@pytest.mark.skipif(True,
reason="https://github.com/docker/docker/issues/15633")
def test_valid_no_log_driver_specified(self):
log_config = docker.utils.LogConfig(
type="",
config={'max-file': '100'}
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], "json-file")
self.assertEqual(container_log_config['Config'], log_config.config)
def test_valid_no_config_specified(self):
log_config = docker.utils.LogConfig(
type="json-file",
config=None
)
container = self.client.create_container(
BUSYBOX, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
self.assertEqual(container_log_config['Type'], "json-file")
self.assertEqual(container_log_config['Config'], {})
def check_container_data(self, inspect_data, rw):
if docker.utils.compare_version('1.20', self.client._version) < 0:
self.assertIn('Volumes', inspect_data)
self.assertIn(self.mount_dest, inspect_data['Volumes'])
self.assertEqual(
self.mount_origin, inspect_data['Volumes'][self.mount_dest]
)
self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
else:
self.assertIn('Mounts', inspect_data)
filtered = list(filter(
lambda x: x['Destination'] == self.mount_dest,
inspect_data['Mounts']
))
self.assertEqual(len(filtered), 1)
mount_data = filtered[0]
self.assertEqual(mount_data['Source'], self.mount_origin)
self.assertEqual(mount_data['RW'], rw)
def run_with_volume(self, ro, *args, **kwargs):
return self.run_container(
*args,
volumes={self.mount_dest: {}},
host_config=self.client.create_host_config(
binds={
self.mount_origin: {
'bind': self.mount_dest,
'ro': ro,
},
},
network_mode='none'
),
**kwargs
)
@requires_api_version('1.20')
class ArchiveTest(api_test.BaseTestCase):
def test_get_file_archive_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
with tempfile.NamedTemporaryFile() as destination:
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
for d in strm:
destination.write(d)
destination.seek(0)
retrieved_data = helpers.untar_file(destination, 'data.txt')
if six.PY3:
retrieved_data = retrieved_data.decode('utf-8')
self.assertEqual(data, retrieved_data.strip())
def test_get_file_stat_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
self.assertIn('name', stat)
self.assertEqual(stat['name'], 'data.txt')
self.assertIn('size', stat)
self.assertEqual(stat['size'], len(data))
def test_copy_file_to_container(self):
data = b'Deaf To All But The Song'
with tempfile.NamedTemporaryFile() as test_file:
test_file.write(data)
test_file.seek(0)
ctnr = self.client.create_container(
BUSYBOX,
'cat {0}'.format(
os.path.join('/vol1', os.path.basename(test_file.name))
),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with helpers.simple_tar(test_file.name) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
data = data.decode('utf-8')
self.assertEqual(logs.strip(), data)
def test_copy_directory_to_container(self):
files = ['a.py', 'b.py', 'foo/b.py']
dirs = ['foo', 'bar']
base = helpers.make_tree(dirs, files)
ctnr = self.client.create_container(
BUSYBOX, 'ls -p /vol1', volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with docker.utils.tar(base) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
results = logs.strip().split()
self.assertIn('a.py', results)
self.assertIn('b.py', results)
self.assertIn('foo/', results)
self.assertIn('bar/', results)
class RenameContainerTest(api_test.BaseTestCase):
def test_rename_container(self):
version = self.client.version()['Version']
name = 'hong_meiling'
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.rename(res, name)
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Name', inspect)
if version == '1.5.0':
self.assertEqual(name, inspect['Name'])
else:
self.assertEqual('/{0}'.format(name), inspect['Name'])
class StartContainerTest(api_test.BaseTestCase):
def test_start_container(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
def test_start_container_with_dict_instead_of_id(self):
res = self.client.create_container(BUSYBOX, 'true')
self.assertIn('Id', res)
self.tmp_containers.append(res['Id'])
self.client.start(res)
inspect = self.client.inspect_container(res['Id'])
self.assertIn('Config', inspect)
self.assertIn('Id', inspect)
self.assertTrue(inspect['Id'].startswith(res['Id']))
self.assertIn('Image', inspect)
self.assertIn('State', inspect)
self.assertIn('Running', inspect['State'])
if not inspect['State']['Running']:
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], 0)
def test_run_shlex_commands(self):
commands = [
'true',
'echo "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'echo -n "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'/bin/sh -c "echo Hello World"',
'/bin/sh -c \'echo "Hello World"\'',
'echo "\"Night of Nights\""',
'true && echo "Night of Nights"'
]
for cmd in commands:
container = self.client.create_container(BUSYBOX, cmd)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0, msg=cmd)
class WaitTest(api_test.BaseTestCase):
def test_wait(self):
res = self.client.create_container(BUSYBOX, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
inspect = self.client.inspect_container(id)
self.assertIn('Running', inspect['State'])
self.assertEqual(inspect['State']['Running'], False)
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], exitcode)
def test_wait_with_dict_instead_of_id(self):
res = self.client.create_container(BUSYBOX, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(res)
exitcode = self.client.wait(res)
self.assertEqual(exitcode, 0)
inspect = self.client.inspect_container(res)
self.assertIn('Running', inspect['State'])
self.assertEqual(inspect['State']['Running'], False)
self.assertIn('ExitCode', inspect['State'])
self.assertEqual(inspect['State']['ExitCode'], exitcode)
class LogsTest(api_test.BaseTestCase):
def test_logs(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'echo {0}'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(id)
self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
def test_logs_tail_option(self):
snippet = '''Line1
Line2'''
container = self.client.create_container(
BUSYBOX, 'echo "{0}"'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(id, tail=1)
self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
# def test_logs_streaming(self):
# snippet = 'Flowering Nights (Sakuya Iyazoi)'
# container = self.client.create_container(
# BUSYBOX, 'echo {0}'.format(snippet)
# )
# id = container['Id']
# self.client.start(id)
# self.tmp_containers.append(id)
# logs = bytes() if six.PY3 else str()
# for chunk in self.client.logs(id, stream=True):
# logs += chunk
# exitcode = self.client.wait(id)
# self.assertEqual(exitcode, 0)
# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
def test_logs_with_dict_instead_of_id(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'echo {0}'.format(snippet)
)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
logs = self.client.logs(container)
self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
class DiffTest(api_test.BaseTestCase):
def test_diff(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
diff = self.client.diff(id)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
self.assertEqual(len(test_diff), 1)
self.assertIn('Kind', test_diff[0])
self.assertEqual(test_diff[0]['Kind'], 1)
def test_diff_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)
self.assertEqual(exitcode, 0)
diff = self.client.diff(container)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
self.assertEqual(len(test_diff), 1)
self.assertIn('Kind', test_diff[0])
self.assertEqual(test_diff[0]['Kind'], 1)
class StopTest(api_test.BaseTestCase):
def test_stop(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.stop(id, timeout=2)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_stop_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
self.assertIn('Id', container)
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
self.client.stop(container, timeout=2)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
class KillTest(api_test.BaseTestCase):
def test_kill(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_kill_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(container)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
if api_test.exec_driver_is_native():
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False)
def test_kill_with_signal(self):
container = self.client.create_container(BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id, signal=signal.SIGKILL)
exitcode = self.client.wait(id)
self.assertNotEqual(exitcode, 0)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertNotEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], False, state)
class PortTest(api_test.BaseTestCase):
def test_port(self):
port_bindings = {
'1111': ('127.0.0.1', '4567'),
'2222': ('127.0.0.1', '4568')
}
container = self.client.create_container(
BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
host_config=self.client.create_host_config(
port_bindings=port_bindings, network_mode='bridge'
)
)
id = container['Id']
self.client.start(container)
# Call the port function on each biding and compare expected vs actual
for port in port_bindings:
actual_bindings = self.client.port(container, port)
port_binding = actual_bindings.pop()
ip, host_port = port_binding['HostIp'], port_binding['HostPort']
self.assertEqual(ip, port_bindings[port][0])
self.assertEqual(host_port, port_bindings[port][1])
self.client.kill(id)
class ContainerTopTest(api_test.BaseTestCase):
def test_top(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(container)
res = self.client.top(container['Id'])
self.assertEqual(
res['Titles'],
['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD']
)
self.assertEqual(len(res['Processes']), 1)
self.assertEqual(res['Processes'][0][7], 'sleep 60')
self.client.kill(id)
def test_top_with_psargs(self):
container = self.client.create_container(
BUSYBOX, ['sleep', '60'])
id = container['Id']
self.client.start(container)
res = self.client.top(container['Id'], 'waux')
self.assertEqual(
res['Titles'],
['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS',
'TTY', 'STAT', 'START', 'TIME', 'COMMAND'],
)
self.assertEqual(len(res['Processes']), 1)
self.assertEqual(res['Processes'][0][10], 'sleep 60')
self.client.kill(id)
class RestartContainerTest(api_test.BaseTestCase):
def test_restart(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
self.assertIn('State', info)
self.assertIn('StartedAt', info['State'])
start_time1 = info['State']['StartedAt']
self.client.restart(id, timeout=2)
info2 = self.client.inspect_container(id)
self.assertIn('State', info2)
self.assertIn('StartedAt', info2['State'])
start_time2 = info2['State']['StartedAt']
self.assertNotEqual(start_time1, start_time2)
self.assertIn('Running', info2['State'])
self.assertEqual(info2['State']['Running'], True)
self.client.kill(id)
def test_restart_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
self.assertIn('Id', container)
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
self.assertIn('State', info)
self.assertIn('StartedAt', info['State'])
start_time1 = info['State']['StartedAt']
self.client.restart(container, timeout=2)
info2 = self.client.inspect_container(id)
self.assertIn('State', info2)
self.assertIn('StartedAt', info2['State'])
start_time2 = info2['State']['StartedAt']
self.assertNotEqual(start_time1, start_time2)
self.assertIn('Running', info2['State'])
self.assertEqual(info2['State']['Running'], True)
self.client.kill(id)
class RemoveContainerTest(api_test.BaseTestCase):
def test_remove(self):
container = self.client.create_container(BUSYBOX, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(id)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
self.assertEqual(len(res), 0)
def test_remove_with_dict_instead_of_id(self):
container = self.client.create_container(BUSYBOX, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(container)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
self.assertEqual(len(res), 0)
class AttachContainerTest(api_test.BaseTestCase):
def test_run_container_streaming(self):
container = self.client.create_container(BUSYBOX, '/bin/sh',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
sock = self.client.attach_socket(container, ws=False)
self.assertTrue(sock.fileno() > -1)
def test_run_container_reading_socket(self):
line = 'hi there and stuff and things, words!'
command = "echo '{0}'".format(line)
container = self.client.create_container(BUSYBOX, command,
detach=True, tty=False)
ident = container['Id']
self.tmp_containers.append(ident)
opts = {"stdout": 1, "stream": 1, "logs": 1}
pty_stdout = self.client.attach_socket(ident, opts)
self.client.start(ident)
recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
def read(n=4096):
"""Code stolen from dockerpty to read the socket"""
try:
if hasattr(pty_stdout, 'recv'):
return pty_stdout.recv(n)
return os.read(pty_stdout.fileno(), n)
except EnvironmentError as e:
if e.errno not in recoverable_errors:
raise
def next_packet_size():
"""Code stolen from dockerpty to get the next packet size"""
data = six.binary_type()
while len(data) < 8:
next_data = read(8 - len(data))
if not next_data:
return 0
data = data + next_data
if data is None:
return 0
if len(data) == 8:
_, actual = struct.unpack('>BxxxL', data)
return actual
next_size = next_packet_size()
self.assertEqual(next_size, len(line) + 1)
data = six.binary_type()
while len(data) < next_size:
next_data = read(next_size - len(data))
if not next_data:
assert False, "Failed trying to read in the dataz"
data += next_data
self.assertEqual(data.decode('utf-8'), "{0}\n".format(line))
pty_stdout.close()
# Prevent segfault at the end of the test run
if hasattr(pty_stdout, "_response"):
del pty_stdout._response
class PauseTest(api_test.BaseTestCase):
def test_pause_unpause(self):
container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
id = container['Id']
self.tmp_containers.append(id)
self.client.start(container)
self.client.pause(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], True)
self.assertIn('Paused', state)
self.assertEqual(state['Paused'], True)
self.client.unpause(id)
container_info = self.client.inspect_container(id)
self.assertIn('State', container_info)
state = container_info['State']
self.assertIn('ExitCode', state)
self.assertEqual(state['ExitCode'], 0)
self.assertIn('Running', state)
self.assertEqual(state['Running'], True)
self.assertIn('Paused', state)
self.assertEqual(state['Paused'], False)

View File

@ -0,0 +1,106 @@
import pytest
from . import api_test
BUSYBOX = api_test.BUSYBOX
class ExecTest(api_test.BaseTestCase):
def test_execute_command(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, ['echo', 'hello'])
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'hello\n')
def test_exec_command_string(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'echo hello world')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'hello world\n')
def test_exec_command_as_user(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'whoami', user='default')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'default\n')
def test_exec_command_as_root(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.exec_create(id, 'whoami')
self.assertIn('Id', res)
exec_log = self.client.exec_start(res)
self.assertEqual(exec_log, b'root\n')
def test_exec_command_streaming(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
self.assertIn('Id', exec_id)
res = b''
for chunk in self.client.exec_start(exec_id, stream=True):
res += chunk
self.assertEqual(res, b'hello\nworld\n')
def test_exec_inspect(self):
if not api_test.exec_driver_is_native():
pytest.skip('Exec driver not native')
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
self.assertIn('Id', exec_id)
self.client.exec_start(exec_id)
exec_info = self.client.exec_inspect(exec_id)
self.assertIn('ExitCode', exec_info)
self.assertNotEqual(exec_info['ExitCode'], 0)

View File

@ -0,0 +1,235 @@
import contextlib
import json
import shutil
import socket
import tarfile
import tempfile
import threading
import pytest
import six
from six.moves import BaseHTTPServer
from six.moves import socketserver
import docker
from . import api_test
BUSYBOX = api_test.BUSYBOX
class ListImagesTest(api_test.BaseTestCase):
def test_images(self):
res1 = self.client.images(all=True)
self.assertIn('Id', res1[0])
res10 = res1[0]
self.assertIn('Created', res10)
self.assertIn('RepoTags', res10)
distinct = []
for img in res1:
if img['Id'] not in distinct:
distinct.append(img['Id'])
self.assertEqual(len(distinct), self.client.info()['Images'])
def test_images_quiet(self):
res1 = self.client.images(quiet=True)
self.assertEqual(type(res1[0]), six.text_type)
class PullImageTest(api_test.BaseTestCase):
def test_pull(self):
try:
self.client.remove_image('hello-world')
except docker.errors.APIError:
pass
res = self.client.pull('hello-world')
self.tmp_imgs.append('hello-world')
self.assertEqual(type(res), six.text_type)
self.assertGreaterEqual(
len(self.client.images('hello-world')), 1
)
img_info = self.client.inspect_image('hello-world')
self.assertIn('Id', img_info)
def test_pull_streaming(self):
try:
self.client.remove_image('hello-world')
except docker.errors.APIError:
pass
stream = self.client.pull('hello-world', stream=True)
self.tmp_imgs.append('hello-world')
for chunk in stream:
if six.PY3:
chunk = chunk.decode('utf-8')
json.loads(chunk) # ensure chunk is a single, valid JSON blob
self.assertGreaterEqual(
len(self.client.images('hello-world')), 1
)
img_info = self.client.inspect_image('hello-world')
self.assertIn('Id', img_info)
class CommitTest(api_test.BaseTestCase):
def test_commit(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.commit(id)
self.assertIn('Id', res)
img_id = res['Id']
self.tmp_imgs.append(img_id)
img = self.client.inspect_image(img_id)
self.assertIn('Container', img)
self.assertTrue(img['Container'].startswith(id))
self.assertIn('ContainerConfig', img)
self.assertIn('Image', img['ContainerConfig'])
self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
busybox_id = self.client.inspect_image(BUSYBOX)['Id']
self.assertIn('Parent', img)
self.assertEqual(img['Parent'], busybox_id)
class RemoveImageTest(api_test.BaseTestCase):
def test_remove(self):
container = self.client.create_container(BUSYBOX, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
res = self.client.commit(id)
self.assertIn('Id', res)
img_id = res['Id']
self.tmp_imgs.append(img_id)
self.client.remove_image(img_id, force=True)
images = self.client.images(all=True)
res = [x for x in images if x['Id'].startswith(img_id)]
self.assertEqual(len(res), 0)
class ImportImageTest(api_test.BaseTestCase):
'''Base class for `docker import` test cases.'''
TAR_SIZE = 512 * 1024
def write_dummy_tar_content(self, n_bytes, tar_fd):
def extend_file(f, n_bytes):
f.seek(n_bytes - 1)
f.write(bytearray([65]))
f.seek(0)
tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
with tempfile.NamedTemporaryFile() as f:
extend_file(f, n_bytes)
tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
tar.addfile(tarinfo, fileobj=f)
tar.close()
@contextlib.contextmanager
def dummy_tar_stream(self, n_bytes):
'''Yields a stream that is valid tar data of size n_bytes.'''
with tempfile.NamedTemporaryFile() as tar_file:
self.write_dummy_tar_content(n_bytes, tar_file)
tar_file.seek(0)
yield tar_file
@contextlib.contextmanager
def dummy_tar_file(self, n_bytes):
'''Yields the name of a valid tar file of size n_bytes.'''
with tempfile.NamedTemporaryFile() as tar_file:
self.write_dummy_tar_content(n_bytes, tar_file)
tar_file.seek(0)
yield tar_file.name
def test_import_from_bytes(self):
with self.dummy_tar_stream(n_bytes=500) as f:
content = f.read()
# The generic import_image() function cannot import in-memory bytes
# data that happens to be represented as a string type, because
# import_image() will try to use it as a filename and usually then
# trigger an exception. So we test the import_image_from_data()
# function instead.
statuses = self.client.import_image_from_data(
content, repository='test/import-from-bytes')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
def test_import_from_file(self):
with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
# statuses = self.client.import_image(
# src=tar_filename, repository='test/import-from-file')
statuses = self.client.import_image_from_file(
tar_filename, repository='test/import-from-file')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
def test_import_from_stream(self):
with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
statuses = self.client.import_image(
src=tar_stream, repository='test/import-from-stream')
# statuses = self.client.import_image_from_stream(
# tar_stream, repository='test/import-from-stream')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)
@contextlib.contextmanager
def temporary_http_file_server(self, stream):
'''Serve data from an IO stream over HTTP.'''
class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-Type', 'application/x-tar')
self.end_headers()
shutil.copyfileobj(stream, self.wfile)
server = socketserver.TCPServer(('', 0), Handler)
thread = threading.Thread(target=server.serve_forever)
thread.setDaemon(True)
thread.start()
yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
server.shutdown()
@pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
def test_import_from_url(self):
# The crappy test HTTP server doesn't handle large files well, so use
# a small file.
tar_size = 10240
with self.dummy_tar_stream(n_bytes=tar_size) as tar_data:
with self.temporary_http_file_server(tar_data) as url:
statuses = self.client.import_image(
src=url, repository='test/import-from-url')
result_text = statuses.splitlines()[-1]
result = json.loads(result_text)
self.assertNotIn('error', result)
self.assertIn('status', result)
img_id = result['status']
self.tmp_imgs.append(img_id)

View File

@ -0,0 +1,98 @@
import random
import docker
import pytest
from . import api_test
from ..base import requires_api_version
@requires_api_version('1.21')
class TestNetworks(api_test.BaseTestCase):
def create_network(self, *args, **kwargs):
net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
net_id = self.client.create_network(net_name, *args, **kwargs)['id']
self.tmp_networks.append(net_id)
return (net_name, net_id)
def test_list_networks(self):
networks = self.client.networks()
initial_size = len(networks)
net_name, net_id = self.create_network()
networks = self.client.networks()
self.assertEqual(len(networks), initial_size + 1)
self.assertTrue(net_id in [n['id'] for n in networks])
networks_by_name = self.client.networks(names=[net_name])
self.assertEqual([n['id'] for n in networks_by_name], [net_id])
networks_by_partial_id = self.client.networks(ids=[net_id[:8]])
self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id])
def test_inspect_network(self):
net_name, net_id = self.create_network()
net = self.client.inspect_network(net_id)
self.assertEqual(net, {
u'name': net_name,
u'id': net_id,
u'driver': 'bridge',
u'containers': {},
})
def test_create_network_with_host_driver_fails(self):
net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
with pytest.raises(docker.errors.APIError):
self.client.create_network(net_name, driver='host')
def test_remove_network(self):
initial_size = len(self.client.networks())
net_name, net_id = self.create_network()
self.assertEqual(len(self.client.networks()), initial_size + 1)
self.client.remove_network(net_id)
self.assertEqual(len(self.client.networks()), initial_size)
def test_connect_and_disconnect_container(self):
net_name, net_id = self.create_network()
container = self.client.create_container('busybox', 'top')
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))
self.client.connect_container_to_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertEqual(
list(network_data['containers'].keys()),
[container['Id']])
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))
def test_connect_on_container_create(self):
net_name, net_id = self.create_network()
container = self.client.create_container(
image='busybox',
command='top',
host_config=self.client.create_host_config(network_mode=net_name),
)
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
self.assertEqual(
list(network_data['containers'].keys()),
[container['Id']])
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
self.assertFalse(network_data.get('containers'))

View File

@ -0,0 +1,69 @@
import io
import random
import docker
import six
from . import api_test
BUSYBOX = api_test.BUSYBOX
class TestRegressions(api_test.BaseTestCase):
def test_443_handle_nonchunked_response_in_stream(self):
dfile = io.BytesIO()
with self.assertRaises(docker.errors.APIError) as exc:
for line in self.client.build(fileobj=dfile, tag="a/b/c"):
pass
self.assertEqual(exc.exception.response.status_code, 500)
dfile.close()
def test_542_truncate_ids_client_side(self):
self.client.start(
self.client.create_container(BUSYBOX, ['true'])
)
result = self.client.containers(all=True, trunc=True)
self.assertEqual(len(result[0]['Id']), 12)
def test_647_support_doubleslash_in_image_names(self):
with self.assertRaises(docker.errors.APIError):
self.client.inspect_image('gensokyo.jp//kirisame')
def test_649_handle_timeout_value_none(self):
self.client.timeout = None
ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
self.client.start(ctnr)
self.client.stop(ctnr)
def test_715_handle_user_param_as_int_value(self):
ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
if six.PY3:
logs = logs.decode('utf-8')
assert logs == '1000\n'
def test_792_explicit_port_protocol(self):
tcp_port, udp_port = random.sample(range(9999, 32000), 2)
ctnr = self.client.create_container(
BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')],
host_config=self.client.create_host_config(
port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port}
)
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.assertEqual(
self.client.port(ctnr, 2000)[0]['HostPort'],
six.text_type(tcp_port)
)
self.assertEqual(
self.client.port(ctnr, '2000/tcp')[0]['HostPort'],
six.text_type(tcp_port)
)
self.assertEqual(
self.client.port(ctnr, '2000/udp')[0]['HostPort'],
six.text_type(udp_port)
)

View File

@ -0,0 +1,56 @@
import docker
import pytest
from . import api_test
from ..base import requires_api_version
@requires_api_version('1.21')
class TestVolumes(api_test.BaseTestCase):
def test_create_volume(self):
name = 'perfectcherryblossom'
self.tmp_volumes.append(name)
result = self.client.create_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
def test_create_volume_invalid_driver(self):
driver_name = 'invalid.driver'
with pytest.raises(docker.errors.NotFound):
self.client.create_volume('perfectcherryblossom', driver_name)
def test_list_volumes(self):
name = 'imperishablenight'
self.tmp_volumes.append(name)
volume_info = self.client.create_volume(name)
result = self.client.volumes()
self.assertIn('Volumes', result)
volumes = result['Volumes']
self.assertIn(volume_info, volumes)
def test_inspect_volume(self):
name = 'embodimentofscarletdevil'
self.tmp_volumes.append(name)
volume_info = self.client.create_volume(name)
result = self.client.inspect_volume(name)
self.assertEqual(volume_info, result)
def test_inspect_nonexistent_volume(self):
name = 'embodimentofscarletdevil'
with pytest.raises(docker.errors.NotFound):
self.client.inspect_volume(name)
def test_remove_volume(self):
name = 'shootthebullet'
self.tmp_volumes.append(name)
self.client.create_volume(name)
result = self.client.remove_volume(name)
self.assertTrue(result)
def test_remove_nonexistent_volume(self):
name = 'shootthebullet'
with pytest.raises(docker.errors.NotFound):
self.client.remove_volume(name)

File diff suppressed because it is too large Load Diff

418
tests/unit/api_test.py Normal file
View File

@ -0,0 +1,418 @@
# Copyright 2013 dotCloud inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import os
import re
import shutil
import socket
import sys
import tempfile
import threading
import time
import docker
import requests
import six
from .. import base
from . import fake_api
import pytest
try:
from unittest import mock
except ImportError:
import mock
DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
request=None):
res = requests.Response()
res.status_code = status_code
if not isinstance(content, six.binary_type):
content = json.dumps(content).encode('ascii')
res._content = content
res.headers = requests.structures.CaseInsensitiveDict(headers or {})
res.reason = reason
res.elapsed = datetime.timedelta(elapsed)
res.request = request
return res
def fake_resolve_authconfig(authconfig, registry=None):
return None
def fake_inspect_container(self, container, tty=False):
return fake_api.get_fake_inspect_container(tty=tty)[1]
def fake_resp(method, url, *args, **kwargs):
key = None
if url in fake_api.fake_responses:
key = url
elif (url, method) in fake_api.fake_responses:
key = (url, method)
if not key:
raise Exception('{0} {1}'.format(method, url))
status_code, content = fake_api.fake_responses[key]()
return response(status_code=status_code, content=content)
fake_request = mock.Mock(side_effect=fake_resp)
def fake_get(self, url, *args, **kwargs):
return fake_request('GET', url, *args, **kwargs)
def fake_post(self, url, *args, **kwargs):
return fake_request('POST', url, *args, **kwargs)
def fake_put(self, url, *args, **kwargs):
return fake_request('PUT', url, *args, **kwargs)
def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
url_base = 'http+docker://localunixsocket/'
url_prefix = '{0}v{1}/'.format(
url_base,
docker.constants.DEFAULT_DOCKER_API_VERSION)
class DockerClientTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
self.patcher = mock.patch.multiple(
'docker.Client', get=fake_get, post=fake_post, put=fake_put,
delete=fake_delete
)
self.patcher.start()
self.client = docker.Client()
# Force-clear authconfig to avoid tampering with the tests
self.client._cfg = {'Configs': {}}
def tearDown(self):
self.client.close()
self.patcher.stop()
def assertIn(self, object, collection):
if six.PY2 and sys.version_info[1] <= 6:
return self.assertTrue(object in collection)
return super(DockerClientTest, self).assertIn(object, collection)
def base_create_payload(self, img='busybox', cmd=None):
if not cmd:
cmd = ['true']
return {"Tty": False, "Image": img, "Cmd": cmd,
"AttachStdin": False,
"AttachStderr": True, "AttachStdout": True,
"StdinOnce": False,
"OpenStdin": False, "NetworkDisabled": False,
}
class DockerApiTest(DockerClientTest):
def test_ctor(self):
with pytest.raises(docker.errors.DockerException) as excinfo:
docker.Client(version=1.12)
self.assertEqual(
str(excinfo.value),
'Version parameter must be a string or None. Found float'
)
def test_url_valid_resource(self):
url = self.client._url('/hello/{0}/world', 'somename')
self.assertEqual(
url, '{0}{1}'.format(url_prefix, 'hello/somename/world')
)
url = self.client._url(
'/hello/{0}/world/{1}', 'somename', 'someothername'
)
self.assertEqual(
url,
'{0}{1}'.format(url_prefix, 'hello/somename/world/someothername')
)
url = self.client._url('/hello/{0}/world', '/some?name')
self.assertEqual(
url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world')
)
def test_url_invalid_resource(self):
with pytest.raises(ValueError):
self.client._url('/hello/{0}/world', ['sakuya', 'izayoi'])
def test_url_no_resource(self):
url = self.client._url('/simple')
self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple'))
def test_url_unversioned_api(self):
url = self.client._url(
'/hello/{0}/world', 'somename', versioned_api=False
)
self.assertEqual(
url, '{0}{1}'.format(url_base, 'hello/somename/world')
)
def test_version(self):
self.client.version()
fake_request.assert_called_with(
'GET',
url_prefix + 'version',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_version_no_api_version(self):
self.client.version(False)
fake_request.assert_called_with(
'GET',
url_base + 'version',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_retrieve_server_version(self):
client = docker.Client(version="auto")
self.assertTrue(isinstance(client._version, six.string_types))
self.assertFalse(client._version == "auto")
client.close()
def test_auto_retrieve_server_version(self):
version = self.client._retrieve_server_version()
self.assertTrue(isinstance(version, six.string_types))
def test_info(self):
self.client.info()
fake_request.assert_called_with(
'GET',
url_prefix + 'info',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_search(self):
self.client.search('busybox')
fake_request.assert_called_with(
'GET',
url_prefix + 'images/search',
params={'term': 'busybox'},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_events(self):
self.client.events()
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={'since': None, 'until': None, 'filters': None},
stream=True
)
def test_events_with_since_until(self):
ts = 1356048000
now = datetime.datetime.utcfromtimestamp(ts)
since = now - datetime.timedelta(seconds=10)
until = now + datetime.timedelta(seconds=10)
self.client.events(since=since, until=until)
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={
'since': ts - 10,
'until': ts + 10,
'filters': None
},
stream=True
)
def test_events_with_filters(self):
filters = {'event': ['die', 'stop'],
'container': fake_api.FAKE_CONTAINER_ID}
self.client.events(filters=filters)
expected_filters = docker.utils.convert_filters(filters)
fake_request.assert_called_with(
'GET',
url_prefix + 'events',
params={
'since': None,
'until': None,
'filters': expected_filters
},
stream=True
)
def _socket_path_for_client_session(self, client):
socket_adapter = client.get_adapter('http+docker://')
return socket_adapter.socket_path
def test_url_compatibility_unix(self):
c = docker.Client(base_url="unix://socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_unix_triple_slash(self):
c = docker.Client(base_url="unix:///socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_http_unix_triple_slash(self):
c = docker.Client(base_url="http+unix:///socket")
assert self._socket_path_for_client_session(c) == '/socket'
def test_url_compatibility_http(self):
c = docker.Client(base_url="http://hostname:1234")
assert c.base_url == "http://hostname:1234"
def test_url_compatibility_tcp(self):
c = docker.Client(base_url="tcp://hostname:1234")
assert c.base_url == "http://hostname:1234"
def test_remove_link(self):
self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True)
fake_request.assert_called_with(
'DELETE',
url_prefix + 'containers/3cc2351ab11b',
params={'v': False, 'link': True, 'force': False},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_create_host_config_secopt(self):
security_opt = ['apparmor:test_profile']
result = self.client.create_host_config(security_opt=security_opt)
self.assertIn('SecurityOpt', result)
self.assertEqual(result['SecurityOpt'], security_opt)
self.assertRaises(
docker.errors.DockerException, self.client.create_host_config,
security_opt='wrong'
)
class StreamTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
socket_dir = tempfile.mkdtemp()
self.build_context = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, socket_dir)
self.addCleanup(shutil.rmtree, self.build_context)
self.socket_file = os.path.join(socket_dir, 'test_sock.sock')
self.server_socket = self._setup_socket()
self.stop_server = False
server_thread = threading.Thread(target=self.run_server)
server_thread.setDaemon(True)
server_thread.start()
self.response = None
self.request_handler = None
self.addCleanup(server_thread.join)
self.addCleanup(self.stop)
def stop(self):
self.stop_server = True
def _setup_socket(self):
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_sock.bind(self.socket_file)
# Non-blocking mode so that we can shut the test down easily
server_sock.setblocking(0)
server_sock.listen(5)
return server_sock
def run_server(self):
try:
while not self.stop_server:
try:
connection, client_address = self.server_socket.accept()
except socket.error:
# Probably no connection to accept yet
time.sleep(0.01)
continue
connection.setblocking(1)
try:
self.request_handler(connection)
finally:
connection.close()
finally:
self.server_socket.close()
def early_response_sending_handler(self, connection):
data = b''
headers = None
connection.sendall(self.response)
while not headers:
data += connection.recv(2048)
parts = data.split(b'\r\n\r\n', 1)
if len(parts) == 2:
headers, data = parts
mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
assert mo
content_length = int(mo.group(1))
while True:
if len(data) >= content_length:
break
data += connection.recv(2048)
def test_early_stream_response(self):
self.request_handler = self.early_response_sending_handler
lines = []
for i in range(0, 50):
line = str(i).encode()
lines += [('%x' % len(line)).encode(), line]
lines.append(b'0')
lines.append(b'')
self.response = (
b'HTTP/1.1 200 OK\r\n'
b'Transfer-Encoding: chunked\r\n'
b'\r\n'
) + b'\r\n'.join(lines)
with docker.Client(base_url="http+unix://" + self.socket_file) \
as client:
for i in range(5):
try:
stream = client.build(
path=self.build_context,
stream=True
)
break
except requests.ConnectionError as e:
if i == 4:
raise e
self.assertEqual(list(stream), [
str(i).encode() for i in range(50)])

289
tests/unit/auth_test.py Normal file
View File

@ -0,0 +1,289 @@
# -*- coding: utf-8 -*-
import base64
import json
import os
import os.path
import random
import shutil
import tempfile
from docker import auth
from .. import base
try:
from unittest import mock
except ImportError:
import mock
class RegressionTest(base.BaseTestCase):
def test_803_urlsafe_encode(self):
auth_data = {
'username': 'root',
'password': 'GR?XGR?XGR?XGR?X'
}
encoded = auth.encode_header(auth_data)
assert b'/' not in encoded
assert b'_' in encoded
class ResolveAuthTest(base.BaseTestCase):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
}
def test_resolve_repository_name_hub_library_image(self):
self.assertEqual(
auth.resolve_repository_name('image'),
('index.docker.io', 'image'),
)
def test_resolve_repository_name_hub_image(self):
self.assertEqual(
auth.resolve_repository_name('username/image'),
('index.docker.io', 'username/image'),
)
def test_resolve_repository_name_private_registry(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net/image'),
('my.registry.net', 'image'),
)
def test_resolve_repository_name_private_registry_with_port(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net:5000/image'),
('my.registry.net:5000', 'image'),
)
def test_resolve_repository_name_private_registry_with_username(self):
self.assertEqual(
auth.resolve_repository_name('my.registry.net/username/image'),
('my.registry.net', 'username/image'),
)
def test_resolve_repository_name_no_dots_but_port(self):
self.assertEqual(
auth.resolve_repository_name('hostname:5000/image'),
('hostname:5000', 'image'),
)
def test_resolve_repository_name_no_dots_but_port_and_username(self):
self.assertEqual(
auth.resolve_repository_name('hostname:5000/username/image'),
('hostname:5000', 'username/image'),
)
def test_resolve_repository_name_localhost(self):
self.assertEqual(
auth.resolve_repository_name('localhost/image'),
('localhost', 'image'),
)
def test_resolve_repository_name_localhost_with_username(self):
self.assertEqual(
auth.resolve_repository_name('localhost/username/image'),
('localhost', 'username/image'),
)
def test_resolve_authconfig_hostname_only(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'my.registry.net'),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_protocol(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'my.registry.net/v1/'),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_trailing_slash(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_wrong_secure_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://index.docker.io'
),
{'auth': 'indexuser'}
)
def test_resolve_authconfig_path_wrong_proto(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net/v1/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_default_registry(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config), {'auth': 'indexuser'}
)
def test_resolve_authconfig_default_explicit_none(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, None),
{'auth': 'indexuser'}
)
def test_resolve_authconfig_fully_explicit(self):
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/v1/'
),
{'auth': 'privateuser'}
)
def test_resolve_authconfig_legacy_config(self):
self.assertEqual(
auth.resolve_authconfig(self.auth_config, 'legacy.registry.url'),
{'auth': 'legacyauth'}
)
def test_resolve_authconfig_no_match(self):
self.assertTrue(
auth.resolve_authconfig(self.auth_config, 'does.not.exist') is None
)
def test_resolve_registry_and_auth_library_image(self):
image = 'image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'indexuser'},
)
def test_resolve_registry_and_auth_hub_image(self):
image = 'username/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'indexuser'},
)
def test_resolve_registry_and_auth_private_registry(self):
image = 'my.registry.net/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
{'auth': 'privateuser'},
)
def test_resolve_registry_and_auth_unauthenticated_registry(self):
image = 'other.registry.net/image'
self.assertEqual(
auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
),
None,
)
class LoadConfigTest(base.Cleanup, base.BaseTestCase):
def test_load_config_no_file(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg = auth.load_config(folder)
self.assertTrue(cfg is not None)
def test_load_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, '.dockercfg')
with open(dockercfg_path, 'w') as f:
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
cfg = auth.load_config(dockercfg_path)
assert auth.INDEX_NAME in cfg
self.assertNotEqual(cfg[auth.INDEX_NAME], None)
cfg = cfg[auth.INDEX_NAME]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)
def test_load_config_with_random_name(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder,
'.{0}.dockercfg'.format(
random.randrange(100000)))
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)
def test_load_config_custom_config_env(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)

105
tests/unit/build_test.py Normal file
View File

@ -0,0 +1,105 @@
import gzip
import io
import docker
from .api_test import DockerClientTest
class BuildTest(DockerClientTest):
def test_build_container(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script)
def test_build_container_pull(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script, pull=True)
def test_build_container_stream(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
self.client.build(fileobj=script, stream=True)
def test_build_container_custom_context(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
context = docker.utils.mkbuildcontext(script)
self.client.build(fileobj=context, custom_context=True)
def test_build_container_custom_context_gzip(self):
script = io.BytesIO('\n'.join([
'FROM busybox',
'MAINTAINER docker-py',
'RUN mkdir -p /tmp/test',
'EXPOSE 8080',
'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
' /tmp/silence.tar.gz'
]).encode('ascii'))
context = docker.utils.mkbuildcontext(script)
gz_context = gzip.GzipFile(fileobj=context)
self.client.build(
fileobj=gz_context,
custom_context=True,
encoding="gzip"
)
def test_build_remote_with_registry_auth(self):
self.client._auth_configs = {
'https://example.com': {
'user': 'example',
'password': 'example',
'email': 'example@example.com'
}
}
self.client.build(path='https://github.com/docker-library/mongo')
def test_build_container_with_named_dockerfile(self):
self.client.build('.', dockerfile='nameddockerfile')
def test_build_container_with_container_limits(self):
self.client.build('.', container_limits={
'memory': 1024 * 1024,
'cpusetcpus': 1,
'cpushares': 1000,
'memswap': 1024 * 1024 * 8
})
def test_build_container_invalid_container_limits(self):
self.assertRaises(
docker.errors.DockerException,
lambda: self.client.build('.', container_limits={
'foo': 'bar'
})
)

File diff suppressed because it is too large Load Diff

75
tests/unit/exec_test.py Normal file
View File

@ -0,0 +1,75 @@
import json
from . import fake_api
from .api_test import (
DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS,
)
class ExecTest(DockerClientTest):
def test_exec_create(self):
self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1'])
args = fake_request.call_args
self.assertEqual(
'POST',
args[0][0], url_prefix + 'containers/{0}/exec'.format(
fake_api.FAKE_CONTAINER_ID
)
)
self.assertEqual(
json.loads(args[1]['data']), {
'Tty': False,
'AttachStdout': True,
'Container': fake_api.FAKE_CONTAINER_ID,
'Cmd': ['ls', '-1'],
'Privileged': False,
'AttachStdin': False,
'AttachStderr': True,
'User': ''
}
)
self.assertEqual(args[1]['headers'],
{'Content-Type': 'application/json'})
def test_exec_start(self):
self.client.exec_start(fake_api.FAKE_EXEC_ID)
args = fake_request.call_args
self.assertEqual(
args[0][1], url_prefix + 'exec/{0}/start'.format(
fake_api.FAKE_EXEC_ID
)
)
self.assertEqual(
json.loads(args[1]['data']), {
'Tty': False,
'Detach': False,
}
)
self.assertEqual(args[1]['headers'],
{'Content-Type': 'application/json'})
def test_exec_inspect(self):
self.client.exec_inspect(fake_api.FAKE_EXEC_ID)
args = fake_request.call_args
self.assertEqual(
args[0][1], url_prefix + 'exec/{0}/json'.format(
fake_api.FAKE_EXEC_ID
)
)
def test_exec_resize(self):
self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60)
fake_request.assert_called_with(
'POST',
url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID),
params={'h': 20, 'w': 60},
timeout=DEFAULT_TIMEOUT_SECONDS
)

346
tests/unit/image_test.py Normal file
View File

@ -0,0 +1,346 @@
import docker
import pytest
from . import fake_api
from .api_test import (
DockerClientTest, fake_request, DEFAULT_TIMEOUT_SECONDS, url_prefix,
fake_resolve_authconfig
)
try:
from unittest import mock
except ImportError:
import mock
class ImageTest(DockerClientTest):
def test_image_viz(self):
with pytest.raises(Exception):
self.client.images('busybox', viz=True)
self.fail('Viz output should not be supported!')
def test_images(self):
self.client.images(all=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 0, 'all': 1},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_images_quiet(self):
self.client.images(all=True, quiet=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 1, 'all': 1},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_image_ids(self):
self.client.images(quiet=True)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 1, 'all': 0},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_images_filters(self):
self.client.images(filters={'dangling': True})
fake_request.assert_called_with(
'GET',
url_prefix + 'images/json',
params={'filter': None, 'only_ids': 0, 'all': 0,
'filters': '{"dangling": ["true"]}'},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_pull(self):
self.client.pull('joffrey/test001')
args = fake_request.call_args
self.assertEqual(
args[0][1],
url_prefix + 'images/create'
)
self.assertEqual(
args[1]['params'],
{'tag': None, 'fromImage': 'joffrey/test001'}
)
self.assertFalse(args[1]['stream'])
def test_pull_stream(self):
self.client.pull('joffrey/test001', stream=True)
args = fake_request.call_args
self.assertEqual(
args[0][1],
url_prefix + 'images/create'
)
self.assertEqual(
args[1]['params'],
{'tag': None, 'fromImage': 'joffrey/test001'}
)
self.assertTrue(args[1]['stream'])
def test_commit(self):
self.client.commit(fake_api.FAKE_CONTAINER_ID)
fake_request.assert_called_with(
'POST',
url_prefix + 'commit',
data='{}',
headers={'Content-Type': 'application/json'},
params={
'repo': None,
'comment': None,
'tag': None,
'container': '3cc2351ab11b',
'author': None
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_remove_image(self):
self.client.remove_image(fake_api.FAKE_IMAGE_ID)
fake_request.assert_called_with(
'DELETE',
url_prefix + 'images/e9aa60c60128',
params={'force': False, 'noprune': False},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_image_history(self):
self.client.history(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/test_image/history',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image(self):
self.client.import_image(
fake_api.FAKE_TARBALL_PATH,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromSrc': fake_api.FAKE_TARBALL_PATH
},
data=None,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image_from_bytes(self):
stream = (i for i in range(0, 100))
self.client.import_image(
stream,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromSrc': '-',
},
headers={
'Content-Type': 'application/tar',
},
data=stream,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_import_image_from_image(self):
self.client.import_image(
image=fake_api.FAKE_IMAGE_NAME,
repository=fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/create',
params={
'repo': fake_api.FAKE_REPO_NAME,
'tag': fake_api.FAKE_TAG_NAME,
'fromImage': fake_api.FAKE_IMAGE_NAME
},
data=None,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_inspect_image(self):
self.client.inspect_image(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/test_image/json',
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_inspect_image_undefined_id(self):
for arg in None, '', {True: True}:
with pytest.raises(docker.errors.NullResource) as excinfo:
self.client.inspect_image(arg)
self.assertEqual(
excinfo.value.args[0], 'image or container param is undefined'
)
def test_insert_image(self):
try:
self.client.insert(fake_api.FAKE_IMAGE_NAME,
fake_api.FAKE_URL, fake_api.FAKE_PATH)
except docker.errors.DeprecatedMethod:
self.assertTrue(
docker.utils.compare_version('1.12', self.client._version) >= 0
)
return
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/insert',
params={
'url': fake_api.FAKE_URL,
'path': fake_api.FAKE_PATH
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(fake_api.FAKE_IMAGE_NAME)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': None
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=False,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image_with_tag(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(
fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': fake_api.FAKE_TAG_NAME,
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=False,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_push_image_stream(self):
with mock.patch('docker.auth.auth.resolve_authconfig',
fake_resolve_authconfig):
self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/test_image/push',
params={
'tag': None
},
data='{}',
headers={'Content-Type': 'application/json'},
stream=True,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image(self):
self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': None,
'repo': 'repo',
'force': 0
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image_tag(self):
self.client.tag(
fake_api.FAKE_IMAGE_ID,
fake_api.FAKE_REPO_NAME,
tag=fake_api.FAKE_TAG_NAME
)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': 'tag',
'repo': 'repo',
'force': 0
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_tag_image_force(self):
self.client.tag(
fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True)
fake_request.assert_called_with(
'POST',
url_prefix + 'images/e9aa60c60128/tag',
params={
'tag': None,
'repo': 'repo',
'force': 1
},
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_get_image(self):
self.client.get_image(fake_api.FAKE_IMAGE_ID)
fake_request.assert_called_with(
'GET',
url_prefix + 'images/e9aa60c60128/get',
stream=True,
timeout=DEFAULT_TIMEOUT_SECONDS
)
def test_load_image(self):
self.client.load_image('Byte Stream....')
fake_request.assert_called_with(
'POST',
url_prefix + 'images/load',
data='Byte Stream....',
timeout=DEFAULT_TIMEOUT_SECONDS
)

149
tests/unit/network_test.py Normal file
View File

@ -0,0 +1,149 @@
import json
import six
from .. import base
from .api_test import DockerClientTest, url_prefix, response
try:
from unittest import mock
except ImportError:
import mock
class NetworkTest(DockerClientTest):
@base.requires_api_version('1.21')
def test_list_networks(self):
networks = [
{
"name": "none",
"id": "8e4e55c6863ef424",
"type": "null",
"endpoints": []
},
{
"name": "host",
"id": "062b6d9ea7913fde",
"type": "host",
"endpoints": []
},
]
get = mock.Mock(return_value=response(
status_code=200, content=json.dumps(networks).encode('utf-8')))
with mock.patch('docker.Client.get', get):
self.assertEqual(self.client.networks(), networks)
self.assertEqual(get.call_args[0][0], url_prefix + 'networks')
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertFalse(filters)
self.client.networks(names=['foo'])
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertEqual(filters, {'name': ['foo']})
self.client.networks(ids=['123'])
filters = json.loads(get.call_args[1]['params']['filters'])
self.assertEqual(filters, {'id': ['123']})
@base.requires_api_version('1.21')
def test_create_network(self):
network_data = {
"id": 'abc12345',
"warning": "",
}
network_response = response(status_code=200, content=network_data)
post = mock.Mock(return_value=network_response)
with mock.patch('docker.Client.post', post):
result = self.client.create_network('foo')
self.assertEqual(result, network_data)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/create')
self.assertEqual(
json.loads(post.call_args[1]['data']),
{"name": "foo"})
self.client.create_network('foo', 'bridge')
self.assertEqual(
json.loads(post.call_args[1]['data']),
{"name": "foo", "driver": "bridge"})
@base.requires_api_version('1.21')
def test_remove_network(self):
network_id = 'abc12345'
delete = mock.Mock(return_value=response(status_code=200))
with mock.patch('docker.Client.delete', delete):
self.client.remove_network(network_id)
args = delete.call_args
self.assertEqual(args[0][0],
url_prefix + 'networks/{0}'.format(network_id))
@base.requires_api_version('1.21')
def test_inspect_network(self):
network_id = 'abc12345'
network_name = 'foo'
network_data = {
six.u('name'): network_name,
six.u('id'): network_id,
six.u('driver'): 'bridge',
six.u('containers'): {},
}
network_response = response(status_code=200, content=network_data)
get = mock.Mock(return_value=network_response)
with mock.patch('docker.Client.get', get):
result = self.client.inspect_network(network_id)
self.assertEqual(result, network_data)
args = get.call_args
self.assertEqual(args[0][0],
url_prefix + 'networks/{0}'.format(network_id))
@base.requires_api_version('1.21')
def test_connect_container_to_network(self):
network_id = 'abc12345'
container_id = 'def45678'
post = mock.Mock(return_value=response(status_code=201))
with mock.patch('docker.Client.post', post):
self.client.connect_container_to_network(
{'Id': container_id}, network_id)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/{0}/connect'.format(network_id))
self.assertEqual(
json.loads(post.call_args[1]['data']),
{'container': container_id})
@base.requires_api_version('1.21')
def test_disconnect_container_from_network(self):
network_id = 'abc12345'
container_id = 'def45678'
post = mock.Mock(return_value=response(status_code=201))
with mock.patch('docker.Client.post', post):
self.client.disconnect_container_from_network(
{'Id': container_id}, network_id)
self.assertEqual(
post.call_args[0][0],
url_prefix + 'networks/{0}/disconnect'.format(network_id))
self.assertEqual(
json.loads(post.call_args[1]['data']),
{'container': container_id})

0
tests/unit/testdata/certs/key.pem vendored Normal file
View File

View File

@ -5,6 +5,7 @@ import json
import os
import os.path
import shutil
import tarfile
import tempfile
import pytest
@ -16,15 +17,12 @@ from docker.errors import DockerException
from docker.utils import (
parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
create_host_config, Ulimit, LogConfig, parse_bytes, parse_env_file,
exclude_paths, convert_volume_binds, decode_json_header
exclude_paths, convert_volume_binds, decode_json_header, tar
)
from docker.utils.ports import build_port_bindings, split_port
from docker.auth import (
resolve_repository_name, resolve_authconfig, encode_header
)
from . import base
from .helpers import make_tree
from .. import base
from ..helpers import make_tree
TEST_CERT_DIR = os.path.join(
@ -186,20 +184,7 @@ class KwargsFromEnvTest(base.BaseTestCase):
shutil.rmtree(temp_dir)
class UtilsTest(base.BaseTestCase):
longMessage = True
def generate_tempfile(self, file_content=None):
"""
Generates a temporary file for tests with the content
of 'file_content' and returns the filename.
Don't forget to unlink the file with os.unlink() after.
"""
local_tempfile = tempfile.NamedTemporaryFile(delete=False)
local_tempfile.write(file_content.encode('UTF-8'))
local_tempfile.close()
return local_tempfile.name
class ConverVolumeBindsTest(base.BaseTestCase):
def test_convert_volume_binds_empty(self):
self.assertEqual(convert_volume_binds({}), [])
self.assertEqual(convert_volume_binds([]), [])
@ -283,26 +268,43 @@ class UtilsTest(base.BaseTestCase):
convert_volume_binds(data), expected
)
def test_parse_repository_tag(self):
self.assertEqual(parse_repository_tag("root"),
("root", None))
self.assertEqual(parse_repository_tag("root:tag"),
("root", "tag"))
self.assertEqual(parse_repository_tag("user/repo"),
("user/repo", None))
self.assertEqual(parse_repository_tag("user/repo:tag"),
("user/repo", "tag"))
self.assertEqual(parse_repository_tag("url:5000/repo"),
("url:5000/repo", None))
self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
("url:5000/repo", "tag"))
def test_parse_bytes(self):
self.assertEqual(parse_bytes("512MB"), (536870912))
self.assertEqual(parse_bytes("512M"), (536870912))
self.assertRaises(DockerException, parse_bytes, "512MK")
self.assertRaises(DockerException, parse_bytes, "512L")
class ParseEnvFileTest(base.BaseTestCase):
def generate_tempfile(self, file_content=None):
"""
Generates a temporary file for tests with the content
of 'file_content' and returns the filename.
Don't forget to unlink the file with os.unlink() after.
"""
local_tempfile = tempfile.NamedTemporaryFile(delete=False)
local_tempfile.write(file_content.encode('UTF-8'))
local_tempfile.close()
return local_tempfile.name
def test_parse_env_file_proper(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_env_file_commented_line(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\n#PASS=secret')
get_parse_env_file = parse_env_file((env_file))
self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
os.unlink(env_file)
def test_parse_env_file_invalid_line(self):
env_file = self.generate_tempfile(
file_content='USER jdoe')
self.assertRaises(
DockerException, parse_env_file, env_file)
os.unlink(env_file)
class ParseHostTest(base.BaseTestCase):
def test_parse_host(self):
invalid_hosts = [
'0.0.0.0',
@ -341,27 +343,29 @@ class UtilsTest(base.BaseTestCase):
assert parse_host(val, 'win32') == tcp_port
def test_parse_env_file_proper(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_env_file_commented_line(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\n#PASS=secret')
get_parse_env_file = parse_env_file((env_file))
self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
os.unlink(env_file)
class UtilsTest(base.BaseTestCase):
longMessage = True
def test_parse_env_file_invalid_line(self):
env_file = self.generate_tempfile(
file_content='USER jdoe')
self.assertRaises(
DockerException, parse_env_file, env_file)
os.unlink(env_file)
def test_parse_repository_tag(self):
self.assertEqual(parse_repository_tag("root"),
("root", None))
self.assertEqual(parse_repository_tag("root:tag"),
("root", "tag"))
self.assertEqual(parse_repository_tag("user/repo"),
("user/repo", None))
self.assertEqual(parse_repository_tag("user/repo:tag"),
("user/repo", "tag"))
self.assertEqual(parse_repository_tag("url:5000/repo"),
("url:5000/repo", None))
self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
("url:5000/repo", "tag"))
def test_parse_bytes(self):
self.assertEqual(parse_bytes("512MB"), (536870912))
self.assertEqual(parse_bytes("512M"), (536870912))
self.assertRaises(DockerException, parse_bytes, "512MK")
self.assertRaises(DockerException, parse_bytes, "512L")
def test_convert_filters(self):
tests = [
@ -384,168 +388,6 @@ class UtilsTest(base.BaseTestCase):
decoded_data = decode_json_header(data)
self.assertEqual(obj, decoded_data)
def test_803_urlsafe_encode(self):
auth_data = {
'username': 'root',
'password': 'GR?XGR?XGR?XGR?X'
}
encoded = encode_header(auth_data)
assert b'/' not in encoded
assert b'_' in encoded
def test_resolve_repository_name(self):
# docker hub library image
self.assertEqual(
resolve_repository_name('image'),
('index.docker.io', 'image'),
)
# docker hub image
self.assertEqual(
resolve_repository_name('username/image'),
('index.docker.io', 'username/image'),
)
# private registry
self.assertEqual(
resolve_repository_name('my.registry.net/image'),
('my.registry.net', 'image'),
)
# private registry with port
self.assertEqual(
resolve_repository_name('my.registry.net:5000/image'),
('my.registry.net:5000', 'image'),
)
# private registry with username
self.assertEqual(
resolve_repository_name('my.registry.net/username/image'),
('my.registry.net', 'username/image'),
)
# no dots but port
self.assertEqual(
resolve_repository_name('hostname:5000/image'),
('hostname:5000', 'image'),
)
# no dots but port and username
self.assertEqual(
resolve_repository_name('hostname:5000/username/image'),
('hostname:5000', 'username/image'),
)
# localhost
self.assertEqual(
resolve_repository_name('localhost/image'),
('localhost', 'image'),
)
# localhost with username
self.assertEqual(
resolve_repository_name('localhost/username/image'),
('localhost', 'username/image'),
)
def test_resolve_authconfig(self):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
}
# hostname only
self.assertEqual(
resolve_authconfig(auth_config, 'my.registry.net'),
{'auth': 'privateuser'}
)
# no protocol
self.assertEqual(
resolve_authconfig(auth_config, 'my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# no path
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net'),
{'auth': 'privateuser'}
)
# no path, trailing slash
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net/'),
{'auth': 'privateuser'}
)
# no path, wrong secure protocol
self.assertEqual(
resolve_authconfig(auth_config, 'https://my.registry.net'),
{'auth': 'privateuser'}
)
# no path, wrong insecure protocol
self.assertEqual(
resolve_authconfig(auth_config, 'http://index.docker.io'),
{'auth': 'indexuser'}
)
# with path, wrong protocol
self.assertEqual(
resolve_authconfig(auth_config, 'https://my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# default registry
self.assertEqual(
resolve_authconfig(auth_config), {'auth': 'indexuser'}
)
# default registry (explicit None)
self.assertEqual(
resolve_authconfig(auth_config, None), {'auth': 'indexuser'}
)
# fully explicit
self.assertEqual(
resolve_authconfig(auth_config, 'http://my.registry.net/v1/'),
{'auth': 'privateuser'}
)
# legacy entry in config
self.assertEqual(
resolve_authconfig(auth_config, 'legacy.registry.url'),
{'auth': 'legacyauth'}
)
# no matching entry
self.assertTrue(
resolve_authconfig(auth_config, 'does.not.exist') is None
)
def test_resolve_registry_and_auth(self):
auth_config = {
'https://index.docker.io/v1/': {'auth': 'indexuser'},
'my.registry.net': {'auth': 'privateuser'},
}
# library image
image = 'image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'indexuser'},
)
# docker hub image
image = 'username/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'indexuser'},
)
# private registry
image = 'my.registry.net/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
{'auth': 'privateuser'},
)
# unauthenticated registry
image = 'other.registry.net/image'
self.assertEqual(
resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
None,
)
class PortsTest(base.BaseTestCase):
def test_split_port_with_host_ip(self):
@ -790,3 +632,85 @@ class ExcludePathsTest(base.BaseTestCase):
assert self.exclude(['foo/bar']) == self.all_paths - set([
'foo/bar', 'foo/bar/a.py',
])
class TarTest(base.Cleanup, base.BaseTestCase):
def test_tar_with_excludes(self):
dirs = [
'foo',
'foo/bar',
'bar',
]
files = [
'Dockerfile',
'Dockerfile.alt',
'.dockerignore',
'a.py',
'a.go',
'b.py',
'cde.py',
'foo/a.py',
'foo/b.py',
'foo/bar/a.py',
'bar/a.py',
]
exclude = [
'*.py',
'!b.py',
'!a.go',
'foo',
'Dockerfile*',
'.dockerignore',
]
expected_names = set([
'Dockerfile',
'.dockerignore',
'a.go',
'b.py',
'bar',
'bar/a.py',
])
base = make_tree(dirs, files)
self.addCleanup(shutil.rmtree, base)
with tar(base, exclude=exclude) as archive:
tar_data = tarfile.open(fileobj=archive)
assert sorted(tar_data.getnames()) == sorted(expected_names)
def test_tar_with_empty_directory(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(sorted(tar_data.getnames()), ['bar', 'foo'])
def test_tar_with_file_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
with open(os.path.join(base, 'foo'), 'w') as f:
f.write("content")
os.makedirs(os.path.join(base, 'bar'))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)
def test_tar_with_directory_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)

85
tests/unit/volume_test.py Normal file
View File

@ -0,0 +1,85 @@
import json
import pytest
from .. import base
from .api_test import DockerClientTest, url_prefix, fake_request
class VolumeTest(DockerClientTest):
@base.requires_api_version('1.21')
def test_list_volumes(self):
volumes = self.client.volumes()
self.assertIn('Volumes', volumes)
self.assertEqual(len(volumes['Volumes']), 2)
args = fake_request.call_args
self.assertEqual(args[0][0], 'GET')
self.assertEqual(args[0][1], url_prefix + 'volumes')
@base.requires_api_version('1.21')
def test_create_volume(self):
name = 'perfectcherryblossom'
result = self.client.create_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
args = fake_request.call_args
self.assertEqual(args[0][0], 'POST')
self.assertEqual(args[0][1], url_prefix + 'volumes/create')
self.assertEqual(json.loads(args[1]['data']), {'Name': name})
@base.requires_api_version('1.21')
def test_create_volume_with_driver(self):
name = 'perfectcherryblossom'
driver_name = 'sshfs'
self.client.create_volume(name, driver=driver_name)
args = fake_request.call_args
self.assertEqual(args[0][0], 'POST')
self.assertEqual(args[0][1], url_prefix + 'volumes/create')
data = json.loads(args[1]['data'])
self.assertIn('Driver', data)
self.assertEqual(data['Driver'], driver_name)
@base.requires_api_version('1.21')
def test_create_volume_invalid_opts_type(self):
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts='hello=world'
)
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts=['hello=world']
)
with pytest.raises(TypeError):
self.client.create_volume(
'perfectcherryblossom', driver_opts=''
)
@base.requires_api_version('1.21')
def test_inspect_volume(self):
name = 'perfectcherryblossom'
result = self.client.inspect_volume(name)
self.assertIn('Name', result)
self.assertEqual(result['Name'], name)
self.assertIn('Driver', result)
self.assertEqual(result['Driver'], 'local')
args = fake_request.call_args
self.assertEqual(args[0][0], 'GET')
self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
@base.requires_api_version('1.21')
def test_remove_volume(self):
name = 'perfectcherryblossom'
result = self.client.remove_volume(name)
self.assertTrue(result)
args = fake_request.call_args
self.assertEqual(args[0][0], 'DELETE')
self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))

View File

@ -5,7 +5,7 @@ skipsdist=True
[testenv]
usedevelop=True
commands =
py.test --cov=docker tests/test.py tests/utils_test.py
py.test --cov=docker tests/unit/
deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt