mirror of https://github.com/docker/docker-py.git
561 lines
21 KiB
Python
561 lines
21 KiB
Python
import os
|
|
import tempfile
|
|
import threading
|
|
|
|
import pytest
|
|
|
|
import docker
|
|
|
|
from ..helpers import random_name, requires_api_version
|
|
from .base import TEST_API_VERSION, BaseIntegrationTest
|
|
|
|
|
|
class ContainerCollectionTest(BaseIntegrationTest):
    """Integration tests for the ``client.containers`` collection API.

    Every test builds its own client with ``docker.from_env`` pinned to
    ``TEST_API_VERSION`` and records the containers, volumes and networks
    it creates in ``self.tmp_containers`` / ``self.tmp_volumes`` /
    ``self.tmp_networks``.
    NOTE(review): presumably the base class removes everything listed
    there on teardown -- that cleanup is not visible in this file.
    """

    def test_run(self):
        """A foreground ``run`` returns the command's output as bytes."""
        client = docker.from_env(version=TEST_API_VERSION)
        assert client.containers.run(
            "alpine", "echo hello world", remove=True
        ) == b'hello world\n'

    def test_run_detach(self):
        """``detach=True`` returns a container object exposing its config."""
        client = docker.from_env(version=TEST_API_VERSION)
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        assert container.attrs['Config']['Image'] == "alpine"
        assert container.attrs['Config']['Cmd'] == ['sleep', '300']

    def test_run_with_error(self):
        """A failing command raises ``ContainerError`` describing the run."""
        client = docker.from_env(version=TEST_API_VERSION)
        with pytest.raises(docker.errors.ContainerError) as cm:
            client.containers.run("alpine", "cat /test", remove=True)
        assert cm.value.exit_status == 1
        # The rendered exception must mention the command, the image and
        # the error text produced inside the container.
        assert "cat /test" in cm.exconly()
        assert "alpine" in cm.exconly()
        assert "No such file or directory" in cm.exconly()

    def test_run_with_image_that_does_not_exist(self):
        """Running an unknown image raises ``ImageNotFound``."""
        client = docker.from_env(version=TEST_API_VERSION)
        with pytest.raises(docker.errors.ImageNotFound):
            client.containers.run("dockerpytest_does_not_exist")

    @pytest.mark.skipif(
        docker.constants.IS_WINDOWS_PLATFORM, reason="host mounts on Windows"
    )
    def test_run_with_volume(self):
        """Data written through a host bind mount is visible to a later run."""
        client = docker.from_env(version=TEST_API_VERSION)
        path = tempfile.mkdtemp()

        # First container writes a file into the mounted host directory.
        container = client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /insidecontainer/test'",
            volumes=[f"{path}:/insidecontainer"],
            detach=True
        )
        self.tmp_containers.append(container.id)
        container.wait()

        # Second container mounts the same directory and reads it back.
        name = "container_volume_test"
        out = client.containers.run(
            "alpine", "cat /insidecontainer/test",
            volumes=[f"{path}:/insidecontainer"],
            name=name
        )
        self.tmp_containers.append(name)
        assert out == b'hello\n'

    def test_run_with_named_volume(self):
        """Data written to a named volume is visible to a later run."""
        client = docker.from_env(version=TEST_API_VERSION)
        volume = client.volumes.create(name="somevolume")
        self.tmp_volumes.append(volume.id)

        # First container writes a file into the named volume.
        container = client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /insidecontainer/test'",
            volumes=["somevolume:/insidecontainer"],
            detach=True
        )
        self.tmp_containers.append(container.id)
        container.wait()

        # Second container mounts the same volume and reads it back.
        name = "container_volume_test"
        out = client.containers.run(
            "alpine", "cat /insidecontainer/test",
            volumes=["somevolume:/insidecontainer"],
            name=name
        )
        self.tmp_containers.append(name)
        assert out == b'hello\n'

    def test_run_with_network(self):
        """``network=`` attaches the container to exactly that network."""
        net_name = random_name()
        client = docker.from_env(version=TEST_API_VERSION)
        client.networks.create(net_name)
        self.tmp_networks.append(net_name)

        container = client.containers.run(
            'alpine', 'echo hello world', network=net_name,
            detach=True
        )
        self.tmp_containers.append(container.id)

        attrs = container.attrs

        assert 'NetworkSettings' in attrs
        assert 'Networks' in attrs['NetworkSettings']
        assert list(attrs['NetworkSettings']['Networks'].keys()) == [net_name]

    def test_run_with_networking_config(self):
        """An endpoint config for the chosen network is applied to it."""
        net_name = random_name()
        client = docker.from_env(version=TEST_API_VERSION)
        client.networks.create(net_name)
        self.tmp_networks.append(net_name)

        test_alias = 'hello'
        test_driver_opt = {'key1': 'a'}

        networking_config = {
            net_name: client.api.create_endpoint_config(
                aliases=[test_alias],
                driver_opt=test_driver_opt
            )
        }

        container = client.containers.run(
            'alpine', 'echo hello world', network=net_name,
            networking_config=networking_config,
            detach=True
        )
        self.tmp_containers.append(container.id)

        attrs = container.attrs

        assert 'NetworkSettings' in attrs
        assert 'Networks' in attrs['NetworkSettings']
        assert list(attrs['NetworkSettings']['Networks'].keys()) == [net_name]
        # Aliases no longer include the container's short-id in API v1.45.
        assert attrs['NetworkSettings']['Networks'][net_name]['Aliases'] \
            == [test_alias]
        assert attrs['NetworkSettings']['Networks'][net_name]['DriverOpts'] \
            == test_driver_opt

    def test_run_with_networking_config_with_undeclared_network(self):
        """A config that also names a never-created network fails to start.

        ``networking_config`` carries an entry for ``net_name`` (created
        here) and one for ``'bar'`` (never created); ``run`` is expected to
        raise ``ContainerStartError``.
        """
        net_name = random_name()
        client = docker.from_env(version=TEST_API_VERSION)
        client.networks.create(net_name)
        self.tmp_networks.append(net_name)

        test_aliases = ['hello']
        test_driver_opt = {'key1': 'a'}

        networking_config = {
            net_name: client.api.create_endpoint_config(
                aliases=test_aliases,
                driver_opt=test_driver_opt
            ),
            'bar': client.api.create_endpoint_config(
                aliases=['test'],
                driver_opt={'key2': 'b'}
            ),
        }

        with pytest.raises(docker.errors.ContainerStartError) as err:
            client.containers.run(
                'alpine', 'echo hello world', network=net_name,
                networking_config=networking_config,
                detach=True
            )
        # ``err`` is pytest's ExceptionInfo wrapper; the raised exception is
        # ``err.value``. Register the container only after the ``with``
        # block, because nothing after the raising call runs inside it.
        # NOTE(review): assumes ContainerStartError exposes the created
        # container as ``.container`` (as the previous code implied) --
        # confirm against docker.errors.
        self.tmp_containers.append(err.value.container.id)

    def test_run_with_networking_config_only_undeclared_network(self):
        """A config naming only another network leaves the endpoint bare.

        The container is attached to ``net_name`` alone, and that endpoint
        reports ``None`` for both ``Aliases`` and ``DriverOpts``.
        """
        net_name = random_name()
        client = docker.from_env(version=TEST_API_VERSION)
        client.networks.create(net_name)
        self.tmp_networks.append(net_name)

        networking_config = {
            'bar': client.api.create_endpoint_config(
                aliases=['hello'],
                driver_opt={'key1': 'a'}
            ),
        }

        container = client.containers.run(
            'alpine', 'echo hello world', network=net_name,
            networking_config=networking_config,
            detach=True
        )
        self.tmp_containers.append(container.id)

        attrs = container.attrs

        assert 'NetworkSettings' in attrs
        assert 'Networks' in attrs['NetworkSettings']
        assert list(attrs['NetworkSettings']['Networks'].keys()) == [net_name]
        # Aliases no longer include the container's short-id in API v1.45.
        assert (attrs['NetworkSettings']['Networks'][net_name]['Aliases']
                is None)
        assert (attrs['NetworkSettings']['Networks'][net_name]['DriverOpts']
                is None)

    def test_run_with_none_driver(self):
        """With the ``none`` log driver ``run`` returns ``None``."""
        client = docker.from_env(version=TEST_API_VERSION)

        out = client.containers.run(
            "alpine", "echo hello",
            log_config={"type": 'none'}
        )
        assert out is None

    def test_run_with_json_file_driver(self):
        """With the ``json-file`` log driver ``run`` returns the output."""
        client = docker.from_env(version=TEST_API_VERSION)

        out = client.containers.run(
            "alpine", "echo hello",
            log_config={"type": 'json-file'}
        )
        assert out == b'hello\n'

    @requires_api_version('1.25')
    def test_run_with_auto_remove(self):
        """Output is still returned when the container auto-removes."""
        client = docker.from_env(version=TEST_API_VERSION)
        out = client.containers.run(
            # sleep(2) to allow any communication with the container
            # before it gets removed by the host.
            'alpine', 'sh -c "echo hello && sleep 2"', auto_remove=True
        )
        assert out == b'hello\n'

    @requires_api_version('1.25')
    def test_run_with_auto_remove_error(self):
        """A failing auto-removed run raises with no captured stderr."""
        client = docker.from_env(version=TEST_API_VERSION)
        with pytest.raises(docker.errors.ContainerError) as e:
            client.containers.run(
                # sleep(2) to allow any communication with the container
                # before it gets removed by the host.
                'alpine', 'sh -c ">&2 echo error && sleep 2 && exit 1"',
                auto_remove=True
            )
        assert e.value.exit_status == 1
        assert e.value.stderr is None

    def test_run_with_streamed_logs(self):
        """``stream=True`` yields the output line by line."""
        client = docker.from_env(version=TEST_API_VERSION)
        out = client.containers.run(
            'alpine', 'sh -c "echo hello && echo world"', stream=True
        )
        logs = list(out)
        assert logs[0] == b'hello\n'
        assert logs[1] == b'world\n'

    @pytest.mark.timeout(5)
    @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
                        reason='No cancellable streams over SSH')
    def test_run_with_streamed_logs_and_cancel(self):
        """Closing the stream from a timer thread still yields both lines."""
        client = docker.from_env(version=TEST_API_VERSION)
        out = client.containers.run(
            'alpine', 'sh -c "echo hello && echo world"', stream=True
        )

        # Close the stream from another thread one second from now.
        threading.Timer(1, out.close).start()

        logs = list(out)

        assert len(logs) == 2
        assert logs[0] == b'hello\n'
        assert logs[1] == b'world\n'

    def test_run_with_proxy_config(self):
        """A client proxy config shows up in the container's environment."""
        client = docker.from_env(version=TEST_API_VERSION)
        client.api._proxy_configs = docker.utils.proxy.ProxyConfig(
            ftp='sakuya.jp:4967'
        )

        out = client.containers.run('alpine', 'sh -c "env"')

        # Both the upper-case and lower-case variables are expected.
        assert b'FTP_PROXY=sakuya.jp:4967\n' in out
        assert b'ftp_proxy=sakuya.jp:4967\n' in out

    def test_get(self):
        """``get`` by id returns the container with its config."""
        client = docker.from_env(version=TEST_API_VERSION)
        container = client.containers.run("alpine", "sleep 300", detach=True)
        self.tmp_containers.append(container.id)
        assert client.containers.get(container.id).attrs[
            'Config']['Image'] == "alpine"

    def test_list(self):
        """``list`` includes a running container and drops it once removed."""
        client = docker.from_env(version=TEST_API_VERSION)
        container_id = client.containers.run(
            "alpine", "sleep 300", detach=True).id
        self.tmp_containers.append(container_id)
        containers = [c for c in client.containers.list() if c.id ==
                      container_id]
        assert len(containers) == 1

        container = containers[0]
        assert container.attrs['Config']['Image'] == 'alpine'
        assert container.status == 'running'
        assert container.image == client.images.get('alpine')

        container.kill()
        container.remove()
        assert container_id not in [c.id for c in client.containers.list()]

    def test_list_sparse(self):
        """``list(sparse=True)`` returns partially populated containers.

        The sparse entry exposes a top-level ``Image`` attribute, a status
        and an image, but reading ``labels`` raises ``DockerException``.
        """
        client = docker.from_env(version=TEST_API_VERSION)
        container_id = client.containers.run(
            "alpine", "sleep 300", detach=True).id
        self.tmp_containers.append(container_id)
        containers = [c for c in client.containers.list(sparse=True) if c.id ==
                      container_id]
        assert len(containers) == 1

        container = containers[0]
        assert container.attrs['Image'] == 'alpine'
        assert container.status == 'running'
        assert container.image == client.images.get('alpine')
        with pytest.raises(docker.errors.DockerException):
            _ = container.labels

        container.kill()
        container.remove()
        assert container_id not in [c.id for c in client.containers.list()]
|
|
|
|
|
|
class ContainerTest(BaseIntegrationTest):
    """Integration tests for the methods of a single container object.

    Each test starts (or creates) an ``alpine`` container through a fresh
    client and records its id in ``self.tmp_containers``.
    NOTE(review): presumably the base class removes those containers on
    teardown -- that cleanup is not visible in this file.
    """

    def _published_ports(self, docker_client, requested):
        """Start a sleeping container publishing *requested*; return ports.

        The container is recorded in ``self.tmp_containers`` and reloaded
        before ``ports`` is read.
        """
        ctr = docker_client.containers.run(
            "alpine", "sleep 100", detach=True,
            ports=requested,
        )
        self.tmp_containers.append(ctr.id)
        ctr.reload()  # required to get auto-assigned ports
        return ctr.ports

    def test_commit(self):
        """An image committed from a container keeps the file it wrote."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        writer = docker_client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /test'",
            detach=True,
        )
        self.tmp_containers.append(writer.id)
        writer.wait()
        committed = writer.commit()
        output = docker_client.containers.run(
            committed.id, "cat /test", remove=True,
        )
        assert output == b"hello\n"

    def test_diff(self):
        """``diff`` reports the single file the container created."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "touch /test", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        ctr.wait()
        expected_changes = [{'Path': '/test', 'Kind': 1}]
        assert ctr.diff() == expected_changes

    def test_exec_run_success(self):
        """``exec_run`` yields exit code 0 and the command's output."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sh -c 'echo \"hello\" > /test; sleep 60'", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        result = ctr.exec_run("cat /test")
        assert result[0] == 0
        assert result[1] == b"hello\n"

    def test_exec_run_error_code_from_exec(self):
        """The exit code of the exec'd command is passed through."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sh -c 'sleep 20'", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        result = ctr.exec_run("sh -c 'exit 42'")
        assert result[0] == 42

    def test_exec_run_failed(self):
        """Exec of a missing executable reports exit code 126 or 127."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sh -c 'sleep 60'", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        result = ctr.exec_run("non-existent")
        # older versions of docker return `126` in the case that an exec cannot
        # be started due to a missing executable. We're fixing this for the
        # future, so accept both for now.
        assert result[0] in (127, 126)

    def test_kill(self):
        """``kill`` moves a running container to ``exited``."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 300", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        # Poll until the daemon reports the container as running.
        while ctr.status != 'running':
            ctr.reload()
        assert ctr.status == 'running'
        ctr.kill()
        ctr.reload()
        assert ctr.status == 'exited'

    def test_logs(self):
        """``logs`` returns what the finished container printed."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "echo hello world", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        ctr.wait()
        captured = ctr.logs()
        assert captured == b"hello world\n"

    def test_pause(self):
        """``pause`` / ``unpause`` toggle between paused and running."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 300", detach=True,
        )
        self.tmp_containers.append(ctr.id)

        ctr.pause()
        ctr.reload()
        assert ctr.status == "paused"

        ctr.unpause()
        ctr.reload()
        assert ctr.status == "running"

    def test_remove(self):
        """A removed container disappears from ``list(all=True)``."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "echo hello", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        ids_before = [c.id for c in docker_client.containers.list(all=True)]
        assert ctr.id in ids_before
        ctr.wait()
        ctr.remove()
        remaining = docker_client.containers.list(all=True)
        ids_after = [c.id for c in remaining]
        assert ctr.id not in ids_after

    def test_rename(self):
        """``rename`` is reflected in ``name`` after a reload."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "echo hello", name="test1", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        assert ctr.name == "test1"
        ctr.rename("test2")
        ctr.reload()
        assert ctr.name == "test2"

    def test_restart(self):
        """``restart`` changes the container's ``StartedAt`` timestamp."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 100", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        started_before = ctr.attrs['State']['StartedAt']
        ctr.restart()
        ctr.reload()
        started_after = ctr.attrs['State']['StartedAt']
        assert started_before != started_after

    def test_start(self):
        """A created container becomes running once started."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.create(
            "alpine", "sleep 50", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        assert ctr.status == "created"
        ctr.start()
        ctr.reload()
        assert ctr.status == "running"

    def test_stats(self):
        """A non-streamed ``stats`` call contains the expected sections."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 100", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        snapshot = ctr.stats(stream=False)
        expected_keys = ['read', 'networks', 'precpu_stats', 'cpu_stats',
                         'memory_stats', 'blkio_stats']
        for section in expected_keys:
            assert section in snapshot

    def test_ports_target_none(self):
        """A ``None`` host binding gets an auto-assigned host port."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ports = None
        target_ports = {'2222/tcp': ports}
        actual_ports = self._published_ports(docker_client, target_ports)
        assert sorted(target_ports.keys()) == sorted(actual_ports.keys())
        for container_port, host_binding in target_ports.items():
            for published in actual_ports[container_port]:
                binding_keys = sorted(published.keys())
                assert sorted(['HostIp', 'HostPort']) == binding_keys
                assert host_binding is ports
                assert int(published['HostPort']) > 0
        docker_client.close()

    def test_ports_target_tuple(self):
        """An ``(ip, port)`` tuple binding is published with a host port."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ports = ('127.0.0.1', 1111)
        target_ports = {'2222/tcp': ports}
        actual_ports = self._published_ports(docker_client, target_ports)
        assert sorted(target_ports.keys()) == sorted(actual_ports.keys())
        for container_port, host_binding in target_ports.items():
            for published in actual_ports[container_port]:
                binding_keys = sorted(published.keys())
                assert sorted(['HostIp', 'HostPort']) == binding_keys
                assert host_binding == ports
                assert int(published['HostPort']) > 0
        docker_client.close()

    def test_ports_target_list(self):
        """A list of host ports is published for one container port."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ports = [1234, 4567]
        target_ports = {'2222/tcp': ports}
        actual_ports = self._published_ports(docker_client, target_ports)
        assert sorted(target_ports.keys()) == sorted(actual_ports.keys())
        for container_port, host_binding in target_ports.items():
            for published in actual_ports[container_port]:
                binding_keys = sorted(published.keys())
                assert sorted(['HostIp', 'HostPort']) == binding_keys
                assert host_binding == ports
                assert int(published['HostPort']) > 0
        docker_client.close()

    def test_stop(self):
        """``stop`` with a timeout leaves the container ``exited``."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run("alpine", "top", detach=True)
        self.tmp_containers.append(ctr.id)
        assert ctr.status in ("running", "created")
        ctr.stop(timeout=2)
        ctr.reload()
        assert ctr.status == "exited"

    def test_top(self):
        """``top`` lists exactly the one ``sleep`` process."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 60", detach=True,
        )
        self.tmp_containers.append(ctr.id)
        processes = ctr.top()['Processes']
        assert len(processes) == 1
        assert 'sleep 60' in processes[0]

    def test_update(self):
        """``update`` changes ``CpuShares`` in the host config."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.run(
            "alpine", "sleep 60", detach=True, cpu_shares=2,
        )
        self.tmp_containers.append(ctr.id)
        assert ctr.attrs['HostConfig']['CpuShares'] == 2
        ctr.update(cpu_shares=3)
        ctr.reload()
        assert ctr.attrs['HostConfig']['CpuShares'] == 3

    def test_wait(self):
        """``wait`` reports the exit status of the container's command."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        cases = (
            ("sh -c 'exit 0'", 0),
            ("sh -c 'exit 1'", 1),
        )
        for command, expected_status in cases:
            ctr = docker_client.containers.run(
                "alpine", command, detach=True,
            )
            self.tmp_containers.append(ctr.id)
            assert ctr.wait()['StatusCode'] == expected_status

    def test_create_with_volume_driver(self):
        """``volume_driver`` is stored in the created container's config."""
        docker_client = docker.from_env(version=TEST_API_VERSION)
        ctr = docker_client.containers.create(
            'alpine', 'sleep 300', volume_driver='foo',
        )
        self.tmp_containers.append(ctr.id)
        host_config = ctr.attrs['HostConfig']
        assert host_config['VolumeDriver'] == 'foo'
|