mirror of https://github.com/docker/docker-py.git
1592 lines
57 KiB
Python
1592 lines
57 KiB
Python
import os
|
|
import re
|
|
import signal
|
|
import tempfile
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
import docker
|
|
from docker.constants import IS_WINDOWS_PLATFORM
|
|
from docker.utils.socket import next_frame_header, read_exactly
|
|
|
|
from .. import helpers
|
|
from ..helpers import (
|
|
assert_cat_socket_detached_with_keys,
|
|
ctrl_with,
|
|
requires_api_version,
|
|
skip_if_desktop,
|
|
)
|
|
from .base import TEST_IMG, BaseAPIIntegrationTest
|
|
|
|
|
|
class ListContainersTest(BaseAPIIntegrationTest):
    """Integration checks for listing containers through the low-level client."""

    def test_list_containers(self):
        """A newly started container appears exactly once in ``containers(all=True)``."""
        count_before = len(self.client.containers(all=True))

        created = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in created
        # Track the container before starting it so it is not left behind if
        # start() raises (tmp_containers is presumably emptied by the base
        # class on teardown -- confirm).
        self.tmp_containers.append(created['Id'])
        self.client.start(created['Id'])

        listing = self.client.containers(all=True)
        assert count_before + 1 == len(listing)

        matches = [c for c in listing if c['Id'].startswith(created['Id'])]
        assert len(matches) == 1

        entry = matches[0]
        assert 'Command' in entry
        assert entry['Command'] == 'true'
        assert 'Image' in entry
        assert re.search(r'alpine:.*', entry['Image'])
        assert 'Status' in entry
|
|
|
|
|
|
class CreateContainerTest(BaseAPIIntegrationTest):
    """Integration tests for ``create_container`` and its host-config options.

    Containers appended to ``self.tmp_containers`` are presumably removed by
    the base class during teardown (that code is not part of this file --
    confirm).
    """

    def _assert_supplementary_groups(self, group_add):
        """Run ``id -G`` with ``group_add`` and check groups 1000 and 1001 are listed."""
        container = self.client.create_container(
            TEST_IMG, 'id -G',
            host_config=self.client.create_host_config(group_add=group_add)
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.wait(container)

        logs = self.client.logs(container).decode('utf-8')
        groups = logs.strip().split(' ')
        assert '1000' in groups
        assert '1001' in groups

    def _started_log_config(self, log_config):
        """Create and start a container with ``log_config``; return the inspected LogConfig."""
        container = self.client.create_container(
            TEST_IMG, ['true'],
            host_config=self.client.create_host_config(log_config=log_config)
        )
        self.tmp_containers.append(container['Id'])
        self.client.start(container)

        info = self.client.inspect_container(container)
        return info['HostConfig']['LogConfig']

    def test_create(self):
        """Creating a container returns a response that carries its ``Id``."""
        created = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in created
        self.tmp_containers.append(created['Id'])

    def test_create_with_host_pid_mode(self):
        """``pid_mode='host'`` is reported back as ``HostConfig.PidMode``."""
        host_config = self.client.create_host_config(
            pid_mode='host', network_mode='none'
        )
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=host_config
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)

        inspect = self.client.inspect_container(ctnr)
        assert 'HostConfig' in inspect
        reported = inspect['HostConfig']
        assert 'PidMode' in reported
        assert reported['PidMode'] == 'host'

    def test_create_with_links(self):
        """A container linked to two others sees ``<ALIAS>_NAME`` and ``<ALIAS>_ENV_FOO``."""
        links = {}
        for alias in ('mylink1', 'mylink2'):
            target = self.client.create_container(
                TEST_IMG, 'cat',
                detach=True, stdin_open=True,
                environment={'FOO': '1'})
            target_id = target['Id']
            self.tmp_containers.append(target_id)
            self.client.start(target_id)

            # we don't want the first / of the inspected name
            link_path = self.client.inspect_container(target_id)['Name'][1:]
            links[link_path] = alias

        consumer = self.client.create_container(
            TEST_IMG, 'env', host_config=self.client.create_host_config(
                links=links,
                network_mode='bridge'
            )
        )
        consumer_id = consumer['Id']
        self.tmp_containers.append(consumer_id)
        self.client.start(consumer_id)
        assert self.client.wait(consumer_id)['StatusCode'] == 0

        logs = self.client.logs(consumer_id).decode('utf-8')
        for alias in links.values():
            env_prefix = alias.upper()
            assert f'{env_prefix}_NAME=' in logs
            assert f'{env_prefix}_ENV_FOO=1' in logs

    def test_create_with_restart_policy(self):
        """A container with restart policy ``always`` cannot be removed without force."""
        container = self.client.create_container(
            TEST_IMG, ['sleep', '2'],
            host_config=self.client.create_host_config(
                restart_policy={"Name": "always", "MaximumRetryCount": 0},
                network_mode='none'
            )
        )
        container_id = container['Id']
        try:
            self.client.start(container_id)
            self.client.wait(container_id)
            with pytest.raises(docker.errors.APIError) as exc:
                self.client.remove_container(container_id)
            err = exc.value.explanation.lower()
            assert 'stop the container before' in err
        finally:
            # This test never registers the container in tmp_containers, so
            # force-remove it even when an assertion above fails.
            self.client.remove_container(container_id, force=True)

    def test_create_container_with_volumes_from(self):
        """``volumes_from`` with two container names is reflected in ``HostConfig.VolumesFrom``."""
        vol_names = ['foobar_vol0', 'foobar_vol1']

        for vol_name in vol_names:
            source = self.client.create_container(
                TEST_IMG, 'true', name=vol_name
            )
            source_id = source['Id']
            self.tmp_containers.append(source_id)
            self.client.start(source_id)

        res = self.client.create_container(
            TEST_IMG, 'cat', detach=True, stdin_open=True,
            host_config=self.client.create_host_config(
                volumes_from=vol_names, network_mode='none'
            )
        )
        consumer_id = res['Id']
        self.tmp_containers.append(consumer_id)
        self.client.start(consumer_id)

        info = self.client.inspect_container(res['Id'])
        assert len(info['HostConfig']['VolumesFrom']) == len(vol_names)

    def create_container_readonly_fs(self):
        """``mkdir`` on a read-only root filesystem exits non-zero.

        NOTE(review): the name lacks the ``test_`` prefix, so this is
        presumably never collected by the test runner -- confirm whether that
        is intentional before renaming.
        """
        ctnr = self.client.create_container(
            TEST_IMG, ['mkdir', '/shrine'],
            host_config=self.client.create_host_config(
                read_only=True, network_mode='none'
            )
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        res = self.client.wait(ctnr)['StatusCode']
        assert res != 0

    def create_container_with_name(self):
        """A container created with ``name='foobar'`` is inspected as ``/foobar``.

        NOTE(review): the name lacks the ``test_`` prefix, so this is
        presumably never collected by the test runner -- confirm whether that
        is intentional before renaming.
        """
        res = self.client.create_container(TEST_IMG, 'true', name='foobar')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        inspect = self.client.inspect_container(res['Id'])
        assert 'Name' in inspect
        assert '/foobar' == inspect['Name']

    def create_container_privileged(self):
        """A privileged container starts and, if already exited, exited with 0.

        NOTE(review): the name lacks the ``test_`` prefix, so this is
        presumably never collected by the test runner -- confirm whether that
        is intentional before renaming.
        """
        res = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                privileged=True, network_mode='none'
            )
        )
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res['Id'])
        inspect = self.client.inspect_container(res['Id'])
        assert 'Config' in inspect
        assert 'Id' in inspect
        assert inspect['Id'].startswith(res['Id'])
        assert 'Image' in inspect
        assert 'State' in inspect
        assert 'Running' in inspect['State']
        if not inspect['State']['Running']:
            assert 'ExitCode' in inspect['State']
            assert inspect['State']['ExitCode'] == 0
        # Since Nov 2013, the Privileged flag is no longer part of the
        # container's config exposed via the API (safety concerns?).
        #
        if 'Privileged' in inspect['Config']:
            assert inspect['Config']['Privileged'] is True

    def test_create_with_mac_address(self):
        """The requested MAC address is reported in ``NetworkSettings.MacAddress``."""
        mac_address_expected = "02:42:ac:11:00:0a"
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'], mac_address=mac_address_expected)

        container_id = container['Id']
        # Track the container like the other tests do; previously it was only
        # killed at the end and never removed, and kept running for up to 60
        # seconds whenever the assertion failed.
        self.tmp_containers.append(container_id)

        self.client.start(container)
        res = self.client.inspect_container(container['Id'])
        assert mac_address_expected == res['NetworkSettings']['MacAddress']

        self.client.kill(container_id)

    @requires_api_version('1.41')
    def test_create_with_cgroupns(self):
        """``cgroupns='private'`` is reported as ``HostConfig.CgroupnsMode``."""
        host_config = self.client.create_host_config(cgroupns='private')

        container = self.client.create_container(
            image=TEST_IMG,
            command=['sleep', '60'],
            host_config=host_config,
        )
        self.tmp_containers.append(container)

        res = self.client.inspect_container(container)
        assert 'private' == res['HostConfig']['CgroupnsMode']

    def test_group_id_ints(self):
        """``group_add`` accepts integer group ids."""
        self._assert_supplementary_groups([1000, 1001])

    def test_group_id_strings(self):
        """``group_add`` accepts group ids given as strings."""
        self._assert_supplementary_groups(['1000', '1001'])

    def test_valid_log_driver_and_log_opt(self):
        """An explicit driver and options are reported back unchanged."""
        log_config = docker.types.LogConfig(
            type='json-file',
            config={'max-file': '100'}
        )

        container_log_config = self._started_log_config(log_config)

        assert container_log_config['Type'] == log_config.type
        assert container_log_config['Config'] == log_config.config

    def test_invalid_log_driver_raises_exception(self):
        """An unknown log driver makes create or start raise ``APIError``."""
        log_config = docker.types.LogConfig(
            type='asdf',
            config={}
        )

        expected_msgs = [
            "logger: no log driver named 'asdf' is registered",
            "error looking up logging plugin asdf: plugin \"asdf\" not found",
        ]
        with pytest.raises(docker.errors.APIError) as excinfo:
            # raises an internal server error 500
            container = self.client.create_container(
                TEST_IMG, ['true'], host_config=self.client.create_host_config(
                    log_config=log_config
                )
            )
            # If creation succeeded and only start() fails, the container
            # still exists and must be tracked for cleanup.
            self.tmp_containers.append(container)
            self.client.start(container)

        assert excinfo.value.explanation in expected_msgs

    def test_valid_no_log_driver_specified(self):
        """An empty driver type is reported as ``json-file`` with the given options."""
        log_config = docker.types.LogConfig(
            type="",
            config={'max-file': '100'}
        )

        container_log_config = self._started_log_config(log_config)

        assert container_log_config['Type'] == "json-file"
        assert container_log_config['Config'] == log_config.config

    def test_valid_no_config_specified(self):
        """``config=None`` is reported as an empty options mapping."""
        log_config = docker.types.LogConfig(
            type="json-file",
            config=None
        )

        container_log_config = self._started_log_config(log_config)

        assert container_log_config['Type'] == "json-file"
        assert container_log_config['Config'] == {}

    def test_create_with_memory_constraints_with_str(self):
        """String memory limits produce ``Memory`` and ``MemorySwap`` host-config keys."""
        host_config = self.client.create_host_config(
            memswap_limit='1G',
            mem_limit='700M'
        )
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=host_config
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        inspect = self.client.inspect_container(ctnr)

        assert 'HostConfig' in inspect
        reported = inspect['HostConfig']
        for limit in ['Memory', 'MemorySwap']:
            assert limit in reported

    def test_create_with_memory_constraints_with_int(self):
        """An integer ``mem_swappiness`` produces a ``MemorySwappiness`` host-config key."""
        ctnr = self.client.create_container(
            TEST_IMG, 'true',
            host_config=self.client.create_host_config(mem_swappiness=40)
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        inspect = self.client.inspect_container(ctnr)

        assert 'HostConfig' in inspect
        reported = inspect['HostConfig']
        assert 'MemorySwappiness' in reported

    def test_create_with_environment_variable_no_value(self):
        """``None`` yields a bare variable name; ``''`` yields ``NAME=``."""
        container = self.client.create_container(
            TEST_IMG,
            ['echo'],
            environment={'Foo': None, 'Other': 'one', 'Blank': ''},
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container['Id'])
        env = config['Config']['Env']
        assert 'Foo' in env
        assert 'Other=one' in env
        assert 'Blank=' in env

    @requires_api_version('1.22')
    def test_create_with_tmpfs(self):
        """The ``tmpfs`` mapping is reported back as ``HostConfig.Tmpfs``."""
        tmpfs = {
            '/tmp1': 'size=3M'
        }

        container = self.client.create_container(
            TEST_IMG,
            ['echo'],
            host_config=self.client.create_host_config(
                tmpfs=tmpfs))

        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['Tmpfs'] == tmpfs

    @requires_api_version('1.24')
    def test_create_with_isolation(self):
        """``isolation='default'`` is reported as ``HostConfig.Isolation``."""
        container = self.client.create_container(
            TEST_IMG, ['echo'], host_config=self.client.create_host_config(
                isolation='default'
            )
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['Isolation'] == 'default'

    @requires_api_version('1.25')
    def test_create_with_auto_remove(self):
        """``auto_remove=True`` is reported as ``HostConfig.AutoRemove``."""
        host_config = self.client.create_host_config(
            auto_remove=True
        )
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], host_config=host_config
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['AutoRemove'] is True

    @requires_api_version('1.25')
    def test_create_with_stop_timeout(self):
        """``stop_timeout`` is reported as ``Config.StopTimeout``."""
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], stop_timeout=25
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['Config']['StopTimeout'] == 25

    @requires_api_version('1.24')
    @pytest.mark.xfail(True, reason='Not supported on most drivers')
    def test_create_with_storage_opt(self):
        """``storage_opt`` is reported as ``HostConfig.StorageOpt`` (expected to fail)."""
        host_config = self.client.create_host_config(
            storage_opt={'size': '120G'}
        )
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], host_config=host_config
        )
        self.tmp_containers.append(container)
        config = self.client.inspect_container(container)
        assert config['HostConfig']['StorageOpt'] == {
            'size': '120G'
        }

    @requires_api_version('1.25')
    def test_create_with_init(self):
        """``init=True`` is reported as ``HostConfig.Init``."""
        ctnr = self.client.create_container(
            TEST_IMG, 'true',
            host_config=self.client.create_host_config(
                init=True
            )
        )
        self.tmp_containers.append(ctnr['Id'])
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['Init'] is True

    @requires_api_version('1.24')
    @pytest.mark.xfail(not os.path.exists('/sys/fs/cgroup/cpu.rt_runtime_us'),
                       reason='CONFIG_RT_GROUP_SCHED isn\'t enabled')
    def test_create_with_cpu_rt_options(self):
        """Realtime CPU period and runtime are reported in ``HostConfig``."""
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                cpu_rt_period=1000, cpu_rt_runtime=500
            )
        )
        self.tmp_containers.append(ctnr)
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['CpuRealtimeRuntime'] == 500
        assert config['HostConfig']['CpuRealtimePeriod'] == 1000

    @requires_api_version('1.28')
    def test_create_with_device_cgroup_rules(self):
        """``device_cgroup_rules`` is reported as ``HostConfig.DeviceCgroupRules``."""
        rule = 'c 7:128 rwm'
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                device_cgroup_rules=[rule]
            )
        )
        self.tmp_containers.append(ctnr)
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['DeviceCgroupRules'] == [rule]

    def test_create_with_uts_mode(self):
        """``uts_mode='host'`` is reported as ``HostConfig.UTSMode``."""
        container = self.client.create_container(
            TEST_IMG, ['echo'], host_config=self.client.create_host_config(
                uts_mode='host'
            )
        )
        self.tmp_containers.append(container)
        config = self.client.inspect_container(container)
        assert config['HostConfig']['UTSMode'] == 'host'
|
|
|
|
|
|
@pytest.mark.xfail(
    IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform'
)
class VolumeBindTest(BaseAPIIntegrationTest):
    """Integration tests for bind mounts and volume mounts on created containers."""

    def setUp(self):
        """Create the host directory and seed it with ``shared.txt`` via a container."""
        super().setUp()

        self.mount_dest = '/mnt'

        # mkdtemp() creates a uniquely named host directory that is used as
        # the bind source. NOTE(review): nothing in this class removes it.
        self.mount_origin = tempfile.mkdtemp()
        self.filename = 'shared.txt'

        self.run_with_volume(
            False,
            TEST_IMG,
            ['touch', os.path.join(self.mount_dest, self.filename)],
        )

    def test_create_with_binds_rw(self):
        """A read-write bind exposes the seeded file and is reported as RW."""
        container = self.run_with_volume(
            False,
            TEST_IMG,
            ['ls', self.mount_dest],
        )
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, True)

    def test_create_with_binds_ro(self):
        """A read-only bind exposes the seeded file and is reported as not RW."""
        self.run_with_volume(
            False,
            TEST_IMG,
            ['touch', os.path.join(self.mount_dest, self.filename)],
        )
        container = self.run_with_volume(
            True,
            TEST_IMG,
            ['ls', self.mount_dest],
        )
        logs = self.client.logs(container).decode('utf-8')

        assert self.filename in logs

        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, False)

    @skip_if_desktop()
    def test_create_with_binds_rw_rshared(self):
        """``rshared`` propagation is reported for both RW and RO binds."""
        container = self.run_with_volume_propagation(
            False,
            'rshared',
            TEST_IMG,
            ['touch', os.path.join(self.mount_dest, self.filename)],
        )
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, True, 'rshared')

        container = self.run_with_volume_propagation(
            True,
            'rshared',
            TEST_IMG,
            ['ls', self.mount_dest],
        )
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, False, 'rshared')

    @requires_api_version('1.30')
    def test_create_with_mounts(self):
        """A bind ``Mount`` exposes the seeded file and is reported as RW."""
        mount = docker.types.Mount(
            type="bind", source=self.mount_origin, target=self.mount_dest
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.run_container(
            TEST_IMG, ['ls', self.mount_dest],
            host_config=host_config
        )
        assert container
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, True)

    @requires_api_version('1.30')
    def test_create_with_mounts_ro(self):
        """A read-only bind ``Mount`` exposes the seeded file and is reported as not RW."""
        mount = docker.types.Mount(
            type="bind", source=self.mount_origin, target=self.mount_dest,
            read_only=True
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.run_container(
            TEST_IMG, ['ls', self.mount_dest],
            host_config=host_config
        )
        assert container
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, False)

    @requires_api_version('1.30')
    def test_create_with_volume_mount(self):
        """A volume ``Mount`` is reported under its source name and as RW."""
        mount = docker.types.Mount(
            type="volume", source=helpers.random_name(),
            target=self.mount_dest, labels={'com.dockerpy.test': 'true'}
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.client.create_container(
            TEST_IMG, ['true'], host_config=host_config,
        )
        assert container
        # create_container() is called directly here, so the container has
        # to be tracked explicitly or it is never cleaned up.
        self.tmp_containers.append(container)

        inspect_data = self.client.inspect_container(container)
        mount_data = self._single_mount_at_dest(inspect_data)
        assert mount['Source'] == mount_data['Name']
        assert mount_data['RW'] is True

    def _single_mount_at_dest(self, inspect_data):
        """Return the only entry of ``inspect_data['Mounts']`` targeting ``mount_dest``."""
        assert 'Mounts' in inspect_data
        matching = [
            entry for entry in inspect_data['Mounts']
            if entry['Destination'] == self.mount_dest
        ]
        assert len(matching) == 1
        return matching[0]

    def check_container_data(self, inspect_data, rw, propagation='rprivate'):
        """Assert source, RW flag and propagation of the mount at ``mount_dest``."""
        mount_data = self._single_mount_at_dest(inspect_data)
        assert mount_data['Source'] == self.mount_origin
        assert mount_data['RW'] == rw
        assert mount_data['Propagation'] == propagation

    def _run_with_bind(self, bind_options, *args, **kwargs):
        """Run a container with ``mount_origin`` bound using ``bind_options``."""
        return self.run_container(
            *args,
            volumes={self.mount_dest: {}},
            host_config=self.client.create_host_config(
                binds={
                    self.mount_origin: bind_options,
                },
                network_mode='none'
            ),
            **kwargs
        )

    def run_with_volume(self, ro, *args, **kwargs):
        """Run a container with the host directory bound at ``mount_dest``.

        ``ro`` selects a read-only bind; remaining arguments are forwarded to
        ``run_container``.
        """
        return self._run_with_bind(
            {
                'bind': self.mount_dest,
                'ro': ro,
            },
            *args,
            **kwargs
        )

    def run_with_volume_propagation(self, ro, propagation, *args, **kwargs):
        """Same as ``run_with_volume`` but also sets the bind's ``propagation``."""
        return self._run_with_bind(
            {
                'bind': self.mount_dest,
                'ro': ro,
                'propagation': propagation
            },
            *args,
            **kwargs
        )
|
|
|
|
|
|
class ArchiveTest(BaseAPIIntegrationTest):
    """Integration tests for ``get_archive`` and ``put_archive``."""

    def test_get_file_archive_from_container(self):
        """A file written inside the container can be fetched as a tar archive."""
        data = 'The Maid and the Pocket Watch of Blood'
        ctnr = self.client.create_container(
            TEST_IMG, f'sh -c "echo {data} > /vol1/data.txt"',
            volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        with tempfile.NamedTemporaryFile() as destination:
            # Only the stream is needed here; the stat part is unused.
            strm, _stat = self.client.get_archive(ctnr, '/vol1/data.txt')
            for chunk in strm:
                destination.write(chunk)
            destination.seek(0)
            retrieved_data = helpers.untar_file(
                destination, 'data.txt'
            ).decode('utf-8')
            assert data == retrieved_data.strip()

    def test_get_file_stat_from_container(self):
        """``get_archive`` reports the file's name and size in its stat result."""
        data = 'The Maid and the Pocket Watch of Blood'
        ctnr = self.client.create_container(
            TEST_IMG, f'sh -c "echo -n {data} > /vol1/data.txt"',
            volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        # Only the stat part is inspected; the stream is unused.
        _strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
        assert 'name' in stat
        assert stat['name'] == 'data.txt'
        assert 'size' in stat
        assert stat['size'] == len(data)

    def test_copy_file_to_container(self):
        """A single file uploaded with ``put_archive`` is readable in the container."""
        data = b'Deaf To All But The Song'
        test_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with test_file:
                test_file.write(data)
                test_file.seek(0)
                ctnr = self.client.create_container(
                    TEST_IMG,
                    f"cat {os.path.join('/vol1/', os.path.basename(test_file.name))}",
                    volumes=['/vol1']
                )
                self.tmp_containers.append(ctnr)
                with helpers.simple_tar(test_file.name) as test_tar:
                    self.client.put_archive(ctnr, '/vol1', test_tar)
        finally:
            # delete=False leaves the file on disk after close, so remove it
            # explicitly instead of leaking one temp file per run.
            os.remove(test_file.name)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        logs = self.client.logs(ctnr)
        assert logs.strip() == data

    def test_copy_directory_to_container(self):
        """A directory tree uploaded with ``put_archive`` is listed by ``ls -p``."""
        files = ['a.py', 'b.py', 'foo/b.py']
        dirs = ['foo', 'bar']
        # NOTE(review): make_tree() presumably creates a temporary directory
        # that is never removed here -- confirm against the helper.
        base = helpers.make_tree(dirs, files)
        ctnr = self.client.create_container(
            TEST_IMG, 'ls -p /vol1', volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        with docker.utils.tar(base) as test_tar:
            self.client.put_archive(ctnr, '/vol1', test_tar)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        logs = self.client.logs(ctnr).decode('utf-8')
        results = logs.strip().split()
        for expected in ('a.py', 'b.py', 'foo/', 'bar/'):
            assert expected in results
|
|
|
|
|
|
class RenameContainerTest(BaseAPIIntegrationTest):
    """Integration test for renaming a container."""

    def test_rename_container(self):
        """After ``rename`` the inspected ``Name`` matches the new name."""
        daemon_version = self.client.version()['Version']
        new_name = 'hong_meiling'

        created = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in created
        self.tmp_containers.append(created['Id'])

        self.client.rename(created, new_name)
        details = self.client.inspect_container(created['Id'])
        assert 'Name' in details

        # Version 1.5.0 is compared against the bare name; every other
        # version is compared against the name with a leading "/".
        expected = new_name if daemon_version == '1.5.0' else f'/{new_name}'
        assert expected == details['Name']
|
|
|
|
|
|
class StartContainerTest(BaseAPIIntegrationTest):
    """Integration tests for starting containers."""

    def _assert_started_cleanly(self, created):
        """Inspect ``created`` by id and check its basic fields and exit state."""
        inspect = self.client.inspect_container(created['Id'])
        assert 'Config' in inspect
        assert 'Id' in inspect
        assert inspect['Id'].startswith(created['Id'])
        assert 'Image' in inspect
        assert 'State' in inspect
        assert 'Running' in inspect['State']
        # 'true' may already have finished; in that case it must have
        # exited successfully.
        if not inspect['State']['Running']:
            assert 'ExitCode' in inspect['State']
            assert inspect['State']['ExitCode'] == 0

    def test_start_container(self):
        """``start`` accepts a container id."""
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res['Id'])
        self._assert_started_cleanly(res)

    def test_start_container_with_dict_instead_of_id(self):
        """``start`` accepts the dict returned by ``create_container``."""
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res)
        self._assert_started_cleanly(res)

    def test_run_shlex_commands(self):
        """String commands with quoting and shell operators all exit with status 0."""
        commands = [
            'true',
            'echo "The Young Descendant of Tepes & Septette for the '
            'Dead Princess"',
            'echo -n "The Young Descendant of Tepes & Septette for the '
            'Dead Princess"',
            '/bin/sh -c "echo Hello World"',
            '/bin/sh -c \'echo "Hello World"\'',
            'echo "\"Night of Nights\""',
            'true && echo "Night of Nights"'
        ]
        for cmd in commands:
            container = self.client.create_container(TEST_IMG, cmd)
            container_id = container['Id']
            # Track before starting so a failing start() does not leave the
            # container untracked.
            self.tmp_containers.append(container_id)
            self.client.start(container_id)
            exitcode = self.client.wait(container_id)['StatusCode']
            assert exitcode == 0, cmd
|
|
|
|
|
|
class WaitTest(BaseAPIIntegrationTest):
    """Integration tests for ``wait``."""

    def test_wait(self):
        """``wait`` on a container id returns the exit status once it has stopped."""
        created = self.client.create_container(TEST_IMG, ['sleep', '3'])
        container_id = created['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)

        status = self.client.wait(container_id)['StatusCode']
        assert status == 0

        state = self.client.inspect_container(container_id)['State']
        assert 'Running' in state
        assert state['Running'] is False
        assert 'ExitCode' in state
        assert state['ExitCode'] == status

    def test_wait_with_dict_instead_of_id(self):
        """``wait`` accepts the dict returned by ``create_container``."""
        created = self.client.create_container(TEST_IMG, ['sleep', '3'])
        self.tmp_containers.append(created['Id'])
        self.client.start(created)

        status = self.client.wait(created)['StatusCode']
        assert status == 0

        state = self.client.inspect_container(created)['State']
        assert 'Running' in state
        assert state['Running'] is False
        assert 'ExitCode' in state
        assert state['ExitCode'] == status

    @requires_api_version('1.30')
    def test_wait_with_condition(self):
        """``condition='removed'`` times out without removal and succeeds with auto-remove."""
        # Never started and never removed: the one-second wait is expected
        # to surface as a ConnectionError.
        never_removed = self.client.create_container(TEST_IMG, 'true')
        self.tmp_containers.append(never_removed)
        with pytest.raises(requests.exceptions.ConnectionError):
            self.client.wait(never_removed, condition='removed', timeout=1)

        auto_remove_config = self.client.create_host_config(auto_remove=True)
        self_removing = self.client.create_container(
            TEST_IMG, ['sleep', '3'],
            host_config=auto_remove_config
        )
        self.tmp_containers.append(self_removing)
        self.client.start(self_removing)
        result = self.client.wait(
            self_removing, condition='removed', timeout=5
        )
        assert result['StatusCode'] == 0
|
|
|
|
|
|
class LogsTest(BaseAPIIntegrationTest):
    """Integration tests for ``logs``."""

    def _run_to_completion(self, command):
        """Create, track, start and wait for a container; assert it exited with 0.

        Returns the dict produced by ``create_container``.
        """
        container = self.client.create_container(TEST_IMG, command)
        container_id = container['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        exitcode = self.client.wait(container_id)['StatusCode']
        assert exitcode == 0
        return container

    def test_logs(self):
        """``logs`` returns the container's output as bytes."""
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self._run_to_completion(f'echo {snippet}')
        logs = self.client.logs(container['Id'])
        assert logs == f"{snippet}\n".encode(encoding='ascii')

    def test_logs_tail_option(self):
        """``tail=1`` returns only the last line."""
        snippet = '''Line1
Line2'''
        container = self._run_to_completion(f'echo "{snippet}"')
        logs = self.client.logs(container['Id'], tail=1)
        assert logs == 'Line2\n'.encode(encoding='ascii')

    def test_logs_streaming_and_follow(self):
        """Streaming with ``follow=True`` yields the complete output."""
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'echo {snippet}'
        )
        container_id = container['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        logs = b''
        for chunk in self.client.logs(container_id, stream=True, follow=True):
            logs += chunk

        exitcode = self.client.wait(container_id)['StatusCode']
        assert exitcode == 0

        assert logs == f"{snippet}\n".encode(encoding='ascii')

    @pytest.mark.timeout(5)
    @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
                        reason='No cancellable streams over SSH')
    def test_logs_streaming_and_follow_and_cancel(self):
        """Closing the log generator from another thread ends the iteration."""
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'sh -c "echo \\"{snippet}\\" && sleep 3"'
        )
        container_id = container['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        logs = b''

        generator = self.client.logs(container_id, stream=True, follow=True)
        # Close the stream after one second, while the container is still
        # sleeping, so the loop below ends before the container exits.
        threading.Timer(1, generator.close).start()

        for chunk in generator:
            logs += chunk

        assert logs == f"{snippet}\n".encode(encoding='ascii')

    def test_logs_with_dict_instead_of_id(self):
        """``logs`` accepts the dict returned by ``create_container``."""
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self._run_to_completion(f'echo {snippet}')
        logs = self.client.logs(container)
        assert logs == f"{snippet}\n".encode(encoding='ascii')

    def test_logs_with_tail_0(self):
        """``tail=0`` returns no output."""
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self._run_to_completion(f'echo "{snippet}"')
        logs = self.client.logs(container['Id'], tail=0)
        assert logs == ''.encode(encoding='ascii')

    @requires_api_version('1.35')
    def test_logs_with_until(self):
        """``until`` in the distant past hides the output; ``until`` after the run shows it."""
        from datetime import timedelta

        snippet = 'Shanghai Teahouse (Hong Meiling)'
        container = self._run_to_completion(f'echo "{snippet}"')

        logs_until_1 = self.client.logs(container, until=1)
        assert logs_until_1 == b''

        # Pass the datetime by keyword: the second positional parameter of
        # logs() is ``stdout``, so the earlier positional call never set
        # ``until`` at all.
        # The one-second margin guards against the client truncating the
        # datetime to whole seconds (assumption -- confirm).
        # NOTE(review): this is a naive local datetime; confirm how the
        # client converts it when the host is not on UTC.
        until = datetime.now() + timedelta(seconds=1)
        logs_until_now = self.client.logs(container, until=until)
        assert logs_until_now == f"{snippet}\n".encode(encoding='ascii')
|
|
|
|
|
|
class DiffTest(BaseAPIIntegrationTest):
    """Integration tests for ``diff``."""

    def _touch_test_file(self):
        """Run ``touch /test`` to completion and return the created container dict."""
        container = self.client.create_container(TEST_IMG, ['touch', '/test'])
        container_id = container['Id']
        # Track before starting so a failing start() does not leave the
        # container untracked.
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        exitcode = self.client.wait(container_id)['StatusCode']
        assert exitcode == 0
        return container

    def _assert_test_file_reported(self, diff):
        """Check that ``diff`` holds exactly one ``/test`` entry with ``Kind`` 1."""
        test_diff = [x for x in diff if x.get('Path', None) == '/test']
        assert len(test_diff) == 1
        assert 'Kind' in test_diff[0]
        assert test_diff[0]['Kind'] == 1

    def test_diff(self):
        """``diff`` on a container id reports the file created in the container."""
        container = self._touch_test_file()
        self._assert_test_file_reported(self.client.diff(container['Id']))

    def test_diff_with_dict_instead_of_id(self):
        """``diff`` accepts the dict returned by ``create_container``."""
        container = self._touch_test_file()
        self._assert_test_file_reported(self.client.diff(container))
|
|
|
|
|
|
class StopTest(BaseAPIIntegrationTest):
    """Stopping a long-running container must leave it not running."""

    def _assert_not_running(self, container_id):
        """Inspect ``container_id`` and assert ``State.Running`` is False."""
        details = self.client.inspect_container(container_id)
        assert 'State' in details
        state = details['State']
        assert 'Running' in state
        assert state['Running'] is False

    def test_stop(self):
        """Stop a container addressed by its id string."""
        created = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        container_id = created['Id']
        self.client.start(container_id)
        self.tmp_containers.append(container_id)
        self.client.stop(container_id, timeout=2)
        self._assert_not_running(container_id)

    def test_stop_with_dict_instead_of_id(self):
        """Start and stop a container addressed by its creation dict."""
        created = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        assert 'Id' in created
        container_id = created['Id']
        self.client.start(created)
        self.tmp_containers.append(container_id)
        self.client.stop(created, timeout=2)
        self._assert_not_running(container_id)
|
|
|
|
|
|
class KillTest(BaseAPIIntegrationTest):
    """Kill containers by id, by dict and with explicitly chosen signals."""

    def _checked_exit_state(self, container):
        """Return the inspected ``State`` of ``container``.

        Asserts on the way that a non-zero ``ExitCode`` and a ``Running``
        entry are present; callers check the ``Running`` value themselves.
        """
        details = self.client.inspect_container(container)
        assert 'State' in details
        state = details['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        return state

    def test_kill(self):
        """Kill a container addressed by its id string."""
        created = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        container_id = created['Id']
        self.client.start(container_id)
        self.tmp_containers.append(container_id)
        self.client.kill(container_id)
        state = self._checked_exit_state(container_id)
        assert state['Running'] is False

    def test_kill_with_dict_instead_of_id(self):
        """Kill a container addressed by its creation dict."""
        created = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        container_id = created['Id']
        self.client.start(container_id)
        self.tmp_containers.append(container_id)
        self.client.kill(created)
        state = self._checked_exit_state(container_id)
        assert state['Running'] is False

    def test_kill_with_signal(self):
        """Kill with a signal constant (plain 9 on Windows)."""
        container = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.tmp_containers.append(container)
        self.client.start(container)
        # signal.SIGKILL is only looked up when not running on Windows.
        kill_signal = 9 if IS_WINDOWS_PLATFORM else signal.SIGKILL
        self.client.kill(container, signal=kill_signal)
        status = self.client.wait(container)
        assert status['StatusCode'] != 0
        state = self._checked_exit_state(container)
        assert state['Running'] is False, state

    def test_kill_with_signal_name(self):
        """Kill with the signal given by name."""
        container = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.client.start(container)
        self.tmp_containers.append(container)
        self.client.kill(container, signal='SIGKILL')
        status = self.client.wait(container)
        assert status['StatusCode'] != 0
        state = self._checked_exit_state(container)
        assert state['Running'] is False, state

    def test_kill_with_signal_integer(self):
        """Kill with the signal given as a bare integer."""
        container = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.client.start(container)
        self.tmp_containers.append(container)
        self.client.kill(container, signal=9)
        status = self.client.wait(container)
        assert status['StatusCode'] != 0
        state = self._checked_exit_state(container)
        assert state['Running'] is False, state
|
|
|
|
|
|
class PortTest(BaseAPIIntegrationTest):
    """Check that requested port bindings are reported back by ``port``."""

    def test_port(self):
        """Bind three container ports and compare them with ``port`` output.

        Two TCP ports and one UDP port are bound to 127.0.0.1; for each of
        them the host IP and host port returned by ``port`` must equal the
        requested binding.
        """
        port_bindings = {
            '1111': ('127.0.0.1', '4567'),
            '2222': ('127.0.0.1', '4568'),
            '3333/udp': ('127.0.0.1', '4569'),
        }
        ports = [
            1111,
            2222,
            (3333, 'udp'),
        ]

        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'], ports=ports,
            host_config=self.client.create_host_config(
                port_bindings=port_bindings, network_mode='bridge'
            )
        )
        container_id = container['Id']
        # Register for cleanup like the other tests in this file do, so the
        # container does not outlive the test when an assertion fails (the
        # final kill alone never removed it).
        self.tmp_containers.append(container_id)

        self.client.start(container)

        # Call the port function on each binding and compare expected vs
        # actual.
        for binding_key, (expected_ip, expected_port) in port_bindings.items():
            # Only the numeric part of the key is handed to ``port``.
            port = binding_key.partition('/')[0]
            actual_bindings = self.client.port(container, port)
            actual = actual_bindings.pop()

            assert actual['HostIp'] == expected_ip
            assert actual['HostPort'] == expected_port

        self.client.kill(container_id)
|
|
|
|
|
|
class ContainerTopTest(BaseAPIIntegrationTest):
    """Exercise ``top`` with default and custom ps arguments.

    Both tests are marked xfail because the output format varies by host.
    """

    @pytest.mark.xfail(reason='Output of docker top depends on host distro, '
                              'and is not formalized.')
    def test_top(self):
        """List the processes of a container running ``sleep 60``."""
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60']
        )

        self.tmp_containers.append(container)

        self.client.start(container)
        res = self.client.top(container)
        # The column titles are only checked off Windows.
        if not IS_WINDOWS_PLATFORM:
            assert res['Titles'] == ['PID', 'USER', 'TIME', 'COMMAND']
        assert len(res['Processes']) == 1
        assert res['Processes'][0][-1] == 'sleep 60'
        self.client.kill(container)

    @pytest.mark.skipif(
        IS_WINDOWS_PLATFORM, reason='No psargs support on windows'
    )
    @pytest.mark.xfail(reason='Output of docker top depends on host distro, '
                              'and is not formalized.')
    def test_top_with_psargs(self):
        """Request only the PID and USER columns through ps arguments."""
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'])

        self.tmp_containers.append(container)

        self.client.start(container)
        res = self.client.top(container, '-eopid,user')
        assert res['Titles'] == ['PID', 'USER']
        assert len(res['Processes']) == 1
        # With only two columns requested there is no column 10 to index
        # (the previous check), and the command is not part of the output;
        # verify instead that the row lines up with the titles.
        assert len(res['Processes'][0]) == len(res['Titles'])
|
|
|
|
|
|
class RestartContainerTest(BaseAPIIntegrationTest):
    """Restart containers and verify that they come back up."""

    def test_restart(self):
        """Restart by id: ``StartedAt`` must change, ``Running`` stay True."""
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        container_id = container['Id']
        self.client.start(container_id)
        self.tmp_containers.append(container_id)

        info = self.client.inspect_container(container_id)
        assert 'State' in info
        assert 'StartedAt' in info['State']
        start_time1 = info['State']['StartedAt']

        self.client.restart(container_id, timeout=2)

        info2 = self.client.inspect_container(container_id)
        assert 'State' in info2
        assert 'StartedAt' in info2['State']
        start_time2 = info2['State']['StartedAt']
        assert start_time1 != start_time2
        assert 'Running' in info2['State']
        assert info2['State']['Running'] is True
        self.client.kill(container_id)

    def test_restart_with_low_timeout(self):
        """Restart with a short stop timeout under two client timeouts.

        The first restart runs with the client timeout set to 3, the second
        with the client timeout set to None.
        """
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        # Register for cleanup like the sibling tests; previously a failing
        # restart left the container behind, and the final kill never
        # removed it.
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.timeout = 3
        self.client.restart(container, timeout=1)
        self.client.timeout = None
        self.client.restart(container, timeout=1)
        self.client.kill(container)

    def test_restart_with_dict_instead_of_id(self):
        """Restart a container addressed by its creation dict."""
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        assert 'Id' in container
        container_id = container['Id']
        self.client.start(container)
        self.tmp_containers.append(container_id)

        info = self.client.inspect_container(container_id)
        assert 'State' in info
        assert 'StartedAt' in info['State']
        start_time1 = info['State']['StartedAt']

        self.client.restart(container, timeout=2)

        info2 = self.client.inspect_container(container_id)
        assert 'State' in info2
        assert 'StartedAt' in info2['State']
        start_time2 = info2['State']['StartedAt']
        assert start_time1 != start_time2
        assert 'Running' in info2['State']
        assert info2['State']['Running'] is True
        self.client.kill(container_id)
|
|
|
|
|
|
class RemoveContainerTest(BaseAPIIntegrationTest):
    """Removed containers must no longer show up in the full listing."""

    def _listed_with_id_prefix(self, prefix):
        """Return every listed container whose ``Id`` starts with ``prefix``.

        Entries without an ``Id`` key are skipped.
        """
        everything = self.client.containers(all=True)
        return [
            entry for entry in everything
            if 'Id' in entry and entry['Id'].startswith(prefix)
        ]

    def test_remove(self):
        """Remove a finished container addressed by its id string."""
        created = self.client.create_container(TEST_IMG, ['true'])
        container_id = created['Id']
        self.client.start(container_id)
        self.client.wait(container_id)
        self.client.remove_container(container_id)
        leftovers = self._listed_with_id_prefix(container_id)
        assert len(leftovers) == 0

    def test_remove_with_dict_instead_of_id(self):
        """Remove a finished container addressed by its creation dict."""
        created = self.client.create_container(TEST_IMG, ['true'])
        container_id = created['Id']
        self.client.start(container_id)
        self.client.wait(container_id)
        self.client.remove_container(created)
        leftovers = self._listed_with_id_prefix(container_id)
        assert len(leftovers) == 0
|
|
|
|
|
|
class AttachContainerTest(BaseAPIIntegrationTest):
    """Attach to containers through sockets, websockets and ``attach``."""

    def test_run_container_streaming(self):
        """Attaching a plain socket yields a valid file descriptor."""
        container = self.client.create_container(TEST_IMG, '/bin/sh',
                                                 detach=True, stdin_open=True)
        container_id = container['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        sock = self.client.attach_socket(container, ws=False)
        # Close the socket at teardown, as the other socket tests do.
        self.addCleanup(sock.close)
        assert sock.fileno() > -1

    def test_run_container_reading_socket_http(self):
        """Read one framed stdout message from an attached socket."""
        line = 'hi there and stuff and things, words!'
        # `printf` is used instead of `echo` so that nothing is appended to
        # the line and the frame size equals len(line).
        command = f"printf '{line}'"
        container = self.client.create_container(TEST_IMG, command,
                                                 detach=True, tty=False)
        self.tmp_containers.append(container)

        opts = {"stdout": 1, "stream": 1, "logs": 1}
        pty_stdout = self.client.attach_socket(container, opts)
        self.addCleanup(pty_stdout.close)

        self.client.start(container)

        (stream, next_size) = next_frame_header(pty_stdout)
        assert stream == 1  # correspond to stdout
        assert next_size == len(line)
        data = read_exactly(pty_stdout, next_size)
        assert data.decode('utf-8') == line

    @pytest.mark.xfail(condition=bool(os.environ.get('DOCKER_CERT_PATH', '')),
                       reason='DOCKER_CERT_PATH not respected for websockets')
    def test_run_container_reading_socket_ws(self):
        """Read the container output through a websocket attachment."""
        line = 'hi there and stuff and things, words!'
        # `printf` is used instead of `echo` so that nothing is appended to
        # the line.
        command = f"printf '{line}'"
        container = self.client.create_container(TEST_IMG, command,
                                                 detach=True, tty=False)
        self.tmp_containers.append(container)

        opts = {"stdout": 1, "stream": 1, "logs": 1}
        pty_stdout = self.client.attach_socket(container, opts, ws=True)
        self.addCleanup(pty_stdout.close)

        self.client.start(container)

        data = pty_stdout.recv()
        assert data.decode('utf-8') == line

    @pytest.mark.timeout(10)
    def test_attach_no_stream(self):
        """Non-streaming attach with ``logs=True`` returns the output."""
        container = self.client.create_container(
            TEST_IMG, 'echo hello'
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.wait(container, condition='not-running')
        output = self.client.attach(container, stream=False, logs=True)
        assert output == b'hello\n'

    # The xfail condition is wrapped in bool(): pytest evaluates a *string*
    # condition as a Python expression, so handing it the raw environment
    # value (for example a certificate path) would break the marker.
    @pytest.mark.timeout(10)
    @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
                        reason='No cancellable streams over SSH')
    @pytest.mark.xfail(condition=bool(os.environ.get('DOCKER_TLS_VERIFY') or
                                      os.environ.get('DOCKER_CERT_PATH')),
                       reason='Flaky test on TLS')
    def test_attach_stream_and_cancel(self):
        """Close a streaming attach from a timer after one line arrived."""
        container = self.client.create_container(
            TEST_IMG, 'sh -c "sleep 2 && echo hello && sleep 60"',
            tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        output = self.client.attach(container, stream=True, logs=True)

        # The container prints after 2 seconds; the stream is closed from a
        # timer thread after 3 seconds, which ends the iteration below.
        threading.Timer(3, output.close).start()

        lines = list(output)

        assert len(lines) == 1
        assert lines[0] == b'hello\r\n'

    def test_detach_with_default(self):
        """Detach using the ctrl-p, ctrl-q key sequence."""
        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True}
        )

        assert_cat_socket_detached_with_keys(
            sock, [ctrl_with('p'), ctrl_with('q')]
        )

    def test_detach_with_config_file(self):
        """Detach using the key set in the client's general configs."""
        self.client._general_configs['detachKeys'] = 'ctrl-p'

        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True}
        )

        assert_cat_socket_detached_with_keys(sock, [ctrl_with('p')])

    def test_detach_with_arg(self):
        """A ``detachKeys`` attach parameter wins over the configured key."""
        self.client._general_configs['detachKeys'] = 'ctrl-p'

        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True, 'detachKeys': 'ctrl-x'}
        )

        assert_cat_socket_detached_with_keys(sock, [ctrl_with('x')])
|
|
|
|
|
|
class PauseTest(BaseAPIIntegrationTest):
    """Pause and unpause a running container."""

    def _running_state(self, container_id):
        """Return the inspected ``State`` of a container that is running.

        Asserts that ``ExitCode`` is 0, that ``Running`` is True and that a
        ``Paused`` entry exists; callers check the ``Paused`` value.
        """
        details = self.client.inspect_container(container_id)
        assert 'State' in details
        state = details['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] == 0
        assert 'Running' in state
        assert state['Running'] is True
        assert 'Paused' in state
        return state

    def test_pause_unpause(self):
        """``Paused`` flips to True after pause and back after unpause."""
        created = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        container_id = created['Id']
        self.tmp_containers.append(container_id)
        self.client.start(created)

        self.client.pause(container_id)
        paused_state = self._running_state(container_id)
        assert paused_state['Paused'] is True

        self.client.unpause(container_id)
        resumed_state = self._running_state(container_id)
        assert resumed_state['Paused'] is False
|
|
|
|
|
|
class PruneTest(BaseAPIIntegrationTest):
    """Prune stopped containers while leaving running ones alone."""

    @requires_api_version('1.25')
    def test_prune_containers(self):
        """Only the exited container may be reported as deleted.

        ``container1`` writes a file and exits; ``container2`` keeps
        sleeping. After ``prune_containers`` the first must be listed in
        ``ContainersDeleted`` with some space reclaimed, the second must
        not be listed.
        """
        container1 = self.client.create_container(
            TEST_IMG, ['sh', '-c', 'echo hello > /data.txt']
        )
        container2 = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        # container2 is expected to survive the prune, so nothing in this
        # test removes it; register it for cleanup like other tests in this
        # file do for their running containers.
        self.tmp_containers.append(container2)
        self.client.start(container1)
        self.client.start(container2)
        self.client.wait(container1)
        result = self.client.prune_containers()
        assert container1['Id'] in result['ContainersDeleted']
        assert result['SpaceReclaimed'] > 0
        assert container2['Id'] not in result['ContainersDeleted']
|
|
|
|
|
|
class GetContainerStatsTest(BaseAPIIntegrationTest):
    """Fetch container statistics as a single snapshot and as a stream."""

    def test_get_container_stats_no_stream(self):
        """A non-streaming request returns one dict with the stats keys."""
        container = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.tmp_containers.append(container)
        self.client.start(container)
        snapshot = self.client.stats(container, stream=0)
        self.client.kill(container)

        assert isinstance(snapshot, dict)
        expected_keys = [
            'read', 'networks', 'precpu_stats', 'cpu_stats',
            'memory_stats', 'blkio_stats',
        ]
        for key in expected_keys:
            assert key in snapshot

    def test_get_container_stats_stream(self):
        """Every chunk of the stats stream is checked for the stats keys."""
        container = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.tmp_containers.append(container)
        self.client.start(container)
        # NOTE(review): this list says 'network' while the non-streaming
        # test above checks 'networks' -- confirm which key is intended.
        expected_keys = [
            'read', 'network', 'precpu_stats', 'cpu_stats',
            'memory_stats', 'blkio_stats',
        ]
        for chunk in self.client.stats(container):
            assert isinstance(chunk, dict)
            for key in expected_keys:
                assert key in chunk
|
|
|
|
|
|
class ContainerUpdateTest(BaseAPIIntegrationTest):
    """Change the settings of a running container via ``update_container``."""

    @requires_api_version('1.22')
    def test_update_container(self):
        """Lower the memory limit and read the new value back."""
        mib = 1024 * 1024
        old_mem_limit = 400 * mib
        new_mem_limit = 300 * mib
        host_config = self.client.create_host_config(mem_limit=old_mem_limit)
        container = self.client.create_container(
            TEST_IMG, 'top', host_config=host_config
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        self.client.update_container(container, mem_limit=new_mem_limit)

        inspected = self.client.inspect_container(container)
        assert inspected['HostConfig']['Memory'] == new_mem_limit

    @requires_api_version('1.23')
    def test_restart_policy_update(self):
        """Swap the restart policy and read both of its fields back."""
        old_restart_policy = {'MaximumRetryCount': 0, 'Name': 'always'}
        new_restart_policy = {'MaximumRetryCount': 42, 'Name': 'on-failure'}
        host_config = self.client.create_host_config(
            restart_policy=old_restart_policy
        )
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'], host_config=host_config
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        self.client.update_container(
            container, restart_policy=new_restart_policy
        )

        inspected = self.client.inspect_container(container)
        applied = inspected['HostConfig']['RestartPolicy']
        wanted_retries = new_restart_policy['MaximumRetryCount']
        assert applied['MaximumRetryCount'] == wanted_retries
        assert applied['Name'] == new_restart_policy['Name']
|
|
|
|
|
|
class ContainerCPUTest(BaseAPIIntegrationTest):
    """CPU and runtime settings must show up in the inspected HostConfig."""

    def test_container_cpu_shares(self):
        """``cpu_shares`` is reported as ``CpuShares``."""
        cpu_shares = 512
        host_config = self.client.create_host_config(cpu_shares=cpu_shares)
        container = self.client.create_container(
            TEST_IMG, 'ls', host_config=host_config
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        inspected = self.client.inspect_container(container)
        assert inspected['HostConfig']['CpuShares'] == cpu_shares

    def test_container_cpuset(self):
        """``cpuset_cpus`` is reported as ``CpusetCpus``."""
        cpuset_cpus = "0,1"
        host_config = self.client.create_host_config(cpuset_cpus=cpuset_cpus)
        container = self.client.create_container(
            TEST_IMG, 'ls', host_config=host_config
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        inspected = self.client.inspect_container(container)
        assert inspected['HostConfig']['CpusetCpus'] == cpuset_cpus

    @requires_api_version('1.25')
    def test_create_with_runtime(self):
        """A ``runtime`` given at creation is reported as ``Runtime``."""
        created = self.client.create_container(
            TEST_IMG, ['echo', 'test'], runtime='runc'
        )
        self.tmp_containers.append(created['Id'])
        inspected = self.client.inspect_container(created)
        assert inspected['HostConfig']['Runtime'] == 'runc'
|
|
|
|
|
|
class LinkTest(BaseAPIIntegrationTest):
    """Remove a link between two containers without removing either."""

    def test_remove_link(self):
        """Link two containers, remove the link, keep both containers.

        After ``remove_container(..., link=True)`` no listed container may
        carry the link name any more, while both containers must still be
        present in the listing.
        """
        # Create containers
        container1 = self.client.create_container(
            TEST_IMG, 'cat', detach=True, stdin_open=True
        )
        container1_id = container1['Id']
        self.tmp_containers.append(container1_id)
        self.client.start(container1_id)

        # Create Link
        # we don't want the first /
        link_path = self.client.inspect_container(container1_id)['Name'][1:]
        link_alias = 'mylink'

        container2 = self.client.create_container(
            TEST_IMG, 'cat', host_config=self.client.create_host_config(
                links={link_path: link_alias}
            )
        )
        container2_id = container2['Id']
        self.tmp_containers.append(container2_id)
        self.client.start(container2_id)

        # Remove link
        linked_name = self.client.inspect_container(container2_id)['Name'][1:]
        link_name = f'{linked_name}/{link_alias}'
        self.client.remove_container(link_name, link=True)

        # Link is gone
        containers = self.client.containers(all=True)
        # link_name was built without a leading '/', so compare against the
        # listed names with any leading '/' stripped; a plain membership
        # test could never match a slash-prefixed name and would pass
        # vacuously.
        retrieved = [
            x for x in containers
            if any(name.lstrip('/') == link_name for name in x['Names'])
        ]
        assert len(retrieved) == 0

        # Containers are still there
        retrieved = [
            x for x in containers if x['Id'].startswith(container1_id) or
            x['Id'].startswith(container2_id)
        ]
        assert len(retrieved) == 2
|
|
|
|
class ContainerInfoObjectTest(BaseAPIIntegrationTest):
    """``inspect_container`` results allow key and attribute access."""

    def test_container_info_object(self):
        """Read the same inspect fields by subscript and by attribute."""
        host_config = self.client.create_host_config()
        created = self.client.create_container(
            TEST_IMG, 'true', host_config=host_config
        )
        self.tmp_containers.append(created)
        self.client.start(created)

        info = self.client.inspect_container(created)
        assert isinstance(info, docker.api.container.ContainerInfo)

        # Dictionary-style access.
        assert info['Config']['Image'] == TEST_IMG
        assert info['HostConfig']['NetworkMode'] == 'bridge'

        # Attribute-style access.
        assert info.Id == created['Id']
        assert info.Config.Image == TEST_IMG
        assert info.HostConfig.NetworkMode == 'bridge'
|