docker-py/tests/integration/api_network_test.py

496 lines
18 KiB
Python

import docker
from docker.types import IPAMConfig, IPAMPool
import pytest
from ..helpers import random_name, requires_api_version
from .base import BaseAPIIntegrationTest, TEST_IMG
class TestNetworks(BaseAPIIntegrationTest):
def tearDown(self):
self.client.leave_swarm(force=True)
super(TestNetworks, self).tearDown()
def create_network(self, *args, **kwargs):
net_name = random_name()
net_id = self.client.create_network(net_name, *args, **kwargs)['Id']
self.tmp_networks.append(net_id)
return (net_name, net_id)
def test_list_networks(self):
networks = self.client.networks()
net_name, net_id = self.create_network()
networks = self.client.networks()
assert net_id in [n['Id'] for n in networks]
networks_by_name = self.client.networks(names=[net_name])
assert [n['Id'] for n in networks_by_name] == [net_id]
networks_by_partial_id = self.client.networks(ids=[net_id[:8]])
assert [n['Id'] for n in networks_by_partial_id] == [net_id]
def test_inspect_network(self):
net_name, net_id = self.create_network()
net = self.client.inspect_network(net_id)
assert net['Id'] == net_id
assert net['Name'] == net_name
assert net['Driver'] == 'bridge'
assert net['Scope'] == 'local'
assert net['IPAM']['Driver'] == 'default'
def test_create_network_with_ipam_config(self):
_, net_id = self.create_network(
ipam=IPAMConfig(
driver='default',
pool_configs=[
IPAMPool(
subnet="172.28.0.0/16",
iprange="172.28.5.0/24",
gateway="172.28.5.254",
aux_addresses={
"a": "172.28.1.5",
"b": "172.28.1.6",
"c": "172.28.1.7",
},
),
],
),
)
net = self.client.inspect_network(net_id)
ipam = net['IPAM']
assert ipam.pop('Options', None) is None
assert ipam['Driver'] == 'default'
assert ipam['Config'] == [{
'Subnet': "172.28.0.0/16",
'IPRange': "172.28.5.0/24",
'Gateway': "172.28.5.254",
'AuxiliaryAddresses': {
"a": "172.28.1.5",
"b": "172.28.1.6",
"c": "172.28.1.7",
},
}]
def test_create_network_with_host_driver_fails(self):
with pytest.raises(docker.errors.APIError):
self.client.create_network(random_name(), driver='host')
def test_remove_network(self):
net_name, net_id = self.create_network()
assert net_name in [n['Name'] for n in self.client.networks()]
self.client.remove_network(net_id)
assert net_name not in [n['Name'] for n in self.client.networks()]
def test_connect_and_disconnect_container(self):
net_name, net_id = self.create_network()
container = self.client.create_container(TEST_IMG, 'top')
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
assert not network_data.get('Containers')
self.client.connect_container_to_network(container, net_id)
network_data = self.client.inspect_network(net_id)
assert list(network_data['Containers'].keys()) == [
container['Id']
]
with pytest.raises(docker.errors.APIError):
self.client.connect_container_to_network(container, net_id)
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
assert not network_data.get('Containers')
with pytest.raises(docker.errors.APIError):
self.client.disconnect_container_from_network(container, net_id)
@requires_api_version('1.22')
def test_connect_and_force_disconnect_container(self):
net_name, net_id = self.create_network()
container = self.client.create_container(TEST_IMG, 'top')
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
assert not network_data.get('Containers')
self.client.connect_container_to_network(container, net_id)
network_data = self.client.inspect_network(net_id)
assert list(network_data['Containers'].keys()) == \
[container['Id']]
self.client.disconnect_container_from_network(container, net_id, True)
network_data = self.client.inspect_network(net_id)
assert not network_data.get('Containers')
with pytest.raises(docker.errors.APIError):
self.client.disconnect_container_from_network(
container, net_id, force=True
)
@requires_api_version('1.22')
def test_connect_with_aliases(self):
net_name, net_id = self.create_network()
container = self.client.create_container(TEST_IMG, 'top')
self.tmp_containers.append(container)
self.client.start(container)
self.client.connect_container_to_network(
container, net_id, aliases=['foo', 'bar'])
container_data = self.client.inspect_container(container)
aliases = (
container_data['NetworkSettings']['Networks'][net_name]['Aliases']
)
assert 'foo' in aliases
assert 'bar' in aliases
def test_connect_on_container_create(self):
net_name, net_id = self.create_network()
container = self.client.create_container(
image=TEST_IMG,
command='top',
host_config=self.client.create_host_config(network_mode=net_name),
)
self.tmp_containers.append(container)
self.client.start(container)
network_data = self.client.inspect_network(net_id)
assert list(network_data['Containers'].keys()) == \
[container['Id']]
self.client.disconnect_container_from_network(container, net_id)
network_data = self.client.inspect_network(net_id)
assert not network_data.get('Containers')
@requires_api_version('1.22')
def test_create_with_aliases(self):
net_name, net_id = self.create_network()
container = self.client.create_container(
image=TEST_IMG,
command='top',
host_config=self.client.create_host_config(
network_mode=net_name,
),
networking_config=self.client.create_networking_config({
net_name: self.client.create_endpoint_config(
aliases=['foo', 'bar'],
),
}),
)
self.tmp_containers.append(container)
self.client.start(container)
container_data = self.client.inspect_container(container)
aliases = (
container_data['NetworkSettings']['Networks'][net_name]['Aliases']
)
assert 'foo' in aliases
assert 'bar' in aliases
@requires_api_version('1.22')
def test_create_with_ipv4_address(self):
net_name, net_id = self.create_network(
ipam=IPAMConfig(
driver='default',
pool_configs=[IPAMPool(subnet="132.124.0.0/16")],
),
)
container = self.client.create_container(
image=TEST_IMG, command='top',
host_config=self.client.create_host_config(network_mode=net_name),
networking_config=self.client.create_networking_config({
net_name: self.client.create_endpoint_config(
ipv4_address='132.124.0.23'
)
})
)
self.tmp_containers.append(container)
self.client.start(container)
net_settings = self.client.inspect_container(container)[
'NetworkSettings'
]
assert net_settings['Networks'][net_name]['IPAMConfig']['IPv4Address']\
== '132.124.0.23'
@requires_api_version('1.22')
def test_create_with_ipv6_address(self):
net_name, net_id = self.create_network(
ipam=IPAMConfig(
driver='default',
pool_configs=[IPAMPool(subnet="2001:389::1/64")],
),
)
container = self.client.create_container(
image=TEST_IMG, command='top',
host_config=self.client.create_host_config(network_mode=net_name),
networking_config=self.client.create_networking_config({
net_name: self.client.create_endpoint_config(
ipv6_address='2001:389::f00d'
)
})
)
self.tmp_containers.append(container)
self.client.start(container)
net_settings = self.client.inspect_container(container)[
'NetworkSettings'
]
assert net_settings['Networks'][net_name]['IPAMConfig']['IPv6Address']\
== '2001:389::f00d'
@requires_api_version('1.24')
def test_create_with_linklocal_ips(self):
container = self.client.create_container(
TEST_IMG, 'top',
networking_config=self.client.create_networking_config(
{
'bridge': self.client.create_endpoint_config(
link_local_ips=['169.254.8.8']
)
}
),
host_config=self.client.create_host_config(network_mode='bridge')
)
self.tmp_containers.append(container)
self.client.start(container)
container_data = self.client.inspect_container(container)
net_cfg = container_data['NetworkSettings']['Networks']['bridge']
assert 'IPAMConfig' in net_cfg
assert 'LinkLocalIPs' in net_cfg['IPAMConfig']
assert net_cfg['IPAMConfig']['LinkLocalIPs'] == ['169.254.8.8']
@requires_api_version('1.32')
def test_create_with_driveropt(self):
container = self.client.create_container(
TEST_IMG, 'top',
networking_config=self.client.create_networking_config(
{
'bridge': self.client.create_endpoint_config(
driver_opt={'com.docker-py.setting': 'on'}
)
}
),
host_config=self.client.create_host_config(network_mode='bridge')
)
self.tmp_containers.append(container)
self.client.start(container)
container_data = self.client.inspect_container(container)
net_cfg = container_data['NetworkSettings']['Networks']['bridge']
assert 'DriverOpts' in net_cfg
assert 'com.docker-py.setting' in net_cfg['DriverOpts']
assert net_cfg['DriverOpts']['com.docker-py.setting'] == 'on'
@requires_api_version('1.22')
def test_create_with_links(self):
net_name, net_id = self.create_network()
container = self.create_and_start(
host_config=self.client.create_host_config(network_mode=net_name),
networking_config=self.client.create_networking_config({
net_name: self.client.create_endpoint_config(
links=[('docker-py-test-upstream', 'bar')],
),
}),
)
net_settings = self.client.inspect_container(container)[
'NetworkSettings'
]
assert net_settings['Networks'][net_name]['Links'] == [
'docker-py-test-upstream:bar'
]
self.create_and_start(
name='docker-py-test-upstream',
host_config=self.client.create_host_config(network_mode=net_name),
)
self.execute(container, ['nslookup', 'bar'])
def test_create_check_duplicate(self):
net_name, net_id = self.create_network()
with pytest.raises(docker.errors.APIError):
self.client.create_network(net_name, check_duplicate=True)
net_id = self.client.create_network(net_name, check_duplicate=False)
self.tmp_networks.append(net_id['Id'])
@requires_api_version('1.22')
def test_connect_with_links(self):
net_name, net_id = self.create_network()
container = self.create_and_start(
host_config=self.client.create_host_config(network_mode=net_name))
self.client.disconnect_container_from_network(container, net_name)
self.client.connect_container_to_network(
container, net_name,
links=[('docker-py-test-upstream', 'bar')])
net_settings = self.client.inspect_container(container)[
'NetworkSettings'
]
assert net_settings['Networks'][net_name]['Links'] == [
'docker-py-test-upstream:bar'
]
self.create_and_start(
name='docker-py-test-upstream',
host_config=self.client.create_host_config(network_mode=net_name),
)
self.execute(container, ['nslookup', 'bar'])
@requires_api_version('1.22')
def test_connect_with_ipv4_address(self):
net_name, net_id = self.create_network(
ipam=IPAMConfig(
driver='default',
pool_configs=[
IPAMPool(
subnet="172.28.0.0/16", iprange="172.28.5.0/24",
gateway="172.28.5.254"
)
]
)
)
container = self.create_and_start(
host_config=self.client.create_host_config(network_mode=net_name))
self.client.disconnect_container_from_network(container, net_name)
self.client.connect_container_to_network(
container, net_name, ipv4_address='172.28.5.24'
)
container_data = self.client.inspect_container(container)
net_data = container_data['NetworkSettings']['Networks'][net_name]
assert net_data['IPAMConfig']['IPv4Address'] == '172.28.5.24'
@requires_api_version('1.22')
def test_connect_with_ipv6_address(self):
net_name, net_id = self.create_network(
ipam=IPAMConfig(
driver='default',
pool_configs=[
IPAMPool(
subnet="2001:389::1/64", iprange="2001:389::0/96",
gateway="2001:389::ffff"
)
]
)
)
container = self.create_and_start(
host_config=self.client.create_host_config(network_mode=net_name))
self.client.disconnect_container_from_network(container, net_name)
self.client.connect_container_to_network(
container, net_name, ipv6_address='2001:389::f00d'
)
container_data = self.client.inspect_container(container)
net_data = container_data['NetworkSettings']['Networks'][net_name]
assert net_data['IPAMConfig']['IPv6Address'] == '2001:389::f00d'
@requires_api_version('1.23')
def test_create_internal_networks(self):
_, net_id = self.create_network(internal=True)
net = self.client.inspect_network(net_id)
assert net['Internal'] is True
@requires_api_version('1.23')
def test_create_network_with_labels(self):
_, net_id = self.create_network(labels={
'com.docker.py.test': 'label'
})
net = self.client.inspect_network(net_id)
assert 'Labels' in net
assert len(net['Labels']) == 1
assert net['Labels'] == {
'com.docker.py.test': 'label'
}
@requires_api_version('1.23')
def test_create_network_with_labels_wrong_type(self):
with pytest.raises(TypeError):
self.create_network(labels=['com.docker.py.test=label', ])
@requires_api_version('1.23')
def test_create_network_ipv6_enabled(self):
_, net_id = self.create_network(
enable_ipv6=True, ipam=IPAMConfig(
driver='default',
pool_configs=[
IPAMPool(
subnet="2001:389::1/64", iprange="2001:389::0/96",
gateway="2001:389::ffff"
)
]
)
)
net = self.client.inspect_network(net_id)
assert net['EnableIPv6'] is True
@requires_api_version('1.25')
def test_create_network_attachable(self):
assert self.init_swarm()
_, net_id = self.create_network(driver='overlay', attachable=True)
net = self.client.inspect_network(net_id)
assert net['Attachable'] is True
@requires_api_version('1.29')
def test_create_network_ingress(self):
assert self.init_swarm()
self.client.remove_network('ingress')
_, net_id = self.create_network(driver='overlay', ingress=True)
net = self.client.inspect_network(net_id)
assert net['Ingress'] is True
@requires_api_version('1.25')
def test_prune_networks(self):
net_name, _ = self.create_network()
result = self.client.prune_networks()
assert net_name in result['NetworksDeleted']
@requires_api_version('1.31')
def test_create_inspect_network_with_scope(self):
assert self.init_swarm()
net_name_loc, net_id_loc = self.create_network(scope='local')
assert self.client.inspect_network(net_name_loc)
assert self.client.inspect_network(net_name_loc, scope='local')
with pytest.raises(docker.errors.NotFound):
self.client.inspect_network(net_name_loc, scope='global')
net_name_swarm, net_id_swarm = self.create_network(
driver='overlay', scope='swarm'
)
assert self.client.inspect_network(net_name_swarm)
assert self.client.inspect_network(net_name_swarm, scope='swarm')
with pytest.raises(docker.errors.NotFound):
self.client.inspect_network(net_name_swarm, scope='local')
def test_create_remove_network_with_space_in_name(self):
net_id = self.client.create_network('test 01')
self.tmp_networks.append(net_id)
assert self.client.inspect_network('test 01')
assert self.client.remove_network('test 01') is None # does not raise